我是靠谱客的博主 知性凉面,最近开发中收集的这篇文章主要介绍Spark(1)-笔记整理,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

SPARK
【什么是Spark】
Spark是一种快速、通用、可扩展的大数据分析引擎目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQLSpark StreamingGraphXMLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。
【Spark特点】
1.与Hadoop相比速度快
HadoopMapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。
2.易用
Spark支持JavaPythonScalaAPI,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的PythonScalashell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
3.通用
Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX
4.兼容性
Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用HadoopYARNApache Mesos作为它的资源管理和调度器,器,并且可以处理所有Hadoop支持的数据,包括HDFSHBaseCassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署StandaloneSpark集群的工具。
【Spark集群安装】
****
【spark集群运行jar】
spark-submit
--class cn.st.test.TestJDBC //主函数类的全路径
--master spark://localhost:7077
--executor-memory 1G
--total-executor-cores 2 jarpath //jarpath ,jar包所在服务器路径
arg1 arg2 //主函数可能需要参数
【Spark-Shell】
1.启动
spark-shell --master spark://localhost:7077 --executor-memory 2g --total-executor-cores 12
2.http://localhost:8080/ //spark管理界面
 
【RDD的计算】
RDD的计算实际上我们可以分为两大部分。
1)Driver端的计算
主要是stage划分,task的封装,task调度执行
2)Executor端的计算
真正的计算开始,默认情况下每个cpu运行一个task。一个task实际上就是一个分区,我们的方法无论是转换算子里封装的,还是action算子里封装的都是此时在一个task里面计算一个分区的数据。
【RDD特点:】
1.一系列分区
2.每一个输入切片会有一个函数作用在上面
3.RDD和RDD之间存在依赖关系
4.RDD中如果存储的是K-V,shuffle时会有一个分区器,模式hashpartitioner
5.如果是读取HDFS中的数据,那么会有一个最优位置。
【SPARK任务执行流程】
1.构建DAG(调用RDD上的方法)
2.DAGScheduler将DAG切分Stage(切分的依据是Shuffle),将Stage生成的Task以TaskSet的形式给TaskScheduler
3.TaskScheduler调度Task(根据资源情况将Task调度到相应的Executor中)
4.Excutor接收Task,然后将Task丢入到线程池中执行
DAG:有向无环图(数据执行过程,有方向,无闭环)
DAG描述多个RDD的转换过程,任务执行时,可以按照DAG的描述,执行真正的计算(数据被操作的一个过程)
DAG是有边界的:开始(通过SparkContext创建的RDD),结束(触发Action,调用Run Job就是一个完整的DAG形成了)
一个RDD知识描述了数据计算过程中的一个环节,而DAG是由一到多个RDD组成,描述了数据计算过程中的所有环节(过程)
一个Spark Application 中是有多少个DAG:一到多个(取决于触发了多少次Action)
--------------------------------------------------------------------------------------------------------------------
一个DAG中可能有产生多种不同类型和功能的Task,会有不同阶段
DAGScheduler:将一个DAG切分成一到多个Stage,DAGScheduler切分的依据是Shuffle (宽依赖)
为什么要切分Stage?
一个复杂的业务逻辑(将多台机器上具有相同属性的数据聚合到一台机器上:shuffle)
如果有shuffle,那么久意味着前面阶段产生的结果后,才能执行下一个阶段,下一个阶段的计算要依赖上一个阶段的数据。
在同一个Stage中,会有多个算子,可以合并在一起,我们称其为pipeline(流水线:严格按照流程,顺序执行)
【Shuffle的定义】
Shuffle的含义是洗牌,将数据打散,父RDD一个分区中的数据如果给了RDD的多个分区(只有存在这个可能),就是shuffle
shuffle会有网络传输数据,但是有网络传输,并不意味着就是shuffle
【SparkCore】
1.什么是RDD
①弹性分布式数据集,RDD里面并不存储真正要计算的数据集,你对RDD进行操作,他会在Driver端转换成Task,下发到Executor计算分散在多台集 群上的数据
②RDD是一个代理,你对代理进行操作,他会生成一个Task帮你计算
③你操作这个代理,就像操作一个本地集合一样,不用关心任务调度,容错等
2.创建RDD
1)由一个已经存在的Scala集合创建。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
2)由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile("hdfs://node1.edu360.cn:9000/words.txt")
3.RDD编程Api
1.Transformation
转换
含义
map(func)
返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func)
返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func)
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func)
类似于map,但独立地在RDD的每一个分片上运行,因此在类型为TRDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func)
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为TRDD上运行时,func的函数类型必须是
(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed)
根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset)
对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset)
对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks]))
对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks])
在一个(K,V)RDD上调用,返回一个(K, Iterator[V])RDD
 
