概述
什么是MapReduce
MapReduce是一个分布式计算框架
它将大型数据操作作业分解为可以跨服务器集群并行执行的单个任务
起源于Google
适用于大规模数据处理场景
每个job包含Map和Reduce两部分
MapReduce设计思想
分而治之
简化并行计算的编程模型
构建抽象模型:Map和Reduce
开发人员专注与实现Mapper和Reducer函数
隐藏系统层细节
MapReduce特点
优点
易于编程、可拓展性、高容错性、高吞吐量
不适用领域
难以实时计算、不适合流式计算
使用MapReduce实现WordCount
流程图如下
MapReduce执行过程
数据定义格式
map:(K1,V1) -> list(K2,V2)
reduce:(K2,list(V2)) -> list(K3,V3)
MapReduce执行过程
Mapper
Combiner
Partitioner
Shuffle and Sort
Reducer
Hadoop V1 MR引擎
Job Tracker
运行在Namenode
接受客户端Job请求
提交给Task Tracker
Task Tracker
从Job Tracker接受任务请求
执行map、reduce等操作
返回心跳给Job Tracker
Hadoop V2 YARN
YARN的变化
支持更多的计算引擎,兼容MapReduce
更好的资源管理,减少Job Tracker的资源消耗
将Job Tracker的资源管理分为ResourceManager
将Job Tracker的作业调度分为ApplicationMaster
NodeManager称为每个节点的资源和管理任务
Hadoop及YARN架构
Hadoop2 MR在YARN上运行流程
InputSplit(输入分片)
在map之前,根据输入文件创建inputSplit
每个InputSplit对应一个Mapper任务
输入分片存储的是分片长度和记录数据位置的数组
block和split的区别
block是数据的物理表示
split是块中数据的逻辑表示
split划分是在记录的边界处
split的数量应不大于block的数量(绝大多数情况下相等)
Shuffle阶段
数据从Map输出到Reduce输入的过程
Key&Value类型
必须可序列化(serializable)
作用:网络传输以及持久化存储
IntWritable、LongWriteable、FloatWriteable、Text、
DoubleWriteable、BooleanWriteable、NullWriteable等
都继承了Writeable接口
并实现write()和readFiled()方法
Keys必须实现WriteableComparable接口
Reduce阶段需要sort
keys需要可比较
MapReduce编程模型
InputFormat接口
定义了如何将数据读入Mapper
InputSplit[] getSplits
InputSplit表示由单个Mapper处理的数据
getSplits方法将一个大数据在逻辑上拆分为InputSplit
RecordReader<K,V>getRecordReader
常用InputFormat接口实现类
TextInputFormat
FileInputFormat
KeyValueInputFormat
Mapper类
Mapper主要方法
void setup(Context context)
org.apache.hadoop.mapreduce.Mapper.Context
void map(KEY key,VALUE value,Context context)
为输入分片中的每个键/值对调用一次
void cleanup(Context context)
void run(Context context)
可通过重写该方法对Mapper进行更完整控制
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context ctx)
throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer( value.toString() );
while ( itr.hasMoreTokens() )
{
word.set( itr.nextToken() );
ctx.write( word, one);
}
}
}
Combiner类
Combiner相当于本地化的Reduce操作
在shuffle之前进行本地聚合
用于性能优化,可选项
输入和输出类型一致
Reducer可以被用作Combiner的条件
复合交换律和结合律
实现Combiner
job.setCombinerClass(WCReducer.class)
Partitioner类
用于在Map端对key进行分区
默认使用的是HashPartitioner
获取key的哈希值
使用key的哈希值对Reducer处理
自定义Partitioner
继承抽象类Partitioner,重写getPartition方法
job.setPartitionerClass(MyPartitioner.class)
Reducer类
Reducer主要方法
void setup(Context context)
org.apache.hadoop.mapreduce.Reducer.Contect
void reduce(KEY key,Iterable<VALUE> values,Context context)
为每个key调用一次
void cleanup(Context context)
void run(Context context)
可通过重写该方法来控制reduce任务的工作方式
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context ctx)
throws IOException, InterruptedException
{
int sum = 0;
for ( IntWritable value : values )
{
sum += value.get();
}
result.set( sum );
ctx.write( key, result );
}
}
OutPutFormat接口
定义了如何将数据从Reducer进行输出
RecordWriter<K,V>getRecordWriter
将Reducer的<key,value>写入到目标文件
checkOutputSpecs
判断输出目录是否存在
常用OutputFormat接口实现类
TextOutputFormat
SequenceFileOutputFormat
MapFileOutputFormat
编写M/R Job
//InputFormat
Job job = Job.getInstance(getConf(), "WordCountMR" );
job.setJarByClass( getClass() );
FileInputFormat.addInputPath(job, new Path(args[0]) );
job.setInputFormatClass( TextInputFormat.class );
//OutputFormat
FileOutputFormat.setOutputPath( job, new Path(args[1]) );
job.setOutputFormatClass( TextOutputFormat.class );
//Mapper
job.setMapperClass( WCMapper.class );
job.setMapOutputKeyClass( Text.class );
job.setMapOutputValueClass( IntWritable.class );
//Reducer
job.setReducerClass( WCReducer.class );
job.setOutputKeyClass( Text.class );
job.setOutputValueClass( IntWritable.class );
使用MapReduce实现join操作
map端join
大文件+小文件
reduce端join
传递参数
通过Configuration来传递参数
---------->
分布式缓存机制(Distributed Cache)
在执行MR时下那个集群中的任务节点发送只读文件
分法第三方库(jar等)
分享一些可以装载进内存的文件
进行类似join连接时,小表的分发
符号连接(#)
Job job = new Job();
job.addCacheFile(new Path(filename).toUri());
job.addCacheFile(new URI("/user/data/customer_types.json#customer_type"));
URI[] files = context.getCacheFiles();
Path filename = new Path(files[0])
File customerType = new File("./customer_type");
mapreduce流程
job–>TskTracker–>Map/Reduce Task
resoucrcemanager–>app master–>container–>
scheduler
splite–>map–>combainer–>partitioner–>reducer–>resourcemanager
hadoop1.0和2.0主要区别
1.0管理和计算都是由mapreduce来完成
2.0管理由yarn负责,计算框架可拓展
driver类的核心逻辑
建立job对象–>指定driver类为驱动类–>蛇者mapper和reducer/partitioner
–>设置mapper和reducer的输出类型–>设置输入输出文件路径–>运行
文件关联的逻辑
mapper端负责把文件合并,并且根据业务逻辑,把相同的key归类
reducer端负责根据相同的key来进行关联
MapReduce总览图
Mapperjoin和Reducejoin的区别
Mapjoin是会利用cachefile接入数据,与map端接入的数据进行逻辑关联,不需要写reducer
Reducejoin是map端只完成文件合并,利用相同的关联条件作为key,输出到reduce端,reduce端根据key聚合达到关联的效果
最后
以上就是丰富芝麻为你收集整理的MapReduce原理及编程小结的全部内容,希望文章能够帮你解决MapReduce原理及编程小结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复