我是靠谱客的博主 小巧高山,这篇文章主要介绍读取kafka的偏移量的工具类,现在分享给大家,希望可以做个参考。

复制代码
1
2
读取kafka的偏移量的工具类
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// An highlighted block import java.util.Properties import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} object MyKafkaUtil { private val properties: Properties = MyPropertiesUtil.load("config.properties") val broker_list = properties.getProperty("kafka.broker.list") // kafka消费者配置 var kafkaParam = collection.mutable.Map( "bootstrap.servers" -> broker_list,//用于初始化链接到集群的地址 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], //用于标识这个消费者属于哪个消费团体 "group.id" -> "gmall0523_group", //latest自动重置偏移量为最新的偏移量 "auto.offset.reset" -> "latest", //如果是true,则这个消费者的偏移量会在后台自动提交,但是kafka宕机容易丢失数据 //如果是false,会需要手动维护kafka偏移量 "enable.auto.commit" -> (false: java.lang.Boolean) ) // 创建DStream,返回接收到的输入数据 使用默认的消费者组 def getKafkaStream(topic: String,ssc:StreamingContext ): InputDStream[ConsumerRecord[String,String]]={ val dStream = KafkaUtils.createDirectStream[String,String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](Array(topic), kafkaParam ) ) dStream } //在对Kafka数据进行消费的时候,指定消费者组 def getKafkaStream(topic: String,ssc:StreamingContext,groupId:String): InputDStream[ConsumerRecord[String,String]]={ kafkaParam("group.id")=groupId val dStream = KafkaUtils.createDirectStream[String,String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](Array(topic),kafkaParam )) dStream } //从指定的偏移量位置读取数据 offsets 是一个map 集合key 是new TopicPartition(topic, partition.toInt),value 是 offset.toLong def getKafkaStream(topic: String,ssc:StreamingContext,offsets:Map[TopicPartition,Long],groupId:String) : InputDStream[ConsumerRecord[String,String]]={ kafkaParam("group.id")=groupId val dStream = KafkaUtils.createDirectStream[String,String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](Array(topic),kafkaParam,offsets)) dStream } }

最后

以上就是小巧高山最近收集整理的关于读取kafka的偏移量的工具类的全部内容,更多相关读取kafka内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(64)

评论列表共有 0 条评论

立即
投稿
返回
顶部