reduceByKey(func, [numTasks])
在一个(K,V)RDD上调用,返回一个(K,V)RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
 
sortByKey([ascending], [numTasks])
在一个(K,V)RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)RDD
sortBy(func,[ascending], [numTasks])
sortByKey类似,但是更灵活
join(otherDataset, [numTasks])
在类型为(K,V)(K,W)RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))RDD
cogroup(otherDataset, [numTasks])
在类型为(K,V)(K,W)RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
cartesian(otherDataset)
笛卡尔积
pipe(command, [envVars])
 
coalesce(numPartitions)
 
repartition(numPartitions)
 
repartitionAndSortWithinPartitions(partitioner)
 
 
2.Action
动作
含义
reduce(func)
通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect()
在驱动程序中,以数组的形式返回数据集的所有元素
count()
返回RDD的元素个数
first()
返回RDD的第一个元素(类似于take(1)
take(n)
返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed])
返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n[ordering])
 
saveAsTextFile(path)
将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path
将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path
 
countByKey()
针对(K,V)类型的RDD,返回一个(K,Int)map,表示每一个key对应的元素个数。
foreach(func)
在数据集的每一个元素上,运行函数func进行更新。
 1 0.读取和存储
 2 //从hdfs读取
 3 val rdd = sc.textFile("hdfs://localhost:9000/wc")
 4 rdd.saveAsTextFile("hdfs://localhost:9000/testjoin")
 5  
 6 1.并集 注意(rdd1和rdd2类型要一致)
 7  val rdd1 = sc.parallelize(List(1,2,3))
 8 val rdd2 = sc.parallelize(List(3,4,5))
 9 rdd1.union(rdd2) =>Array[Int] = Array(1, 2, 3, 3, 4, 5)
