输入数据来自kafka,十行一组,每组4个数字
长这样
1
2
3
4
5
6
7
8
9
10
11706260,34,13,10 653244,16,8,43 395410,23,15,8 735026,30,29,16 106844,45,29,33 796853,14,41,37 324616,15,5,37 156450,41,2,27 385898,47,34,5 710053,30,37,27
从简单入手,想算出他们按后三个数字分别group by的计数
如果用sql表达,就是三个group by的子查询,外面套一个sum,
在spark里能做的当然比sql多,但先从简单开始吧
参考spark包内带的例子python/streaming/direct_kafka_wordcount.py后
代码大致长这样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils sc = SparkContext(appName="CountByTagPV") #batch every 1 seconds ssc = StreamingContext(sc, 1) kvs = KafkaUtils.createDirectStream(ssc, ["AccessLog"], {"metadata.broker.list": "localhost:9092"}) lines = kvs.map(lambda x : x[1]) counts = lines.map(lambda l : l.strip()) .flatMap(lambda l : l.split("n")) .map(lambda l : l.split(",")) .flatMap(lambda x: ((x[1],1),(x[2],1),(x[3],1))) .reduceByKey(add) counts.pprint() ssc.start() ssc.awaitTermination()
调试运行中,遇到小坑几个
Spark Streaming’s Kafka libraries not found in class path
执行脚本后报这个错,无法继续
这个报错后面还跟了两个解决办法,但都是要调用spark-submit命令的,
对脚本还在本地调试运行的情况,不对症
不过还是按方法2的提示,上search.maven.org,找到了它要求的jar,
Group Id = org.apache.spark,
Artifact Id = spark-streaming-kafka-0-8-assembly,
Version = 2.4.3
对清楚这几个参数,然后再找pyspark模块的jar目录,
这个目录位置随python和pip版本不同会有变化,可以用下面的语句辅助定位
1
2# find /usr/lib /usr/local/lib -type d -name jars|grep pyspark
找到后把jar文件cp进该目录下
IndexError: list index out of range
脚本能执行后,一输入数据就会报这个错,
并且脚本停止运行。
出错的位置很显然在第二个flatMap,
这句是把前面按逗号分隔出的长度为4的数组,
打散为3个长度为2的数组
报错的原因是因为数据处理过程中会产生空list,
从kafka读入的数据本来没有空行,
但第一个flatMap按n对数据做split后,
因为最末尾一行也有n
切分后就会在数据流中嵌入一个空list
因为pprint的输出默认只输出一部分数据,
这个空list没有在输出范围了,
花了好一些时间才推断出原因。
找到原因就有很多种解法了,
我选择在按n切分前先去掉最末尾那个
最后的输出片段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29------------------------------------------- Time: 2019-08-17 02:34:42 ------------------------------------------- ('1', 1) ('29', 1) ('23', 1) ('38', 2) ('27', 2) ('44', 1) ('34', 1) ('37', 1) ('0', 2) ('33', 1) ... ------------------------------------------- Time: 2019-08-17 02:34:43 ------------------------------------------- ('8', 4) ('43', 3) ('29', 7) ('23', 3) ('30', 5) ('34', 7) ('10', 5) ('15', 4) ('27', 5) ('49', 2) ...
最后
以上就是平常美女最近收集整理的关于spark实战项目之二,读kafka数据流,分组计数Spark Streaming’s Kafka libraries not found in class pathIndexError: list index out of range的全部内容,更多相关spark实战项目之二,读kafka数据流,分组计数Spark内容请搜索靠谱客的其他文章。
发表评论 取消回复