我是靠谱客的博主 阔达康乃馨,最近开发中收集的这篇文章主要介绍map+shuffle+reducer,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

map+shuffle+reducer
1. MAPREDUCE入门 4
1.1 为什么要MAPREDUCE 4
1.2 MAPREDUCE程序运行演示 4
1.3 MAPREDUCE 示例编写及编程规范 4
1.3.1 编程规范 4
1.3.2 wordcount示例编写 5
1.4 MAPREDUCE程序运行模式及debug方法 7
1.4.1 本地运行模式 7
1.4.2 集群运行模式 7
2. Mapreduce程序的核心运行机制 8
2.1 概述 8
2.2 mr程序运行流程 8
2.2.1 流程示意图 8
2.2.2 流程解析 8
2.3 Maptask实例数的决定机制 10
2.3.1 maptask数量的决定机制 10
2.3.2切片机制: 10
2.4 ReduceTask实例数的决定 11
3. MAPREDUCE中的Combiner 12
4. MAPREDUCE中的序列化 12
4.1 概述 12
4.2 Jdk序列化和MR序列化之间的比较 12
4.3 自定义对象实现MR中的序列化接口 13
5. Mapreduce中的排序初步 16
5.1 需求: 16
5.2 分析 16
5.3 实现 16
6. Mapreduce中的分区Partitioner 20
6.1 需求: 20
6.2 分析 20
6.3 实现 20
7. mapreduce的shuffle机制 22
7.1 概述: 22
7.2 主要流程: 22
7.3 详细流程 22
7.4 详细流程示意图 23
8. mapreduce数据压缩 24
8.1 概述 24
8.2 MR支持的压缩编码 24
8.3 Reducer输出压缩 24
8.4 Mapper输出压缩 25
8.5 压缩文件的读取 25
9. MapReduce与YARN 27
9.1 YARN概述 27
9.2 YARN的重要概念 27
9.3 Yarn中运行运算程序的示例 27
10. MapReduce编程案例 28
10.1 reduce端join算法实现 28
10.2 map端join算法实现 29
10.3 web日志预处理 32
附:Mapreduce参数优化 36
11.1 资源相关参数 36
11.2 容错相关参数 37
11.3 本地运行mapreduce 作业 37
11.4 效率和稳定性相关参数 37


课程大纲(MAPREDUCE详解)
MapReduce快速入门 如何理解map、reduce计算模型
Mapreudce程序运行演示
Mapreduce编程规范及示例编写
Mapreduce程序运行模式及debug方法
MapReduce高级特性 Mapreduce程序的核心机制
MapReduce的序列化框架
MapReduce的排序实现
MapReduce的分区机制及自定义
Mapreduce的数据压缩
Mapreduce与yarn的结合
Mapreduce编程案例
Mapreduce 参数优化

目标:
掌握mapreduce分布式运算框架的编程思想
掌握mapreduce常用算法的编程套路
掌握mapreduce分布式运算框架的运行机制,具备一定自定义开发的能力


1. MAPREDUCE原理篇(1)
Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;
Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;

1.1 为什么要MAPREDUCE
(1)海量数据在单机上处理因为硬件资源限制,无法胜任
(2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
(3)引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理

设想一个海量数据场景下的wordcount需求:
单机版:内存受限,磁盘受限,运算能力受限
分布式:
1、文件分布式存储(HDFS)
2、运算逻辑需要至少分成2个阶段(一个阶段独立并发,一个阶段汇聚)
3、运算程序如何分发
4、程序如何分配运算任务(切片)
5、两阶段的程序如何启动?如何协调?
6、整个程序运行过程中的监控?容错?重试?

可见在程序由单机版扩成分布式时,会引入大量的复杂工作。为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。

而mapreduce就是这样一个分布式程序的通用框架,其应对以上问题的整体结构如下:
1、MRAppMaster(mapreduce application master)
2、MapTask
3、ReduceTask

1.2 MAPREDUCE框架结构及核心运行机制
1.2.1 结构
一个完整的mapreduce程序在分布式运行时有三类实例进程:
1、MRAppMaster:负责整个程序的过程调度及状态协调
2、mapTask:负责map阶段的整个数据处理流程
3、ReduceTask:负责reduce阶段的整个数据处理流程

1.2.2 MR程序运行流程
1.2.2.1 流程示意图

1.2.2.2 流程解析
1、 一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程

2、 maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:
a) 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对
b) 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存
c) 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件