10 val rdd3 = rdd1.leftOuterJoin(rdd2)
11 val rdd3 = rdd1.rightOuterJoin(rdd2)
12 2.连接
13 val rdd1 = sc.parallelize(List(("tom",1),("jack",2),("song",3)))
14 val rdd2 = sc.parallelize(List(("tom",3),("jack",4),("tao",1),("tao",2)))
15 rdd1.join(rdd2).collect => Array[(String, (Int, Int))] = Array((tom,(1,3)), (jack,(2,4)))
16  
17  
18 3.groupByKey
19 rdd3.collect
20  res32: Array[(String, Int)] = Array((tom,1), (jack,2), (song,6), (tao,9), (tom,1), (kitty,4), (jack,3), (song,1))
21 rdd3.groupByKey()
22 4.mapValues 遍历元祖的key进行运算
23  
24 5.#cogroup(协分组,先把rdd1和rdd2中相同key的分到一组)
25 val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
26 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
27 val rdd3 = rdd1.cogroup(rdd2) //rdd3.collect => Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))))
28 val rdd4 = rdd3.map(t=>(t._1, t._2._1.sum + t._2._2.sum))
29  
30 6.分区
31  默认最小分区数量是2,如果改为最小分区数量为1,那么分区数量为输入切片数量 sc.textFile("hdfs://localhost:9000/wc",1)
32  rdd.partitions.length
33 7.reduce (Action执行就有结果返回)聚合
34 val rdd = sc.parallelize(List(1,2,3,4,5))
35 rdd.reduce(_+_) =>15
36 8.count (Action) 计算rdd中的数据个数
37 val rdd = sc.parallelize(List(1,2,3,4,5,6))
38 rdd.count =>6
39 9.top(n) (Action) top(n) 函数会先将数据 排序(升序:从左向右 逐渐变大),然后从右向左取的 前n 个,tuple是 按照k  进行升序。
40 val rdd = sc.parallelize(List(3,6,5,7))
41 rdd.top(2) =>Array(3,6)
42 10.take(n) (Action) take函数从左向取n个数据
43  
44 11.rdd.takeOrdered(3) 该函数为 先升序,然后从左向右依次抽取3个。
45  
46 12.rdd.mapPartitionsWithIndex 一次拿出一个分区(分区中并没有数据而是记录要读取哪些数据,真正生成的Task会读取多条数据),并且可以将编号取出来)
47  分区在Driver端 ,生成的Task在excutor端
48  功能:取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道数据是属于哪个分区的(哪个分区对应的Task)
49  rdd的map方法在executor中执行时是一条一条的将数据拿出来处理
50  
51 val func = (index:Int,it:Iterator[Int])=>{
52  it.map(e=>s"part:$index,ele:$e")
53  }
54 rdd.mapParitionsWithIndex(func)
55  
56 13.rdd1.aggregate(0)(_+_,_+_) 注意是Action
57 局部聚合,在全局聚合,注意初始值,如果有三个分区,那么初始值需要局部加三次,全局聚合时再加一次
58  
59 14.rdd1.aggregateByKey(0)(_+_) 每个分区内部Key相同的聚合值相加,再全局聚合,注意初始值只应用在局部

 

 

 
15.collect方法执行过程
 1 16.rdd.countByKey() 计算出key出现的次数
 2  
 3 17.rdd.filterByRange("b","d") 对元祖中的key进行过滤 结果是包好 "b","c","d"key的元组
 4  
 5 18.flatMapValues 对元祖中的values 进行flatMap, 再和原有的key组成元祖
 6  
 7 19.reduceByKey(_+_)注意 可以使用默认分区器 reduceByKey(partitioner,_+_) partitioner为自定义分区器实现类
 8 20.foldByKey(0)(_+_)
 9 21.rdd1.aggregateByKey(0)(_+_) 
10 每个分区内部Key相同的聚合值相加,再全局聚合,注意初始值只应用在局部
11 22.combineByKey
12 val rdd = sc.parallelize(List((1,"cat"), (2,"dog"), (1,"song"), (1,"tao")))
13 23.foreach 和foreachPartition
14 foreach 是action 在excutor端执行,遍历每个元素进行操作
15 foreachPartition 遍历每个分区,每个分区是一个迭代器。
16 使用场景是将数据写入到数据库,应用foreachPartition比foreach更高效foreach拿一条数据建立一条连接,而foreachParttion每个分区对应一 条连接
17 24.自定义分区器
18 分区器的作用:决定了上游的数据到下游的哪个分区中
19  
20 25.rdd.cache 懒加载,当执行Action算子时才会运行。缓存保存在Excutor端
21 注意:一个JAVA对象存在内存中要比对象存储空间大几倍,java对象存储不用序列化和反序列化,在读取数据时速度较快以空间换取时间
22  
23  cache.unpersit(true) ---释放缓存
24  什么时候进行cache:
25 1.要求计算速度快
26 2.集群的资源要足够大
27 3.重要:cache的数据会多次出发Action
28 4.先进性过滤,然后将缩小范围的数据再cache到内存
29 26.persist(StorageLevel.MEMORY_ONLY) 持久化 ,cache底层调用
30 第一个参数:放到磁盘
31 第二个参数:放到内存
32 第三个参数:磁盘中的数据不是以java对象的方式保存
33 第四个参数:内存中的数据,以java对象的方式保存
34 第五个参数:在不同机器进行备份的个数
35 val MEMORY_AND_DISK = new StorageLevel(true, true, false, true,num)
36 OFF_HEAP:堆外内存(分布式内存存储系统)
37 27.什么时候做checkpoint
38 1.迭代计算,要求保证数据安全
39 2.对速度没有太高要求(跟cache到内存进行对比)
40 3.将中间结果保存到hdfs,防止数据丢失
41  //设置checkpoint目录,目录不存在会自动创建(分布式文件系统的目录,通常是hdfs的目录)
42 //经过复杂计算,得到中间结果
43 //将中间结果chekpoint到指定的hdfs目录
44 //后续的计算就可以使用前面checkpiint的数据
45 sc.setCheckpointDir("hdfs://localhost:9000/checkpointData") 
46 rdd.check
47 28mapPartition和 foreachPartition区别
48 29.广播变量
49 //生成的广播变量引用在Driver端
50 val broadCastRef: Broadcast[Array[(Long, Long, String)]] = sparkSession.sparkContext.broadcast(rulesInDriver)
51 //从Excecutor中的广播引用获取值
52 val ipRulesInExecutor: Array[(Long, Long, String)] = broadCastRef.value
53  
54 30.案例计算ip归属地
55 31.自定义排序
56 1.元祖比较,比较规则,先比第一个,再比第二个
57 2.定义类进行比较实现
58 32.操作关系型数据库
59 val jdbcRDD = new JdbcRDD(
60  sc,
61  getConn,
62  "SELECT * FROM access_log WHERE counts >=? AND counts <=?",
63  1,
64  1000,
65  2,//分区数量
66  rs =>{
67  val province = rs.getString(1)
68  val counts = rs.getInt(2)
69  (province,counts)//结果集
70  }
71  )

 

 

 
 
