概述
下面是rdd的论文中,从hdfs读取日志数据的一个例子:
|
lines = spark.textFile( "hdfs://..." ) // lines is a org.apache.spark.rdd.MappedRDD
errors = lines.filter(_.startsWith( "ERROR" )) // errors is a org.apache.spark.rdd.FilteredRDD
errors.cache() // persist 到内存中
errors.count() // 触发action,计算errors有多少个,即ERROR的多少行
// Count errors mentioning MySQL:
errors.filter(_.contains( "MySQL" )).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains( "HDFS" ))
.map(_.split( 't' )( 3 ))
.collect()
spark是一个org.apache.spark.SparkContext的实例,spark的应用从定义一个SparkContext开始:
textFile的定义如下:
hadoopFile创建了一个org.apache.spark.rdd.HadoopRDD, 而在HadoopRDD上调用map则生成了一个MappedRDD:
errors.cache()并不会立即执行,它的作用是在RDD的计算完成后,将结果cache起来,以供以后的计算使用, 这样的话可以加快以后运算的速度。 errors.count() 就触发了一个action,这个时候就需要向集群提交job了:
提交后,SparkContext会将runJob提交到DAGScheduler,DAGScheduler会将当前的DAG划分成Stage, 然后生成TaskSet后通过TaskScheduler的submitTasks提交tasks,而这又会调用SchedulerBackend, SchedulerBackend会将这些任务发送到Executor去执行。 |
最后
以上就是现实烧鹅为你收集整理的spark任务中基于rdd的执行流程分析的全部内容,希望文章能够帮你解决spark任务中基于rdd的执行流程分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复