概述
1.为什么需要MapReduce
我们为什么不能使用数据库来对大量磁盘上的大规模数据进行批量分析呢?我们为什么需要MapReduce?
这些问题的答案来自磁盘的另一个发展趋势:寻址时间的提高远远慢于传输速率的提高。寻址是将磁头移动到特定磁盘位置进行读写操作的过程。它是导致磁盘操作延迟的主要原因,而传输速度取决于磁盘的带宽。
如果数据的访问模式汇总包含大量的磁盘寻址,那么读取大量数据集所花的时间势必会更长(相较于流式数据读取模式),流式读取主要取决于传输速率。另一方面,如果数据库系统只更新一小部分记录,那么传统的B树更有优势(关系型数据库中使用的一种数据结构,受限于寻址的比例)。但数据库系统更新大部分数据时,B树的效率比MapReduce低的多,因为需要使用“排序/合并”来重建数据库。
1.1.MapReduce与RDBMS的区别
在许多情况下,可以将MapReduce视为关系型数据库管理的补充。两个系统之间的差异如下图:
MapReduce比较适合以批处理的方式处理需要分析整个数据集的问题,尤其是即席分析(实时的分析,响应速度是第一考虑因素)。RDBMS适用于“点查询”和更新,数据集被索引后,数据库系统能够提供低延迟的数据检索和快速的少量数据更新。MapReduce适合一次写入、多次读取数据的应用,而关系型数据库更适合持续更新的数据集。
MapReduce和关系型数据库之间的另一个区别在于他们所操作的数据集的结构化程度。结构化数据是具有既定格式的实体化数据。另一方面,半结构化数据比较松散,虽然可能有格式,但经常被忽略,所以它只能用作对数据结构的一般知道。非结构化数据没有什么特别的内部结构。MapReduce对于非结构化或半结构化数据非常有效,因为在处理数据时才对数据进行解释。换句话说:MapReduce输入的键和值并不是数据固有的属性,而是有分析数据的人员来选择的。
关系型数据往往是规范的,以保持其数据的完整性且不含冗余。规范化给MapReduce带来了问题,因为它使记录读取成为异地操作,然而MapReduce的核心假设之一就是,他可以进行(高速的)流式读写操作。
1.2.MapReduce优势
MapReduce是一种线性可伸缩的编程模式。程序员编写两个函数,分别是Map函数和Reduce函数---每个函数定义一个键/值对集合到另一个键/值对集合的映射。这些数据无需关注数据集及其所用集群的大小,因此可以原封不动的应用到小规模数据集或大规模的数据集上。更重要的是,如果输入的数据量是原来的两倍,那么运行的时间也需要两倍。但是如果集群是原来的两倍,作业的运行仍然与原来的一样快。SQL查询一般不具备该特性。
MapReduce会尽量在计算节点上存储数据,以实现数据的本地快速访问。数据本地化特性是MapReduce的核心特征,并因此而获得良好的性能。意识到网络带宽是数据中心环境最珍贵的资源之后,MapReduce通过显示网络拓扑结构尽力保留网络带宽。注意,这种排列方式并未降低MapReduce的计算密集型的数据分析能力。
MapReduce让程序员无需考虑系统的部分失效问题,因为自身的系统实现能够检测到失败的map或Reduce任务,并让正常运行的机器重新执行这些失败的任务。MapReduce采用无共享框架,可以实现失败检测,这意味着各个任务之间彼此独立。因此,从程序员的角度来看,任务的执行顺序是无关紧要的。
2.认识MapReduce
2.1.MapReduce工作过程
MapReduce作业(job)是客户端需要执行的一个工作单元:它包括输入数据、MapReduce程序和配置信息。Hadoop将作业分成若干个小任务(task)来执行,其中包括两类任务:map任务和reduce任务。
有两类节点控制着作业执行过程:一个JobTracker以及一系列TaskTracker。JobTracker通过调度taskTracker上运行的任务,来协调所有运行在系统上的作业。TaskTracker在运行任务的同时将运行进度报告发送给JobTracker,JobTracker由此记录每项作业任务的整体进度情况。如果其中一个任务失败,JobTracker可以在另外一个TaskTracker节点上重新调度该任务。
Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片。Hadoop为每个分片构建一个map任务,并由该任务来运行客户自定义的map函数从而处理分片中的每条记录。
拥有很多分片,意味着处理每个分片所需要的时间少于处理整个输入数据所花的时间。因此,如果并行处理每个分片,且每个分片数据比较小,那么整个处理过程将获得更好的负载均衡,因为一台较快的计算机能够处理的数据分片比一台较慢的计算机更多,且成一定的比例。即使使用相同的机器,处理失败的作业或其他同时运行的作业也能够实现负载均衡,并且如果分片被切分的更细,负载均衡的质量会更好。
另一方面,如果分片切得太小,那么管理分片的总时间和构建map任务的总时间将决定着作业的整个执行时间。对于大多数作业来说,一个合理的分片大小趋向于HDFS的一个块的大小,默认是64MB,不过可以针对集群调整这个默认值,在构建所有文件或新建每个文件时具体指定即可。
Hadoop在存储有输入数据的节点上运行map任务,可以获得最佳性能。这就是所谓的数据本地化优化。现在我们应该清楚为什么最佳分片的大小应该与块的大小相同:因为他是确保可以存储在单个节点上的最大输入块的大小。如果分片跨越两个数据块,那么对于任何一个HDFS节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务节点。与使用本地数据运行整个map任务相比,这种方法显然效率更低。
Map任务将其输出写入本地磁盘,而非HDFS。这是为什么?因为map的输出是中间结果:该中间结果由reduce任务处理后才产生最终输出结果,而且一旦作业完成,map的输出结果可以被删除。因此,如果把他存储在HDFS中并实现备份,难免有些小题大做。如果节点上运行的map任务在将map中间结果传送给reduce任务之前失败,Hadoop将在另一个节点上重新运行这个map任务以再次构建map中间结果。
Reduce任务并不具备数据本地化的优势—单个reduce任务的输入通常来自于所有map的输出。因此,排过序的map输出需要通过网络传输发送到运行reduce任务的节点。数据在reduce端合并,然后由用户定义的reduce函数处理。Reduce的输出通常存储在HDFS中以实现可靠存储。
一个reduce任务的完整数据流如下所示。其中虚线框表示节点,虚线箭头表示节点内部的数据传输,而实线箭头表示节点之间的数据传输。
Reduce任务的数量并非由输入数据的大小决定,而是特别指定的。在后面介绍如何指定reduce任务的数量。如果有多个reduce任务,则每个map任务都会对其输出进行分区,即为每个reduce任务建一个分区。每个分区有许多键(及其对应值),但每个键对应的键/值对记录都在同一分区中。分区由用户定义的分区函数控制,但通常默认的分区器通过哈希函数来分区,这种方法很高效。
一般情况下,多个reduce任务的数据流如下图所示。该图清楚的表明了为什么map任务和reduce任务之间的数据流称为shuffle(混洗),因为每个reduce任务的输入都来自于许多map任务。混洗一般比下图更为复杂,并且调整混洗参数对作业总执行时间会有非常大的影响。
最后,也有可能没有任何reduce任务。当数据处理可以完全并行时,即无需混洗,可能会出现无reduce任务的情况。这种情况下,唯一的本地节点数据传输是map任务将结果写入HDFS。2.2.Combiner—工作流程优化
集群上的可用带宽限制了MapReduce作业的数量,因此最重要的一点是尽量避免map任务和reduce任务之间的数据传输。Hadoop允许用户针对map任务的输出指定一个合并函数(称为combiner)----合并函数的输出作为reduce函数的输入。由于合并函数是一个优化方案,所以Hadoop无法确定针对map任务输出中任一条记录需要调用多少次合并函数(如果需要)换言之,不管调用合并函数多少次,0次、1次或多次,reduce的输出结果都应一致。
举例说明combiner的优化效果,例如:计算最高温度的例子,假设第一个map的输出如下:(1950,0)(1950,20)(1950,10);第二个map的输出如下:(1950,25)(1950,15);则reduce函数被调用时,输入如下:(1950,[0,20,10,25,15]),因为25为该数据中最大的,所以输出如下:(1950,25)。如果我们在每个map任务输出结果后面添加一个combiner函数,如此一来,reduce函数调用时输入数据为:(1950,[20,25]),reduce输出的结果和以前一样。用计算公式表示:max(0,20,10,25,15)=max(max(0,20,10),max(25,15))=max(20,25)=25。这样就可以减少数据在网络中的传输。但是,在MapReduce作业中使用combiner时需要慎重考虑。
2.3.Hadoop的Streaming
Hadoop提供了MapReduce的API,并允许你使用非java的其他语言来编写自己的map和reduce函数。Hadoop的Streaming使用UNIX标准流作为Hadoop和应用程序之间的接口,所以我们可以使用任何编程语言通过标准输入输出来写MapReduce程序。
Streaming天生适合用于文本处理,在文本模式下使用时,他有一个数据的行视图。Map的输入数据通过标准输入流传递给map函数,并且是一行一行的传输,最后将结果行写到标准输出。Reduce函数的输入格式相同------通过制表符来分割的键/值对-------并通过标准输入流进行传输。Reduce函数从标准输入流中读取输入行,该输入已由Hadoop框架根据键排过序,最后将结果写入标准输出。
当前可支持的语言有:Ruby、python版本。
2.4.Hadoop的Pipes
Hadoop的Pipes是Hadoop MapReduce的C++接口代称。不同于使用标准输入和输出来实现map代码和reduce代码之间的Streaming,Pipes使用套接字作为TaskTracker与C++版本map函数和Reduce函数的进程间的通道,而未使用JNI。
3.MapReduce的工作机制
3.1.MapReduce组成
我们可以通过运行一行代码来运行一个MapReduce作业:JobClient.runJob(conf)。这个简短的代码,幕后隐藏着大量的处理细节。先来看一下这中间应用到的MapReduce构成:
- 客户端:提交MapReduce作业;
- JobTracker:协调作业的运行。jobtracker是一个java应用程序,他的主类是JobTracker;
- TaskTracker:运行作业划分后的任务。Tasktracker是java应用程序,它的主类是TaskTracker;
- 分布式文件系统(一般是HDFS),用来在其他实体间共享作业文件。
下面是Hadoop运行MapReduce作业的工作原理图,在后面的章节中,逐一讲解MapReduce的作业过程:
3.2.作业的提交
Jobclient的runjob()方法是用于新建jobclient实例并调用其submitJob()方法的便捷方式。提交作业后,runJob()每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。作业完成后,如果成功,就显示作业计数器。如果失败,导致作业失败的错误被记录到控制台。
JobClient的submitJob()方法所实现的作业提交过程如下:
- 向JobTracker请求一个新的作业ID(通过调用JobTracker的getNewJobId()方法获取);
- 检查作业的输出说明。例如,如果没有指定输出目录或输出目录已经存在,作业就不提交,错误抛回给MapReduce程序。
- 计算作业的输入分片。如果分片无法计算,比如因为输入路径不存在,作业就不提交,错误返回给MapReduce程序。
- 将运行作业所需要的资源(包括作业JAR文件、配置文件和计算所得的输入分片)复制到一个以作业ID命名的目录下JobTracker的文件系统中。作业JAR的副本较多(由mapred.submit.replication属性控制,默认值为10),因此在运行作业的任务时,集群中有很多个副本可供TaskTracker访问。
- 告知Jobtracker作业准备执行(通过调用JobTracker的submitJob()方法实现)。
3.3.作业的初始化
当JobTracker接收到对其submitJob()方法的调用后,会把此调用放入到一个内部队列中,交由作业调度器进行调度,并对其进行初始化。初始化包括创建一个表示正在运行作业的对象---封装任务和记录信息,以便跟踪任务的状态和进程。
为了创建任务列表,作业调度器首先从共享文件系统中获取jobclient以计算好的输入分片信息。然后为每个分片创建一个map任务。创建的Reduce任务的数据由jobConf的mapred.reduce.task属性决定,他是用setNumReduceTasks()方法来设置的,然后调度器创建相应数量的要运行的Reduce任务。任务在此时指定ID。
3.4.任务的分配
TaskTracker运行一个简单的循环来定期发送“心跳”(heartbeat)给JobTracker。“心跳”告知JobTracker,TaskTracker是否存活,同时也充当两者之间的消息通道。作为“心跳”的一部分,TaskTracker会指明他是否已经准备好运行新的任务,如果是,JobTracker会为它分配一个任务,并使用“心跳”的返回值与TaskTracker进行通信。
在JobTracker为TaskTracker选择任务之前,JobTracker必须选定任务所在的作业。这里有各种调度算法,但是默认的方法是简单维护一个作业优先级列表。一旦选择好作业,JobTracker就可以为该作业选定一个任务。
对于map任务和Reduce任务,TaskTracker有固定数量的任务槽。例如,一个TaskTracker可能可以同时运行两个map任务和两个Reduce任务。准确数量有TaskTracker核的数量和内存大小来决定。默认调度器在处理Reduce任务槽之前,会填满空闲的map任务槽,因此,如果TaskTracker至少有一个空闲的map任务槽,JobTracker会为它选择一个map任务,否则选择一个Reduce任务。
为了选择一个reduce任务,JobTracker简单的从待运行的reduce任务列表中选取下一个来执行,用不着考虑数据的本地化。然而,对于一个map任务,JobTracker会考虑TaskTracker的网络位置,并选择一个距离其输入分片最近的TaskTracker。在理想的情况下,任务是数据本地化的,也就是任务运行在输入分片所在的节点上。同样,任务也可能在机架本地化的:任务和输入分片在同一个机架,但不在同一节点上。一些任务既不是数据本地化的,也不是机架本地化的,而是从他们自身运行的不同机架上检索数据。可以通过查看作业的计数器得知每类任务的比例。
3.5.任务的执行
现在,TaskTracker已经被分配了一个任务,下一步是运行该任务。第一步,通过从共享文件系统把作业的JAR文件复制到TaskTracker所在的文件系统,从而实现作业的JAR文件本地化。同时,TaskTracker将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。第二步,TaskTracker为任务新建一个本地工作目录,并把JAR文件中的内容解压到这个文件夹下。第三步,TaskTracker新建一个TaskTracker实例来运行该任务。
taskRunner启动一个新的JVM来运行每个任务,以便用户定义的map和reduce函数的任何软件问题不会影响到TaskTracker(例如导致崩溃或挂起)。但在不同的任务之间重用JVM还是可能的。
子进程通过umbilical接口与父进程进行通信。任务的子进程每隔几秒便告知父进程他的进度,直到任务完成。
Streaming和Pipes
Streaming和Pipes都运行特殊的map和reduce任务,目的是运行用户提供的可执行程序,并与之通信。
在Streaming中,任务使用标准输入和输出Streaming与进程进行通信。另一方面,Pipes任务监听套接字,发送其环境中的一个端口号给C++进程,如此一来,在开始时,C++进程即可建立一个与其父java Pipes任务的持久化套接字连接。
在这两种情况下,在任务执行过程中,java进程会把输入键/值对传给外部的进程,后者通过用户定义的map或reduce函数来执行它并把输出的键值对传回java进程。从TaskTracker的角度看,就像TaskTracker的子进程自己在处理map或reduce代码一样。
下面是执行Streaming和Pipes与TaskTracker及其子进程的关系
3.6.进度和状态更新
MapReduce作业是长时间运行的批量作业,运行时间范围从数秒到数小时。这是一个很长的时间段,所以对于用户而言,能够得知作业进展是很重要的。一个作业和它的每个任务都有一个状态,包括:作业或任务的状态(比如,运行状态,成功完成,失败状态)、map和reduce的进度、作业计数器的值、状态消息或描述。这些状态信息在作业期间不断改变,他们是如何与客户端通信的呢?
任务在运行时,对其进度保持追踪。对map任务,任务进度是已经处理输入所占的比例。对reduce任务,情况稍微有点复杂,但系统仍然会估计已处理reduce输入的比例。整个过程分成三部分,与shuffle的三个阶段相对应。比如,如果任务已经执行reducer一半的输入,那么任务的进度便是5/6.因为他已经完成复制和排序阶段(每个占1/3),并且已经完成reduce阶段的一半。
进程并不总是可测量的,但是无论如何,他能告诉Hadoop有个任务正在运行。比如,写输出记录的任务也可以表示成进度,尽管他不能总是需要写百分比这样的数字来表示,因为即使通过任务来产生输出,也无法知道后面的情况。
进度报告很重要,因为这意味着Hadoop不会让正在执行的任务失败。构成进度的所有操作如下:
- 读入一条输入记录(在mapper或reducer中)
- 写入一条输出记录(在mapper或reducer中)
- 在一个reporter中设置状态描述
- 增加计数器(使用reporter的incrCounter方法)
- 调用Reporter的progress()任务。
任务也有一组计数器,负责对任务运行过程中各个事件进行计数,这些计数器要么内置于框架中,例如已写入的map输出记录数,要么由用户自己定义。
如果任务报告了进度,便会设置一个标志以表明状态变化将被发送到TaskTracker。有一个独立的线程每隔三秒检查一次标志,如果已设置,则告诉TaskTracker当前任务状态。同时,TaskTracker每隔五秒发送“心跳”到JobTracker,并且由TaskTracker运行的所有任务的状态都会在调用中被发送至JobTracker。计数器的发送间隔通常少于5秒,因为计数器占用的带宽相对较高。
JobTracker将这些更新合并起来,产生一个表明所有运行作业及其所含任务状态的全局视图。最后,如果前面提到的,jobClient通过每秒查询JobTracker来接受最新状态。客户端也可以使用jobclient的getjob()方法来得到一个runningJob的实例,后者包含作业的所有状态信息。
3.7.作业的完成
当JobTracker接收到作业最后一个任务已完成的通知后,便把作业的状态设置为“成功”然后,在jobclient查询状态时,便知道任务已完成,于是jobclient打印一条消息告知用户,然后从runJob()方法返回。
状态更新在MapReduce系统中的传递流程,如下图:
如果JobTracker有响应的设置,也会发送一个http作业通知。希望收到回调指令的客户端可以通过job.end.notification.url属性来进行这项设置。
最后,JobTracker清空作业的工作状态,只是TaskTracker也清空作业的工作状态。
4.MapReduce作业失败处理
在实际情况下,用户代码存在软件错误,进程会崩溃,机器会产生故障。使用Hadoop最主要的好处之一是他能处理此类故障并完成作业。
4.1.任务失败
首先考虑任务失败的情况,如果map或reduce任务中的用户代码抛出运行异常,子任务JVM进程会在退出之前向其父TaskTracker发送错误报告。错误报告最后被记入用户日志。TaskTracker会将此次task attempt标记为failed,释放一个任务槽运行另外一个任务。
对于Streaming任务,如果Streaming进程以非零代码退出,则被标记为failed。这种行为由stream.non.zero.exit.is.failure属性来控制。
另一种错误情况是子进程JVM突然退出—可能由于JVM bug而导致MapReduce用户代码造成的某些特殊原因造成JVM退出。在这种情况下,TaskTracker会注意到进程已经退出,并将此次尝试标记为failed。
任务挂起的处理方式则有不同。一旦TaskTracker注意到已经有一段时间没有收到进度的更新,便会将任务标记为failed。在此之后,JVM子进程将被自动杀死。任务失败的超时间隔通常为10分钟,可以以作业为基础(或以集群为基础将mapred.task.timeout属性设置为以毫秒为单位的值)
如果超时(timeout)设置为0将关闭超时判定,所以长时间运行的任务永远不会被标记为failed。在这种情况下,被挂起的任务永远不会释放他的任务槽,并随着时间的推移最终降低整个集群的效率。因此,尽量避免这种设置,同时充分确保每个任务能够定期汇报进度。
JobTracker知道一个task attempt失败后(通过TaskTracker的“心跳”调用),它将重新调度该任务的执行。JobTracker会尝试重新调度失败过的TaskTracker上的任务。此外,如果一个任务失败次数超过4次,它将不会再被重试。这个值是可以设置的:对于map任务,运行任务的最多尝试数由mapred.map.max.attempts属性控制;而对于reduce任务,则由mapred.reduce.max.attempts属性控制。在默认情况下,如果有任何任务失败次数大于4,整个作业都会失败。
对于一些应用程序,我们不希望一旦有少数几个任务失败就终止运行整个作业,因为即使有任务失败,作业的一些结果可能还是有可用的。在这种情况下,可以为作业设置在不触发作业失败的情况下允许任务失败的最大百分比。Map任务和reduce任务可以独立控制,分别通过mapred.max.map.failures.percent和mapred.max.reduce.failures.percent属性来设置。
任务尝试(task attempt)也是可以终止的(killed),这与失败不同。Task attempt可以中止是因为他是一个推测副本,或因为他所处的TaskTracker失败,导致JobTracker将它上面运行的所有task attempt标记为killed。被中止的task attempt不会被记入任务运行尝试次数,因为尝试中止并不是任务的错。
用户也可以使用web UI或命令行方式来中止或取消task attempt。作业可以采用相同的机制来中止。
4.2.TaskTracker失败
TaskTracker失败是另一种失败模式。如果一个TaskTracker由于崩溃或运行过于缓慢而失败,它将停止向JobTracker发送“心跳”。JobTracker会注意到已经停止发送“心跳”的TaskTracker(假设他有10分钟没有收到一个“心跳”。这个值由mapred.tasktracker.expiry.interval属性来设置,以毫秒为单位),并将它从等待任务调度的TaskTracker池中移除。如果是未完成的作业,JobTracker会安排此TaskTracker上已运行的map任务重新运行,因为reduce任务无法访问。他们的中间输入(都存放在失败的TaskTracker的本地文件系统上)。任何进行中的任务也都会被重新调度。
即使TaskTracker没有失败,也可能被JobTracker列入黑名单。如果TaskTracker上面的失败任务数远远高于集群的平均失败任务数,他就会被列入黑名单。被列入黑名单的TaskTracker可以通过重启从JobTracker的黑名单中移出。
4.3.JobTracker失败
JobTracker失败在所有失败中是最为严重的一种。较早版本的Hadoop没有处理JobTracker失败的机制—他是一个单点故障—因此在这种情况下,作业注定失败。然而,这种失败发生的概率很小,因为具体某台机器失败的几率很小。当前Hadoop可以通过Hadoop HA的配置解决这个故障。
5.作业的调度
早期版本的Hadoop使用一种非常简单的方法来调度用户的作业:按照作业提交的顺序,使用FIFO调度算法运行作业。典型情况下,每个作业都会使用整个集群,因此作业必须等待直到轮到自己运行。虽然共享集群极有可能为多用户提供大量资源,但问题在于如何公平的在用户之间分配资源,这需要一个更好的调度器。
随后,加入设置作业优先级的功能,可以通过设置mapred.job.priority属性或jobClient的setJobPriority()方法来设置优先级。作业调度器选择要运行的下一个作业时,他选择的是优先级最高的那个作业。然而,在FIFO调度算法中,优先级并不支持抢占,所以高优先级的作业仍然会被那些在高优先级作业被调度之前开始运行的、长时间运行的低优先级的作业所阻塞。
在Hadoop中,MapReduce的调度器可以选择。默认的调度器是原始的基于队列的FIFO调度器,还有两个用户调度器,分别名为Fair Scheduler和Capacity Scheduler。
5.1.Fair Scheduler(facebook)
Fair Scheduler的目标是让每一个用户公平地共享集群能力。如果只有一个作业在运行,他会得到集群的所有资源。随着提交的作业越来越多,空闲的任务槽会以“让每个用户公平共享集群”这种方式进行分配。某个用户的一个短的作业将在合理的时间内完成,即便另一个用户的长时间作业正在运行而且还在运行过程中。
作业都被放在作业池中,在默认情况下,每个用户都有自己的作业池。提交作业数超过另一个用户的用户,不会因此而比后者获得更多集群资源。可以用map和reduce的任务槽数来定制作业池的最小容量,也可以设置每个池的权重。
Fair Scheduler支持抢占,所以,如果一个池在特定的一段时间内未得到公平的资源共享,他会中止运行池中得到过多资源的任务,以便把任务槽让给运行资源不足的池。
Fair Scheduler是一个后续模块。要使用它,需要将其JAR文件放在Hadoop的类路径,即将它从Hadoop的contrib/fairsheduler目录复制到lib目录。随后,像下面这样配置mapred.jobtracker.taskScheduler属性:org.apache.hadoop.mapred.FairScheduler。
1、总结来说,公平调度主要有一下特点:
①支持多用户多队列;②资源公平共享(公平共享量由优先级决定);③保证最小共享量;④支持抢占资源;⑤限制作业并发量,以防止中间数据塞满磁盘;⑥添加delay Scheduler机制,使调度策略更优;⑦每个队列的调度策略(提交到队列的job)可以配置,支持两种调度策略,分别是FIFO(按照job优先级高低调度,job优先级越高,则会优先调度运行)和FAIR(公平调度算法,job同时运行,但是会按照job的优先级分配资源,job的优先级越高分配到的资源越多)。
2、先了解几个概念:
①Pool:资源池,或者作业池。每个pool里有一定量的资源(管理员配置),每个用户属于某个pool,其作业可使用这个pool中的资源,可限定每个pool中最大并发作业数和每个用户最多提供作业数。默认情况下,一个linux用户对应一个pool,而管理员也可以配置一个linux group对应一个pool。Pool实际上也可以称为group或者队列。
②最小共享量:管理员可给每个pool配置一个最小共享量,调度器在分配资源时,需要保证每个pool中的作业至少获取该数目的资源。一个常见的应用场景是,对产品pool设置最小共享量,而测试pool不设置。这样,当可用资源有限时,优先保证产品pool有资源可用。
③公平共享量:当集群中存在多个pool时,某些pool中的资源可能用不了,这时候调度器会自动将这些pool中剩余的资源共享给其他需要的pool,其他这些pool获取的共享资源多少主要由其pool weight决定,pool weight越大,获取的资源越多。一个pool的最小共享量加上其获取的共享资源数目,就是公平共享量。
3、资源抢占
当一定时间(管理员配置)内,某个pool中获取的资源量少于最小共享量,或者公平共享量的一半,则调度器会找出哪个pool抢占了该pool的资源,并杀死相应数量的task以抢占资源。之所以要进行抢占,还是为了“公平”,即:保证每个pool能获取到它应得到的资源。
4、delay Scheduler机制
当出现空闲slot时,如果排在队列前面的job对应的所有task均没有locality属性(数据的本地性),则该作业会延时调度,直到一段时间后(有新的slot被释放),该job出现locality的task或者发生超时,才不得不调度该job的task。
什么是locality属性?当出现空闲slot时,该slot来自某个节点,而该节点上存有部分数据,如果某个task所需要的数据正好位于该节点上,则将该slot分配给该task是非常好的,因为它避免了通过网络读取数据。
5、公平共享计算方法
公平共享量是基于最小共享量和共享资源量计算得到的,它反映的是某个pool经过资源共享(某些pool的资源用不了,会自动共享给其他pool)之后,一共可以获取的资源总量,一般会大于等于最小共享量。
如果每个pool没有配置最小共享量,且提交了无限量的作业,则让每个pool的slotsAssinged/weight值相同即可。(其中slotsAssgined表示分配给pool的slot数,weight表示pool的权重)。
而有了最小共享minShare和pool中的需求量demand后,计算公平共享量FairShare需注意一下两种情况:
- 某些pool中的最小共享量可能用不完;
- 某些pool的资源需求量大于其最小共享量;
考虑到以上两种情况,调度器设计了基于比率R的公平资源分配方法(设集群中资源总量为totalSlots):
- 如果一个pool中的demand<R*weight,则该pool的fairShare=demand;
- 如果一个pool中的minShare>R*weight,则该pool的fairShare=mindshare;
- 除此之外,所有pool的fairShare=R*weight;
- 所有pool的fairShare之和应为totalSlots;
通过以上算法计算的公平共享量即为“公平调度器”的“公平”含义之所在,应尽量保证每个pool获取的资源量为fairShare,如果一定时间限制内达不到,则抢占资源。通过图表表示算法如下:
注意:比率R表示weight-to-slot,即weight与slot的映射关系,其计算方法采用了二分枚举算法,具体实现请查看源代码。R需要取得一个合适的值,使得上面的条件【4】成立。
6、公平调度器缺点
当前公平调度器不支持大内存作业,而capacity Scheduler则早支持了,其原理是:如果一个job需要较大内存,调度器会为该job分配多个slot,这样,作业可使用这些slot对应的内存资源。
7、配置实例
配置步骤为:
1.将$HADOOP_HOME/contrib/fairscheduler/hadoop-fairscheduler-0.20.2-cdh3u3.jar拷贝到$HADOOP_HOME/lib文件夹中
2.修改$HADOOP_HOME/conf/mapred-site.xml配置文件
##配置MapReduce的资源调度方式 <property> <name>mapred.jobtracker.taskScheduler</name> <value>org.apache.hadoop.mapred.FairScheduler</value> </property> ##配置加载公平调度的配置文件 <property> <name>mapred.fairscheduler.allocation.file</name> <value>/home/hadoop/hadoop-0.20.2-cdh3u3/conf/fair-scheduler.xml</value> </property> ##开启公平调度的资源抢占 <property> <name>mapred.fairscheduler.preemption</name> <value>true</value> </property> ##开启公平调度的多模式 <property> <name>mapred.fairscheduler.assignmultiple</name> <value>true</value> </property> <property> <name>mapred.fairscheduler.poolnameproperty</name> <value>mapred.queue.name</value> <description>job.set("mapred.queue.name",pool); // pool is set to either 'high' or 'low' </description> </property> <property> <name>mapred.fairscheduler.preemption.only.log</name> <value>true</value> </property> <property> <name>mapred.fairscheduler.preemption.interval</name> <value>15000</value> </property> <property> <name>mapred.queue.names</name> <value>default,hadoop,hive</value> </property>
3.在$HADOOP_HOME/conf/新建配置文件fair-scheduler.xml
<?xml version="1.0"?> <allocations> <pool name="hive"> <minMaps>90</minMaps> <minReduces>20</minReduces> <maxRunningJobs>20</maxRunningJobs> <weight>2.0</weight> <minSharePreemptionTimeout>30</minSharePreemptionTimeout> </pool> <pool name="hadoop"> <minMaps>9</minMaps> <minReduces>2</minReduces> <maxRunningJobs>20</maxRunningJobs> <weight>1.0</weight> <minSharePreemptionTimeout>30</minSharePreemptionTimeout> </pool> <user name="hadoop"> <maxRunningJobs>6</maxRunningJobs> </user> <poolMaxJobsDefault>10</poolMaxJobsDefault> <userMaxJobsDefault>8</userMaxJobsDefault> <defaultMinSharePreemptionTimeout>600</defaultMinSharePreemptionTimeout> <fairSharePreemptionTimeout>600</fairSharePreemptionTimeout> </allocations>
4.在集群的各个节点执行以上步骤,然后重启集群,在http://namenode:50030/scheduler 即可查看到调度器运行状态,如果修改调度器配置的话,只需要修改文件fair-scheduler.xml ,不需重启配置即可生效。
5.在执行hive任务时,设置hive属于的队列set mapred.job.queue.name=hive;
另外,如果在执行MR JOB的时候出现XX用户访问不了YY队列的话,就需要在mapred-queue-acls.xml里配置相应的属性,来对访问权限进行控制,比如:
<property> <name>mapred.queue.default.acl-submit-job</name> <value>*</value> <description> Comma separated list of user and group names that are allowed to submit jobs to the 'default' queue. The user list and the group list are separated by a blank. For e.g. user1,user2 group1,group2. If set to the special value '*', it means all users are allowed to submit jobs. If set to ' '(i.e. space), no user will be allowed to submit jobs. It is only used if authorization is enabled in Map/Reduce by setting the configuration property mapred.acls.enabled to true. Irrespective of this ACL configuration, the user who started the cluster and cluster administrators configured via mapreduce.cluster.administrators can submit jobs. </description> </property> <property> <name>mapred.queue.default.acl-administer-jobs</name> <value>*</value> <description> Comma separated list of user and group names that are allowed to view job details, kill jobs or modify job's priority for all the jobs in the 'default' queue. The user list and the group list are separated by a blank. For e.g. user1,user2 group1,group2. If set to the special value '*', it means all users are allowed to do this operation. If set to ' '(i.e. space), no user will be allowed to do this operation. It is only used if authorization is enabled in Map/Reduce by setting the configuration property mapred.acls.enabled to true. Irrespective of this ACL configuration, the user who started the cluster and cluster administrators configured via mapreduce.cluster.administrators can do the above operations on all the jobs in all the queues. The job owner can do all the above operations on his/her job irrespective of this ACL configuration. </description> </property>
5.2.Capacity Scheduler(Yahoo)
针对多用户调度,Capacity Scheduler采用的方法稍有不同。集群由很多队列组成(类似于Fair Scheduler的任务池,这些队列可能是层次结构的(因此,一个队列可能是另一个队列的孩子),每个队列有一个分配能力。这一点与Fair Scheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上,Capacity Scheduler允许用户或组织(使用队列进行定义)为每个用户或组织模拟一个独立的使用FIFO Scheduler的MapReduce集群。相比之下,Fair Scheduler强制每个池内公平共享,使运行的作业共享池的资源。
1、能力(容量)调度器特性:
- 计算能力保证。支持多个队列,某个作业可被提交到某一个队列中。每个队列会配置一定比例的计算资源,且所有提交到队列中的作业共享该队列中的资源。
- 灵活性。空闲资源会被分配给那些未达到资源使用上限的队列,当某个未达到资源的队列需要资源时,一旦出现空闲资源,便会分配给他们。
- 支持优先级。队列支持作业优先级调度(默认是FIFO)。
- 多重租赁。综合考虑多重约束防止单个作业、用户或者队列独占队列或者集群中的资源。
- 基于资源的调度。支持资源密集作业,允许作业使用的资源量高于默认值,进而可容纳不同资源需求的作业。不过,当前仅支持内存资源的调度。
2、能力调度器算法分析
涉及到的变量:在capacity中,存在三种粒度的对象,分别是:queue、job和task,他们均需要维护的一些信息:
①Queue维护的信息:
- QueueName:queue名称;
- UlMin:每个用户的可用的最少资源量(所有用户均相同),需用户在配置文件中指定;
- CapacityPercent:计算资源比例,需用户在配置文件中指定;
- NumJobsByUser:每个用户的作业量,用以跟踪每个用户提交的作业量,并进行数量的上限限制;
该队列中map或reduce task的属性:
- Capacity:实际的计算资源量,这个随着TaskTracker中slot数目变化(用户可能在添加或减少机器节点)而动态变化,大小为:capacityPercent*mapClusterCapacity/100;
- numRunningTasks:正在running的task数目;
- numSlotsOccupied:正在running的task占用的slot总数,注意,在capacity Scheduler中,running task与slot不一定是一一对应的,每个task可获取多个slot,这主要是因为该调度支持内存资源调度,某个task可能需要多个slot包含的内存量。
- numSlotsOccupiedByUser:每个用户的作业占用slot总数,用以限制用户使用的资源量。
②job维护的信息
- priority:作业优先级,分为五个等级,从大到小依次为:very_high,high,normal,low,very_low;
- numMapTasks/numReduceTasks:job的map/reduce task总数;
- runningMapTasks/runningReduceTasks:job正在运行的map/reduce task数量
- finishMapTasks/finishedReduceTasks:job已完成的map/reduce task数
③task维护的信息
- task开始运行时间,当前状态等。
3、能力调度算法
当某个TaskTracker上出现空闲slot时,调度器依次选择一个queue、(选中的queue中的)job、(选中的job中的)task,并将该slot分配给该task。下面是选择queue、job和task所采用的策略:
①选择queue:当所有queue按照资源使用率(numSlotsOccupied/capacity)由小到大排序,依次进行处理,直到找到一个合适的job。
②选择job:在当前queue中,所有作业按照作业提交时间和作业优先级进行排序(假设开启支持优先级调度功能,默认不支持,需要在配置文件中开启),调度依次考虑每个作业,选择符合两个条件的job:【1】作业所在的用户未达到资源使用上限【2】该TaskTracker所在的节点剩余的内存足够该job的task使用。
③选择task,同大部分调度器一样,考虑task的locality和资源使用情况(即:调用jobInProgress中的obtainNewMapTask()/obtainNewReduceTask()方法)
综上所述,能力调度器的伪代码为:
// CapacityTaskScheduler:trackTracker出现空闲slot,为slot寻找合适的task
List<Task> assignTasks(TaskTrackerStatus taskTracker) {
sortQueuesByResourcesUsesage(queues);
for queue:queues {
sortJobsByTimeAndPriority(queue);
for job:queue.getJobs() {
if(matchesMemoryRequirements(job,taskTracker)) {
task = job. obtainNewTask();
if(task != null) return task
}
}
}
}
4、capacity Scheduler配置实例
①. 复制$HADOOP_HOME/contrib/capacity-scheduler/hadoop-capacity-scheduler.jar 到$HADOOP_HOME/lib目录中
②. 修改namenode节点中的conf/mapred-site.xml文件
<property> <name>mapred.jobtracker.taskScheduler</name> <value>org.apache.hadoop.mapred.CapacityTaskScheduler</value> </property> <property> <name>mapred.queue.names</name> <value>default,hadoop,hive</value> </property>
③. 修改conf/capacity-scheduler.xml 配置文件
<?xml version="1.0"?> <!-- This is the configuration file for the resource manager in Hadoop. --> <!-- You can configure various scheduling parameters related to queues. --> <!-- The properties for a queue follow a naming convention,such as, --> <!-- mapred.capacity-scheduler.queue.<queue-name>.property-name. --> <configuration> <!-- Capacity scheduler Job Initialization configuration parameters --> <property> <name>mapred.capacity-scheduler.init-poll-interval</name> <value>5000</value> <description>The amount of time in miliseconds which is used to poll the job queues for jobs to initialize. </description> </property> <property> <name>mapred.capacity-scheduler.init-worker-threads</name> <value>5</value> <description>Number of worker threads which would be used by Initialization poller to initialize jobs in a set of queue. If number mentioned in property is equal to number of job queues then a single thread would initialize jobs in a queue. If lesser then a thread would get a set of queues assigned. If the number is greater then number of threads would be equal to number of job queues. </description> </property> <property> <name>mapred.capacity-scheduler.maximum-system-jobs</name> <value>30</value> <description>Maximum number of jobs in the system which can be initialized, concurrently, by the Capacity Scheduler. </description> </property> <!--hadoop queue--> <property> <name>mapred.capacity-scheduler.queue.hadoop.capacity</name> <value>30</value> <description>Percentage of the number of slots in the cluster that are to be available for jobs in this queue. </description> </property> <property> <name>mapred.capacity-scheduler.queue.hadoop.maximum-capacity</name> <value>-1</value> <description> </description> </property> <property> <name>mapred.capacity-scheduler.queue.hadoop.supports-priority</name> <value>true</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.hadoop.minimum-user-limit-percent</name> <value>100</value> <description> </description> </property> <property> <name>mapred.capacity-scheduler.queue.hadoop.user-limit-factor</name> <value>3</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.hadoop.maximum-initialized-active-tasks</name> <value>200000</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.hadoop.maximum-initialized-active-tasks-per-user</name> <value>100000</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.hadoop.init-accept-jobs-factor</name> <value>10</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user</name> <value>5</value> <description>The maximum number of jobs to be pre-initialized for a user of the job queue. </description> </property> <!-- hive --> <property> <name>mapred.capacity-scheduler.queue.hive.capacity</name> <value>30</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.hive.maximum-capacity</name> <value>-1</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.hive.supports-priority</name> <value>true</value> <description>If true, priorities of jobs will be taken into account in scheduling decisions. </description> </property> <property> <name>mapred.capacity-scheduler.queue.hive.minimum-user-limit-percent</name> <value>100</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.hive.user-limit-factor</name> <value>4</value> <description>The multiple of the queue capacity which can be configured to allow a single user to acquire more slots. </description> </property> <property> <name>mapred.capacity-scheduler.queue.hive.maximum-initialized-active-tasks</name> <value>200000</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.hive.maximum-initialized-active-tasks-per-user</name> <value>100000</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.hive.init-accept-jobs-factor</name> <value>10</value> <description></description> </property> <!-- default --> <property> <name>mapred.capacity-scheduler.queue.default.capacity</name> <value>40</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.default.maximum-capacity</name> <value>-1</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.default.supports-priority</name> <value>true</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.default.minimum-user-limit-percent</name> <value>100</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.default.user-limit-factor</name> <value>4</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.default.maximum-initialized-active-tasks</name> <value>200000</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.default.maximum-initialized-active-tasks-per-user</name> <value>100000</value> <description></description> </property> <property> <name>mapred.capacity-scheduler.queue.default.init-accept-jobs-factor</name> <value>10</value> <description></description> </property> </configuration>
保存文件后,重启jobtracker
以后修改capacity-scheduler.xml文件后只需要执行命令hadoop mradmin -refreshQueues 就可以重新加载配置项。
④最后如何使用该队列呢:
- Mapreduce:在Job的代码中,设置Job属于的队列,例如hive:conf.setQueueName("hive");
- Hive:在执行hive任务时,设置hive属于的队列,例如
hive:set mapred.job.queue.name=hive;
- 设置队列的任务名称set mapred.job.name=hadooptest;
- 设置队列的优先级别set mapred.job.priority=HIGH;
5.3.能力调度器与公平调度器对比
1、相同点
- 均支持多用户多队列,即:适用于多用户共享集群的应用环境
- 单个队列均支持优先级和FIFO调度方式
- 均支持资源共享,即某个queue中的资源有剩余时,可共享给其他缺资源的queue
2、不同点
- 核心调度策略不同。计算能力调度器的调度策略是,先选择资源利用率低的queue,然后在queue中同时考虑FIFO和memory constraint因素;而公平调度器仅考虑公平,而公平是通过池的权重体现的,调度器根据pool权重公平的分配资源。
- 内存约束。计算能力调度器调度job时会考虑作业的内存限制,为了满足某些特殊job的特殊内存需求,可能会为该job分配多个slot;而公平调度器对这种特殊的job无能为力,只能杀掉这种task。
6.Mapreduce的shuffle和排序
MapReduce确保每个reduce的输入都按键排序。系统执行排序的过程—将map输出作为输入传给reducer—称为shuffle。Shuffle是MapReduce的“心脏”,是奇迹发生的地方。
6.1.Map端
Map函数开始产生输出时,并不是简单的将它写到磁盘。这个过程更为复杂,它利用缓冲的方式写到内存,并出于效率的考虑进行预排序。如下图:
每个map任务都有一个环形内存缓冲区,用于存储任务的输出。默认情况下,缓冲区的大小为100MB,此值可以通过改变io.sort.mb属性来调整。一旦缓冲内容达到阀值(io.sort.spill.percent,默认为0.8),一个后台线程便开始把内容写到(spill)磁盘中。在写磁盘过程中,map输出继续被写到缓冲区,但如果在此期间缓冲区被填满,map会阻塞直到写磁盘过程完成。
写磁盘将按轮询方式写到mapred.local.dir属性指定的作业特定子目录中的目录中。
在写磁盘之前,线程首先根据数据最终要传送到的reducer把数据划分成相应的分区(partition)。在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上进行。
一旦内存缓冲区达到溢出的阀值,就会新建一个溢出写文件,因此在map任务写完其最后一个输出记录之后,会有几个溢出写文件。在任务完成之前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.factor控制着一次最多能合并多少流,默认值是10。
如果已经指定combiner,并且溢出写次数至少为3(min.num.spills.for.combine属性的取值)时,则combiner就会在输出文件写到磁盘之前进行。前面曾讲过,combiner可以在输入上反复运行,但并不影响最终结果。运行combiner的意义在于使map输出更紧凑,使得写到本地磁盘和传给reducer的数据更少。
写磁盘时压缩map输出往往是个好主意,因为这样会让写磁盘的速度更快,节约磁盘空间,并减少传给reducer的数据量。默认情况下,输出是不压缩的,但只要将mapred.compress.map.output设置为true,就可以轻松启动该功能。使用的压缩库由mapred.map.output.compression.codec指定。
Reducer通过http方式得到输出文件的分区。用于文件分区的工作线程的数量由任务的tracker.http.threads属性控制,此设置针对每个TaskTracker,而不是针对每个map任务槽。默认值是40,在运行大型作业的大型集群上,此值可以根据需要而增加。
6.2.Reduce端
Map输出文件位于运行map任务的TaskTracker的本地磁盘,现在TaskTracker需要为分区文件运行reduce任务。更进一步,reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此只需要有一个任务完成,reduce任务就开始复制其输出。这就是reduce任务的复制阶段(copy phase)。Reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,但这个默认值可以通过设置mapred.reduce.parallel.copies属性来改变。
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Reducer如何知道要从那个TaskTracker取得map输出呢?
Map任务成功完成后,他们会通知父TaskTracker状态更新,然后TaskTracker进而通知JobTracker,这些通知在前面介绍的心跳通讯机制中传输。因此,对于指定作业,JobTracker知道map输出和TaskTracker之间的映射关系。Reducer中的一个线程定期询问JobTracker以获取map输出的位置,直到他获得所有输出位置。
由于reducer可能失败,因此TaskTracker并没有在第一个reducer检索到map输出时就立刻从磁盘上删除它们。相反,TaskTracker会等待,直到JobTracker告知它们可以删除map输出,这是作业完成后执行的。
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
如果map输出相当小,则会被复制到reduce TaskTracker的内存,否则,map输出被复制到磁盘。一旦内存缓冲区达到阀值大小(由mapred.iob.shuffle.merge.percent决定)或达到map输出阀值(由mapred.inmem.merge.threshold控制),则合并后溢出写到磁盘中。
随着磁盘复本的增多,后台线程会将他们合并为更大的、排好序的文件。这会为后面的合并节省一些空间。注意,为了合并,压缩的map输出都必须在内存中被解压缩。
复制完所有map输出,reduce任务进入排序阶段,这个阶段将合并map输出,维持其顺序排序。这是循环进行的。比如,如果有50个map输出,而合并因子是10(10为默认设置,由io.sort.factor属性设置,与map的合并类似),合并将进行5趟。每趟将10个文件合并成一个文件,因此最后有5个中间文件。
在最后阶段,即reduce阶段,直接把数据输入reduce函数,从而省略了一次磁盘往返行程,并没有将这5个文件合并成一个已排序的文件作为最后一趟。最后的的合并即可来自内存和磁盘片段。
在reduce阶段,对已排序输出中的每个键都调用reduce函数。该阶段的输出直接写到输出文件系统,一般为HDFS。如果采用HDFS,由于TaskTracker节点也运行数据节点,所以第一个块复本将被写到本地磁盘。
最后
以上就是繁荣冬日为你收集整理的MapReduce详解1.为什么需要MapReduce 2.认识MapReduce 3.MapReduce的工作机制 4.MapReduce作业失败处理 5.作业的调度 6.Mapreduce的shuffle和排序 的全部内容,希望文章能够帮你解决MapReduce详解1.为什么需要MapReduce 2.认识MapReduce 3.MapReduce的工作机制 4.MapReduce作业失败处理 5.作业的调度 6.Mapreduce的shuffle和排序 所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复