3.RDD的依赖关系
RDD和它依赖的父RDDs)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
        
shuffle重要的依据:父RDD的一个分区的数据,要给子RDD的多个分区
①窄依赖指的是每一个父RDDPartition最多被子RDD的一个Partition使用
总结:窄依赖我们形象的比喻为独生子女
②宽依赖指的是多个子RDDPartition会依赖同一个父RDDPartition
总结:窄依赖我们形象的比喻为超生
 
4.分区
默认最小分区数量是2,如果改为最小分区数量为1,那么分区数量为输入切片数量 sc.textFile("hdfs://localhost:9000/wc",1)
5.缓存(去除重复计算)
RDD的转换操作都是在内存中进行,如果在某个环节失败, 导致某个RDD丢失, 我们根据血统, 向上回溯, 找到所依赖的RDD, 但是,内存中的转化是稍纵即逝, 是无法找到的。
解决办法:使用Cache如果我们将她的依赖RDD, 存到了cache中, 出现故障时, 可以根据血统依赖, 找到可能的最近的依赖RDD, 如:wordcount案例中, 我们将linesRDD进行了Cache, 失败的时候,不用再读取文件, 直接再cache中读取linesRDD, 这将省略了读取文件。节约了时间。
//cache是懒加载的
RDD.cache
RDD.count
RDD.unpersist()//释放缓存
6.CheckPoint
checkpoint在spark中主要有两块应用:一块是在spark core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;另外一块是应用在spark streaming中,使用checkpoint用来保存DStreamGraph以及相关配置信息,以便在Driver崩溃重启的时候能够接着之前进度继续进行处理(如之前waiting batch的job会在重启后继续处理)。
Q1:RDD中的数据是什么时候写入的?是在rdd调用checkpoint方法时候吗?
首先看一下RDD中checkpoint方法,可以看到在该方法中是只是新建了一个ReliableRDDCheckpintData的对象,并没有做实际的写入工作。实际触发写入的时机是在runJob生成改RDD后,调用RDD的doCheckpoint方法来做的。
Q2:在做checkpoint的时候,具体写入了哪些数据到HDFS了?
在经历调用RDD.doCheckpoint → RDDCheckpintData.checkpoint → ReliableRDDCheckpintData.doCheckpoint → ReliableRDDCheckpintData.writeRDDToCheckpointDirectory后,在writeRDDToCheckpointDirectory方法中可以看到:将作为一个单独的任务(RunJob)将RDD中每个parition的数据依次写入到checkpoint目录(writePartitionToCheckpointFile),此外如果该RDD中的partitioner如果不为空,则也会将该对象序列化后存储到checkpoint目录。所以,在做checkpoint的时候,写入的hdfs中的数据主要包括:RDD中每个parition的实际数据,以及可能的partitioner对象(writePartitionerToCheckpointDir)。
Q3:在对RDD做完checkpoint以后,对做RDD的本省又做了哪些收尾工作?
在写完checkpoint数据到hdfs以后,将会调用rdd的markCheckpoined方法,主要斩断该rdd的对上游的依赖,以及将paritions置空等操作。
Q4:实际过程中,使用RDD做checkpoint的时候需要注意什么问题?
通过A1,A2可以知道,在RDD计算完毕后,会再次通过RunJob将每个partition数据保存到HDFS。这样RDD将会计算两次,所以为了避免此类情况,最好将RDD进行cache。即1.1中rdd的推荐使用方法如下:
sc.setCheckpointDir(checkpointDir.toString)
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
rdd.cache()
rdd.checkpoint()
7.广播变量
当在Executor端用到了Driver变量,不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。如果使用广播变量 在每个Executor端中只有一份Driver端的变量副本
注意:1).不能将RDD广播出去,可以将RDD的结果广播出去
          2).广播变量在Driver定义,在Exector端不可改变,在Executor端不能定义
