概述
输入数据来自kafka,十行一组,每组4个数字
长这样
706260,34,13,10
653244,16,8,43
395410,23,15,8
735026,30,29,16
106844,45,29,33
796853,14,41,37
324616,15,5,37
156450,41,2,27
385898,47,34,5
710053,30,37,27
从简单入手,想算出他们按后三个数字分别group by的计数
如果用sql表达,就是三个group by的子查询,外面套一个sum,
在spark里能做的当然比sql多,但先从简单开始吧
参考spark包内带的例子python/streaming/direct_kafka_wordcount.py后
代码大致长这样
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="CountByTagPV")
#batch every 1 seconds
ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["AccessLog"], {"metadata.broker.list": "localhost:9092"})
lines = kvs.map(lambda x : x[1])
counts = lines.map(lambda l : l.strip())
.flatMap(lambda l : l.split("n"))
.map(lambda l : l.split(","))
.flatMap(lambda x: ((x[1],1),(x[2],1),(x[3],1)))
.reduceByKey(add)
counts.pprint()
ssc.start()
ssc.awaitTermination()
调试运行中,遇到小坑几个
Spark Streaming’s Kafka libraries not found in class path
执行脚本后报这个错,无法继续
这个报错后面还跟了两个解决办法,但都是要调用spark-submit命令的,
对脚本还在本地调试运行的情况,不对症
不过还是按方法2的提示,上search.maven.org,找到了它要求的jar,
Group Id = org.apache.spark,
Artifact Id = spark-streaming-kafka-0-8-assembly,
Version = 2.4.3
对清楚这几个参数,然后再找pyspark模块的jar目录,
这个目录位置随python和pip版本不同会有变化,可以用下面的语句辅助定位
# find /usr/lib /usr/local/lib -type d -name jars|grep pyspark
找到后把jar文件cp进该目录下
IndexError: list index out of range
脚本能执行后,一输入数据就会报这个错,
并且脚本停止运行。
出错的位置很显然在第二个flatMap,
这句是把前面按逗号分隔出的长度为4的数组,
打散为3个长度为2的数组
报错的原因是因为数据处理过程中会产生空list,
从kafka读入的数据本来没有空行,
但第一个flatMap按n对数据做split后,
因为最末尾一行也有n
切分后就会在数据流中嵌入一个空list
因为pprint的输出默认只输出一部分数据,
这个空list没有在输出范围了,
花了好一些时间才推断出原因。
找到原因就有很多种解法了,
我选择在按n切分前先去掉最末尾那个
最后的输出片段
-------------------------------------------
Time: 2019-08-17 02:34:42
-------------------------------------------
('1', 1)
('29', 1)
('23', 1)
('38', 2)
('27', 2)
('44', 1)
('34', 1)
('37', 1)
('0', 2)
('33', 1)
...
-------------------------------------------
Time: 2019-08-17 02:34:43
-------------------------------------------
('8', 4)
('43', 3)
('29', 7)
('23', 3)
('30', 5)
('34', 7)
('10', 5)
('15', 4)
('27', 5)
('49', 2)
...
最后
以上就是平常美女为你收集整理的spark实战项目之二,读kafka数据流,分组计数Spark Streaming’s Kafka libraries not found in class pathIndexError: list index out of range的全部内容,希望文章能够帮你解决spark实战项目之二,读kafka数据流,分组计数Spark Streaming’s Kafka libraries not found in class pathIndexError: list index out of range所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复