3、 MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)

4、 Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储


1.3 MapTask并行度决定机制
maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度
那么,mapTask并行实例是否越多越好呢?其并行度又是如何决定呢?

1.3.1 mapTask并行度的决定机制
一个job的map阶段并行度由客户端在提交job时决定
而客户端对map阶段并行度的规划的基本逻辑为:
将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理

这段逻辑及形成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成,其过程如下图:

1.3.2 FileInputFormat切片机制
1、切片定义在InputFormat类中的getSplit()方法
2、FileInputFormat中默认的切片机制:
a) 简单地按照文件的内容长度进行切片
b) 切片大小,默认等于block大小
c) 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
比如待处理数据有两个文件:
file1.txt 320M
file2.txt 10M

经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1– 0~128
file1.txt.split2– 128~256
file1.txt.split3– 256~320
file2.txt.split1– 0~10M

3、FileInputFormat中切片的大小的参数配置
通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由这几个值来运算决定
minsize:默认值:1
配置参数: mapreduce.input.fileinputformat.split.minsize
maxsize:默认值:Long.MAXValue
配置参数:mapreduce.input.fileinputformat.split.maxsize
blocksize
因此,默认情况下,切片大小=blocksize
maxsize(切片最大值):
参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值
minsize (切片最小值):
参数调的比blockSize大,则可以让切片变得比blocksize还大

选择并发数的影响因素:
1、运算节点的硬件配置
2、运算任务的类型:CPU密集型还是IO密集型
3、运算任务的数据量
1.4 map并行度的经验之谈
如果硬件配置为2*12core + 64G,恰当的map并行度是大约每个节点20-100个map,最好每个map的执行时间至少一分钟。
 如果job的每个map或者 reduce task的运行时间都只有30-40秒钟,那么就减少该job的map或者reduce数,每一个task(map|reduce)的setup和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间。
配置task的JVM重用可以改善该问题:
(mapred.job.reuse.jvm.num.tasks,默认是1,表示一个JVM上最多可以顺序执行的task
数目(属于同一个Job)是1。也就是说一个task启一个JVM)

 如果input的文件非常的大,比如1TB,可以考虑将hdfs上的每个block size设大,比如设成256MB或者512MB

1.5 ReduceTask并行度的决定
reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:

//默认值是1,手动设置为4
job.setNumReduceTasks(4);

如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜
注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask

尽量不要运行太多的reduce task。对大多数job来说,最好rduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小。这个对于小集群而言,尤其重要。

1.6 MAPREDUCE程序运行演示
Hadoop的发布包中内置了一个hadoop-mapreduce-example-2.4.1.jar,这个jar包中有各种MR示例程序,可以通过以下步骤运行:
启动hdfs,yarn
然后在集群中的任意一台服务器上启动执行程序(比如运行wordcount):
hadoop jar hadoop-mapreduce-example-2.4.1.jar wordcount /wordcount/data /wordcount/out
2. MAPREDUCE实践篇(1)
2.1 MAPREDUCE 示例编写及编程规范
2.1.1 编程规范
(1)用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(4)Mapper中的业务逻辑写在map()方法中
(5)map()方法(maptask进程)对每一个

最后

以上就是阔达康乃馨为你收集整理的map+shuffle+reducer的全部内容,希望文章能够帮你解决map+shuffle+reducer所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部