通过SparkContext的实例变量在Driver端广播:
val sc = new SparkContext(conf)
val broadcastRef = sc.broadcast(rulesDriver)
在Executor端进行获取广播变量:
broadcastRef.value
8.排序
1.非自定义排序
sortByKey:sortByKey对于key是单个元素排序很简单,如果key是元组如(X1,X2,X3.....),它会先按照X1排序,若X1相同,则在根据X2排序,依次类推
(1, 6, 3), (2, 3, 3), (1, 1, 2), (1, 3, 5), (2, 1, 2)
思路:将1,3元素组成为key,2元素为value
val array = Array(( 1, 6, 3), (2, 3, 3), (1, 1, 2), (1, 3, 5), (2, 1, 2))  
val rdd1 = sc.parallelize(array)  
//设置元素(e1,e3)为key,value为原来的整体  
val rdd2 = rdd1.map(f => ((f._1, f._3), f))  
//利用sortByKey排序的对key的特性  
val rdd3 = rdd2.sortByKey()  
val rdd4 = rdd3.values.collect  
sortBy为sortByKey加强版:
如上排序可一用sortBy: xxRDD.sortBy(f=>(f._1,f._3))
2.自定义排序
把数据封装成类或者case class,然后类继承Ordered[类型] ,然后可以自定义排序规则。
如果是class,需要实现序列化特质,Serializable,如果是case class,可以不实现该序列化特质
tpRDD.sortBy(tp=>Boy(tp._2,tp_.3))
class User(val name:String, val age:Int , val fv:Int) extends Order[User] with Serializable{
override def compare(that:User):Int={
}
}
 
如果是case class,可以不实现该序列化特质
tpRDD.sortBy(tp=>Boy(tp._2,tp_.3))
case class User(val name:String, val age:Int , val fv:Int) extends Order[User] {
override def compare(that:User):Int={
}
}
 
③定义一个样例类,不需要重写比较方法,tpRDD.sortBy(tp=>Boy(tp._2,tp_.3))
case class User(val name:String, val age:Int , val fv:Int)
 
④充分利用元祖的比较规则,先比较第一个,在比较第二个
tpRDD.sortBy(tp=>(-tp._3,tp._2))
9.自定义分区
Spark内部提供了 HashPartitioner和RangePartitioner两种分区策略
HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据(HashCode为负数时,为了避免小于0,spark做了以下处理)。
HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。
RangePartitioner分区优势:尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大;
劣势: 但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。
RangePartitioner作用:将一定范围内的数映射到某一个分区内,在实现中,分界的算法尤为重要。算法对应的函数是rangeBounds。
 
