概述
Mapreduce的编程流程主要分八个阶段:两个MAP阶段,四个shuffle阶段,两个reduce阶段。
Map两个阶段:
1:设置inputformat类,将数据分为key-value对(k1 v1),并将其输入到第二步。
2:自定义Map逻辑将第一步的结果转换为另外的键值对(k2,v2),并输出。
shuffle四个阶段:
3:对输出的键值对进行分区。
4:对不同分区的数据按照相同的key排序。
5:(可选),对分组后的数据进行初步规约,降低数据的网络拷贝。
6:对数据进行分组,将相同key的value放入同一个集合中。
Reduce两个阶段:
7:对多个Map任务的结果进行排序以及合并,编写Reduce函数实现自己的逻辑,对输入的键对值进行处理,转换为新的键对值(k3 v3),输出。
8:设置outputformat类处理并保存Reduce输出的键值对。
图解:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pbJTboOf-1603113436059)(https://s1.ax1x.com/2020/09/29/0eqy8g.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-K0BpE1tg-1603113436066)(https://s1.ax1x.com/2020/09/29/0eq4aV.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UlWoI5sv-1603113436069)(https://s1.ax1x.com/2020/09/29/0eLkqI.png)]
Mapreduce wordcount案例编程流程
链接:https://blog.csdn.net/qq_16146103/article/details/105692080
重写Mapper类:
package org.example.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
// 3 输出
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
重写Reducer类:
package org.example.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 输出
v.set(sum);
context.write(key,v);
}
}
main函数主驱动类:
package org.example.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
public class WcDriver {
@Test
public void main() throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取配置信息以及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 设置jar加载路径
job.setJarByClass(WcDriver.class);
// 3 设置map和reduce类
job.setMapperClass(WcMapper.class);
job.setReducerClass(WcReducer.class);
// 4 设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path("D:/IDEA java项目/data/a.txt"));
FileOutputFormat.setOutputPath(job, new Path("D:/IDEA java项目/data/output"));
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Mappreduce两种运行模式
集群运行模式:
1:将mapreduce程序提交给Yarn集群,分发到许多节点并发执行。
2:处理和输出的数据应位于hdfs文件系统中。
3:将文件打为jar包并上传,在集群中使用Hadoop命令执行。
应使用IDEA将map程序打包成jar包。
本地运行模式:
map程序用在本地用单进程执行,且处理和输出的数据在本地文件系统中。
最后
以上就是犹豫缘分为你收集整理的mapreduce编程流程的全部内容,希望文章能够帮你解决mapreduce编程流程所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复