我是靠谱客的博主 等待鸭子,最近开发中收集的这篇文章主要介绍MapReduce小作业,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

题目:读取文件类型如下

sb4tF0D0 yH12ZA30gq 296.289 
oHuCS oHuCS 333.962 
oHuCS yH12ZA30gq 14.056 
sb4tF0D0 oHuCS 522.122 
oHuCS yH12ZA30gq 409.904 
sb4tF0D0 yH12ZA30gq 590.815 
sb4tF0D0 oHuCS 239.093 
sb4tF0D0 yH12ZA30gq 284.091 
sb4tF0D0 yH12ZA30gq 28.463 
sb4tF0D0 sb4tF0D0 38.819 
每行有空格隔开的三个字符串:<a> <b> <length>;最后一个字符串可以转化成float型。表示a通话给b,用了length的时长。

输出文件,统计<a b>对( 与<b a>对不同)的通话次数和,平均通话时间,类型如:<a> <b> <times> <avg_length>。输出文件格式如下:

oHuCS oHuCS	1 333.962
oHuCS yH12ZA30gq	2 211.980
sb4tF0D0 oHuCS	2 380.607
sb4tF0D0 sb4tF0D0	1 38.819
sb4tF0D0 yH12ZA30gq	4 299.915
解决办法:

————————————————  以下是程序代码和注释  ——————————————————

引入相关包

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
//程序使用的hadoop数据类型
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.ArrayWritable;
在Master和Worker之间传递DoubleWritable数组需要修改数据类型;构造新类DoubleArrayWritable 为后面的job做铺垫。

public static class DoubleArrayWritable extends ArrayWritable { 
	public DoubleArrayWritable() { 
		super(DoubleWritable.class); 
	}
	public DoubleArrayWritable(DoubleWritable[] dw) {
		 super(DoubleWritable.class);
		 set(dw);
	}
}

MapReduce程序中必须实现的Mapper类:

注:Mapper< in_KeyType,in_ValueType,Out_keyType,Out_ValueType >

public static class TokenizerMapper extends Mapper<Object, Text, Text, DoubleArrayWritable> { //使用新建的DoubleArrayWritable类型作为输入输出类型
    private final static DoubleWritable one = new DoubleWritable(1.0); 
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] in_value = value.toString().split(" ");
        if(in_value.length != 3) return;
        try{
            Text out_key = new Text();
            out_key.set(in_value[0]+"$"+in_value[1]); //key值是字符串对 我的处理时将字符串对合并成一个字符串中间用$隔开
            DoubleWritable[] tmp = new DoubleWritable[2];
            DoubleArrayWritable out_value = new DoubleArrayWritable(); 
            tmp[0] = one;
            tmp[1] = new DoubleWritable(Double.valueOf(in_value[2]));
            out_value.set(tmp); //这里注意新建DoubleArrayWritable的赋值方案
            System.out.println(out_key.getBytes().toString());
            context.write(out_key, out_value);
        }catch(Exception e){
        	System.out.println("map error line");
        }
    }
}

MapReduce程序中可以实现Combiner的Reduce类:combiner是应用于每个本地map的输出,可以改善在shuffle阶段的数据吞吐量。

注:Reducer< in_KeyType,in_ValueType,Out_keyType,Out_ValueType >

public static class IntSumCombiner extends Reducer<Text, DoubleArrayWritable, Text, DoubleArrayWritable> {
    private DoubleArrayWritable out_value = new DoubleArrayWritable();
    public void reduce(Text key, Iterable<DoubleArrayWritable> values, Context context) //使用新建的DoubleArrayWritable类型作为输入输出类型
    	throws IOException, InterruptedException {
    	double times_sum = 0.0,sum = 0.0;
        for (DoubleArrayWritable val : values) {
            times_sum += Double.valueOf(val.get()[0].toString());//这里注意DoubleArrayWritable的读取方案
            sum       += Double.valueOf(val.get()[1].toString());
        }
        DoubleWritable[] tmp = new DoubleWritable[2];
        tmp[0] = new DoubleWritable(times_sum);
        tmp[1] = new DoubleWritable(sum);
        out_value.set(tmp);
        context.write(key, out_value);
    }
}

MapReduce程序中必须实现Reducer的Reduce类

注:Reducer< in_KeyType,in_ValueType,Out_keyType,Out_ValueType >

public static class IntSumReducer extends Reducer<Text, DoubleArrayWritable, Text, Text> {
    private Text result_key = new Text();
    private Text result_value = new Text();
    protected void setup(Context context) {
    	
    }
    public void reduce(Text key, Iterable<DoubleArrayWritable> values, Context context) 
    	throws IOException, InterruptedException {
    	double times_sum = 0.0,sum = 0.0;
        for (DoubleArrayWritable val : values) {
        	times_sum += Double.valueOf(val.get()[0].toString());
        	sum       += Double.valueOf(val.get()[1].toString());
        }
        sum /= times_sum;
        DecimalFormat decimalFormat1 = new DecimalFormat("");
        DecimalFormat decimalFormat2 = new DecimalFormat(".000"); //float型数据后保留3位小数
        String times_sum_string = decimalFormat1.format(times_sum);
        String sum_string = decimalFormat2.format(sum);
        // generate result key
        String out_key = new String(key.getBytes(),0,key.getLength(),"GBK").replace("$"," ");
        result_key.set(out_key);
        // generate result value
        result_value.set( times_sum_string+" "+sum_string );
        context.write(result_key, result_value);
    }
}
自己写的初始化函数,主要是初始化Job参数

public static Job initJob(Configuration conf) throws IOException{
    Job job = Job.getInstance(conf, "Hw2Part1");
    job.setJarByClass(Hw2Part1.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumCombiner.class);
    job.setReducerClass(IntSumReducer.class);
    
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleArrayWritable.class);
    
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    return job;
}

主函数

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
        System.err.println("Usage: wordcount <in> [<in>...] <out>");
        System.exit(2);
    }
    Job job = initJob(conf);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path( otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}





















 

最后

以上就是等待鸭子为你收集整理的MapReduce小作业的全部内容,希望文章能够帮你解决MapReduce小作业所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(50)

评论列表共有 0 条评论

立即
投稿
返回
顶部