自定义分区:
①我们只需要扩展 Partitioner抽象类,然后实现里面的三个方法
②只有Key-Value类型的RDD才有分区的,非Key-Value类型的RDD分区的值是None
def numPartitions: Int:这个方法需要返回你想要创建分区的个数;
def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1;
equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。
 1 练习:
 2 object GroupFavTeacher3 {
 3 def main(args: Array[String]): Unit = {
 4 val conf = new SparkConf().setAppName("GroupFavTeacher3").setMaster("local[4]")
 5 val sc = new SparkContext(conf)
 6 //读取数据
 7 val lines = sc.textFile(args(0))
 8 //整理数据
 9 val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
10 val index = line.lastIndexOf("/")
11 val teacher = line.substring(index + 1)
12 val subject = line.substring(0, index)
13 ((subject, teacher), 1)
14 })
15 //聚合数据
16 val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_+_)
17 //获取学科集合
18 val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
19 //调用自定义分区器,让一个学科一个分区,然后在每个分区中既每个学科即可进行排序取topn
20 val sbPartitioner = new SubjectPartitioner(subjects)
21 val subPartitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPartitioner)
22 //遍历每个分区
23 val sorted: RDD[((String, String), Int)] = subPartitioned.mapPartitions(it => {
24 it.toList.sortBy(x => x._2).reverse.take(3).iterator
25 })
26 //获得结果
27 val result = sorted.collect()
28 println(result.toBuffer)
29  
30 sc.stop()
31 }
32 }
33  
34  
35 class SubjectPartitioner(subs:Array[String]) extends Partitioner{
36 //定义规则
37 val rules = new mutable.HashMap[String,Int]()
38 var num = 0
39 for(sb<-subs){
40 rules(sb) = num
41 num +=1
42 }
43 override def numPartitions: Int = subs.length
44 override def getPartition(key: Any): Int = {
45 /**
46 * key RDD所映射的键值对数据的键
47 */
48 val sub = key.asInstanceOf[(String,String)]._1
49 rules(sub)
50 }
51 }

 

 

10.RDD与mysql进行交互
1.数据存储到数据库,使用foreachPartition每次拿出一个分区进行数据存储,可以节省数据库连接打开关闭效率

      

 1 rdd2.foreachPartition(it=>{
 2 val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/sparktest","root","songtao0404")
 3 val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO t_user values(?,?,?)")
 4 it.foreach(tp=>{
 5 pstm.setInt(1,tp._1.toInt)
 6 pstm.setString(2,tp._2)
 7 pstm.setInt(3,tp._3.toInt)
 8 pstm.execute()
 9 })
10 if(pstm != null) pstm.close()
11 if(conn != null) conn.close()
12  
13 })

     2.从mysql读取数据

需要使用JdbcRDD来读取,JdbcRDD类如下:
 1 class JdbcRDD[T: ClassTag](
 2 sc: SparkContext,
 3 getConnection: () => Connection, //获得jdbc驱动函数
 4 sql: String,
 5 lowerBound: Long, //查询下界
 6 upperBound: Long, //查询上界,
 7 numPartitions: Int, //分区数
 8 mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _) //结果集
 9  
10 //读取数据
11 val getConnection =()=>{
12 DriverManager.getConnection("jdbc:mysql://localhost:3306/sparktest","root","songtao0404")
13 }
14 val jdbcRdd = new JdbcRDD(
15 sc,
16 getConnection,
17 "select * from t_user where age >= ? and age<= ? ",
18 0,
19 90,
20 2,
21 r=>{
22 val id = r.getInt(1)
23 val name = r.getString(2)
24 val age = r.getInt(3)
25 (id,name,age)
26 }
27 )
28 println(jdbcRdd.collect().toBuffer)

 

【Spark-SQL】
1.spark 1.x SQL的基本用法(两种)
 
1.创建SparkContext
2.创建SQLContext
3.创建RDD
4.创建一个类,并定义类的成员变量
5.整理数据并关联class
6.将RDD转换成DataFrame(导入隐式转换)
7.将DataFrame注册成临时表
8.书写SQL(Transformation)
9.执行Action
 
