This article shows how to have Spark Streaming consume specified Kafka topics and partitions and manually update the consumer offsets; it is shared here for reference.

Straight to the code (Scala version). The KafkaManager class below wraps Spark's KafkaCluster helper: before creating a direct stream it compares the offsets saved in ZooKeeper against the earliest offsets still available on the Kafka leaders and corrects any stale ones, and after each batch it writes the processed offsets back to ZooKeeper.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
 * Created by shengjk1
 */
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  /**
   * Create a direct stream for a set of topics.
   */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams.get("group.id").get
    // Before reading the offsets from ZooKeeper, correct them if they are stale.
    setOrUpdateOffsets(topics, groupId)

    // Start consuming messages from the offsets stored in ZooKeeper.
    val messages = {
      val partitionsE = kc.getPartitions(topics)
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      println("partitions " + partitions)
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft)
        throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }

  /**
   * Create a direct stream for an explicit set of topic-partitions.
   *
   * @param ssc
   * @param kafkaParams
   * @param topicPartition e.g. Set([test01,0], [test01,1], [test01,2])
   * @tparam K
   * @tparam V
   * @tparam KD
   * @tparam VD
   * @return
   */
  // update by shengjk1
  def createDirectStreamByAssignPartition[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topicPartition: Set[TopicAndPartition]): InputDStream[(K, V)] = {
    val groupId = kafkaParams.get("group.id").get
    // Before reading the offsets from ZooKeeper, correct them if they are stale.
    val topics = ArrayBuffer[String]()
    val tpArray = topicPartition.toArray
    for (i <- 0 until tpArray.length) {
      topics += tpArray(i).topic
    }
    setOrUpdateOffsets(topics.toSet, groupId)

    // Start consuming from the offsets stored in ZooKeeper, but only for the given partitions.
    val messages = {
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, topicPartition)
      if (consumerOffsetsE.isLeft)
        throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }

  /**
   * Before creating the stream, bring the consumer offsets in line with what is actually
   * still available in Kafka.
   *
   * @param topics
   * @param groupId
   */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {
      var hasConsumed = true
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false
      if (hasConsumed) {
        // This group has consumed the topic before.
        /**
         * If the streaming job fails with kafka.common.OffsetOutOfRangeException, the offsets
         * saved in ZooKeeper are stale: Kafka's retention policy has already deleted the log
         * segments that contained them. In that case, compare the consumer offsets in ZooKeeper
         * with the earliest leader offsets; if the consumer offsets are smaller, they are stale
         * and should be reset to the earliest leader offsets.
         */
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
        val consumerOffsets = consumerOffsetsE.right.get

        // Possibly only some partitions are stale, so reset only those to the earliest leader offsets.
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach({ case (tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if (n < earliestLeaderOffset) {
            println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
              " offsets are stale, resetting to " + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
        })
        if (!offsets.isEmpty) {
          kc.setConsumerOffsets(groupId, offsets)
        }
      } else {
        // This group has never consumed the topic.
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
        if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        } else {
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map { case (tp, offset) => (tp, offset.offset) }
        kc.setConsumerOffsets(groupId, offsets)
      }
    })
  }

  /**
   * Commit the consumed offsets back to ZooKeeper.
   *
   * @param rdd
   */
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }
}
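For completeness, here is a minimal driver sketch showing how the class above might be used. It is an illustration only: the broker address, group id, topic name test01 with partitions 0-2, batch interval, and the object name KafkaManagerExample are all assumptions, not part of the original post. Also note that in some Spark 1.x releases KafkaCluster is private[spark], in which case KafkaManager has to be compiled into the org.apache.spark.streaming.kafka package for this to build.

import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaManagerExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaManagerExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder connection settings; adjust to your cluster.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "localhost:9092",
      "group.id" -> "test-group",
      "auto.offset.reset" -> "smallest")

    // Consume only partitions 0, 1 and 2 of topic test01.
    val topicPartitions = Set(
      TopicAndPartition("test01", 0),
      TopicAndPartition("test01", 1),
      TopicAndPartition("test01", 2))

    val km = new KafkaManager(kafkaParams)
    val messages = km.createDirectStreamByAssignPartition[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicPartitions)

    messages.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Do the real processing first ...
        println("batch size: " + rdd.count())
        // ... then commit the processed offsets back to ZooKeeper.
        km.updateZKOffsets(rdd)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

One detail worth remembering: updateZKOffsets must be called on the RDD handed to foreachRDD directly, because only the underlying KafkaRDD implements HasOffsetRanges; after a transformation such as map the cast in updateZKOffsets would fail.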

A Java version can be found at:

http://blog.csdn.net/jsjsjs1789/article/details/52823218

Finally

That covers consuming specified topics and partitions with Spark Streaming and manually updating the offsets.