1.创建SparkContext
2.创建SQLContext
3.创建RDD
4.创建StructType(schema)
5.整理数据将数据跟Row关联
6.通过rowRDD和schema创建DataFrame
7.将DataFrame注册成临时表
8.书写SQL(Transformation)
9.执行Action
 
 
 1 第一种代码:
 2 object SQLDemo1 {
 3 def main(args: Array[String]): Unit = {
 4 //提交的这个程序可以连接到Spark集群
 5 val conf = new SparkConf().setAppName("SQLDemo1").setMaster("local[4]")
 6 //创建SparkSQL的连接(程序执行的入口)
 7 val sc = new SparkContext(conf)
 8 //SParkContext不能创建特殊的RDD( DataFrame )
 9 val sqlContext = new SQLContext(sc)
10 //创建特殊的RDD(DataFrame)就是有Schema信息的RDD
11 //先有一个普通的RDD然后关联上Schema进而转成DataFrame
12  
13 val lines = sc.textFile("hdfs://localhost:9000/sparksql")
14 val boyRDD: RDD[Boy] = lines.map(line => {
15 val fields: Array[String] = line.split(",")
16 val id = fields(0).toLong
17 val name = fields(1)
18 val age = fields(2).toInt
19 val fv = fields(3).toDouble
20 Boy(id, name, age, fv)
21 })
22 //该RDD装的是Boy类型的数据,但还是一个RDD
23 //将RDD转换成一个DataFrame
24 //导入隐士转换
25 import sqlContext.implicits._
26 val bdf: DataFrame = boyRDD.toDF()
27 //编程DataFrame后就可以使用两种API进行编程了
28 //把DataFrame先注册临时表
29 bdf.registerTempTable("t_boy")
30  
31 //书写sql
32 val result: DataFrame = sqlContext.sql("SELECT * FROM t_boy ORDER BY fv desc,age asc")
33  
34 //查看结果(触发Action)
35 result.show()
36 sc.stop()
37 }
38 }
39 case class Boy(id:Long,name:String,age:Int,fv:Double)
40 第二种代码
41 object SQLDemo2 {
42 def main(args: Array[String]): Unit = {
43 //定义连接Spark集群
44 val conf = new SparkConf().setAppName("SQLDemo2").setMaster("local[4]")
45 //创建Spark入口
46 val sc = new SparkContext(conf)
47  
48 //创建特殊的RDD DataFrame
49 val sqlContext = new SQLContext(sc)
50  
51 val lines = sc.textFile("hdfs://localhost:9000/sparksql")
52 val boyRDD: RDD[Row] = lines.map(line => {
53 val fields: Array[String] = line.split(",")
54 val id = fields(0).toLong
55 val name = fields(1)
56 val age = fields(2).toInt
57 val fv = fields(3).toDouble
58 Row(id, name, age, fv)
59 })
60  
61 //定义Schema,不需要用样例类来约束数据了
62 val schema = StructType(List(
63 StructField("id", LongType, true),
64 StructField("name", StringType, true),
65 StructField("age", IntegerType, true),
66 StructField("fv", DoubleType, true)
67 ))
68 //创建DataFrame
69 val bdf: DataFrame = sqlContext.createDataFrame(boyRDD,schema)
70  
71 //注册临时表
72 bdf.registerTempTable("t_boy")
73  
74 //写sql
75 val result: DataFrame = sqlContext.sql("SELECT * FROM t_boy order by fv desc,age asc")
76  
77 //查看结果,触发Action
78 result.show()
79  
80 sc.stop()
81 }
82 }
2.spark 2.x SQL的基本用法
1.RDD,DataFrame,DataSet
RDD:RDD是Spark建立之初的核心API。RDD是不可变分布式弹性数据集,在Spark集群中可跨节点分区,并提供分布式low-level API来操作RDD,包括transformation和action。
DataFrame:与RDD相同之处,都是不可变分布式弹性数据集。不同之处在于,DataFrame的数据集都是按指定列存储,即结构化数据。类似于传统数据库中的表。
DataSet:在Spark2.0中,DataFrame API将会和Dataset  API合并,统一数据处理API。
DataFrame是一种特殊的RDD,处理结构化数据集时需要使用DataFrame和DataSet。
2.创建SparkSession(SparkSession包含了SparkContext,和sqlContext兼容了老版本的调用)
val sparkSession = SparkSession
.builder()
.appName("Spark2SQL")
.master("local[4]")
.getOrCreate()
3.使用DataFrame api + SQL操作数据
 1 object Spark2SQL {
 2 def main(args: Array[String]): Unit = {
 3 val sparkSession = SparkSession
 4 .builder()
 5 .appName("Spark2SQL")
 6 .master("local[4]")
 7 .getOrCreate()
 8  
 9 //注意这里使用SparkSession.read读取的结果类型是 DataSet[String]
10 val lines: Dataset[String] = sparkSession.read.textFile("hdfs://localhost:9000/stu")
11 import sparkSession.implicits._
12 val userDataSet: Dataset[User] = lines.map(line => {
13 val fields: Array[String] = line.split(",")
14 val id = fields(0).toInt
15 val name = fields(1)
16 val age = fields(2).toInt
17 User(id, name, age)
18 })
19 //使用DataSet api 进行查询
20 val ds: Dataset[User] = userDataSet.where($"age" > 14)
21 println("使用DataSet api进行查询")
22 ds.show()
23  
24 //使用SQL进行查询
25 //先注册成临时表
26 ds.createTempView("t_user")
27 val df: DataFrame = sparkSession.sql("select * from t_user where age > 14")
28 df.show()
29  
30 }
31 }
32  
33 /**
34 * 定义实体类信息
35 * @param id
36 * @param name
37 * @param age
38 */
39 case class User(id:Int,name:String,age:Int)
4.使用DataSet API + SQL操作数据
 1 object Spark2SQL2 {
 2 def main(args: Array[String]): Unit = {
 3 val sparkSession = SparkSession
 4 .builder()
 5 .appName("Spark2SQL2")
 6 .master("local[4]")
 7 .getOrCreate()
 8 //注意这里使用的是SparkContext读取的结果数据类型是RDD[String]
 9 val lines: RDD[String] = sparkSession.sparkContext.textFile("hdfs://localhost:9000/stu")
10 val userRDD: RDD[Row] = lines.map(line => {
11 val fields: Array[String] = line.split(",")
12 val id = fields(0).toInt
13 val name = fields(1)
14 val age = fields(2).toInt
15 Row(id, name, age)
16 })
17  
18 //构建Schema信息
19 val schema = StructType(List(
20 StructField("id", IntegerType, true),
21 StructField("name", StringType, true),
22 StructField("age", IntegerType, true)
23 ))
24  
25  
26 //创建DataFrame
27 val dataFrame: DataFrame = sparkSession.createDataFrame(userRDD,schema)
28 //使用DataFrameApi查询
29 import sparkSession.implicits._
30 val df: DataFrame = dataFrame.where($"age" > 14)
31 println("使用DataFrame的API进行查询")
32 df.show()
33  
34 //使用SQL
35 dataFrame.createTempView("t_user")
36 val sqldf: DataFrame = sparkSession.sql("select * from t_user where age > 14")
37 println("使用SQL进行查询")
38 sqldf.show()
39  
40 sparkSession.stop()
41 }
42 }
5.sparkSession读数据库
 1 //第一种
 2 val sparkSession: SparkSession = SparkSession
 3 .builder()
 4 .appName("SparkSQL2JDBC")
 5 .master("local[4]")
 6 .getOrCreate()
 7 val df: DataFrame = sparkSession.read.format("jdbc").options(
 8 Map("url" -> "jdbc:mysql://localhost:3306/sparktest",
 9 "driver" -> "com.mysql.jdbc.driver",
10 "user" -> "root",
11 "password" -> "songtao0404"
12 )
13 ).load()
14 df.printSchema()
15  
16 //第二种
17 val props = new Properties()
18 val url = "jdbc:mysql://localhost:3306/sparktest"
19 val tname = "t_user"
20 props.put("user","root")
21 props.put("password","songtao0404")
22 props.put("driver","com.mysql.jdbc.Driver")
23 val df2: DataFrame = sparkSession.read.format("jdbc").jdbc(url,tname,props)
24 df2.printSchema()
25  
26 //第三种
27 val df3: DataFrame = sparkSession.read.jdbc(url,tname,props)
28 df3.printSchema() 
6.写数据库
1 //第一种写,存在该表就直接报错(默认模式)
2 df1.write.jdbc(url,"t_user1",props)
3 //第二种写
4 filted.write.mode(SaveMode.Ignore).jdbc("jdbc:mysql://localhost:3306/bigdata","access_log1",props)
5 SaveMode.Ignore:表不存在,则创建,存在,则什么事都不做
6 SaveMode.ErrorIfExists:
7 SaveMode.Append:表存在情况下,在后面追加
8 SaveMode.Overwrite:存在该表情况下,覆盖里面数据
9  

 

 
 
 

转载于:https://www.cnblogs.com/thesongtao/p/10642518.html

最后

以上就是知性凉面为你收集整理的Spark(1)-笔记整理的全部内容,希望文章能够帮你解决Spark(1)-笔记整理所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部