Overview
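This example is a Spark Streaming word count that consumes Kafka with the 0-10 direct API and keeps its own offsets in Redis instead of letting Kafka auto-commit them. Every batch writes the word counts and the end offset of each partition to Redis inside a single MULTI/EXEC transaction, so results and offsets are committed together; on startup the offsets are read back from Redis and the stream resumes from where the previous run stopped. Offsets live in a hash keyed by the consumer group id (field "topic-partition", value the next offset to read), and the running counts live in a hash keyed by the topic name.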
import Utils.RedisOffset
import day09.Jpools
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SSCDirectKafka010_Redis_Offset {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SSCDirectKafka010_Redis_Offset").setMaster("local[*]")
    // Cap the pull rate from Kafka. "2" does not mean 2 records per batch; the per-batch volume is
    // maxRatePerPartition * number of partitions * batch interval in seconds
    // (e.g. with 3 partitions and the 2-second batch below: 2 * 3 * 2 = 12 records per batch).
    conf.set("spark.streaming.kafka.maxRatePerPartition", "2")
    // Stop the StreamingContext gracefully; without this, shutting the service down may lose data.
    conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(2))

    val groupId = "day11_09"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // do not let Kafka auto-commit offsets
    )
    val topic = "helloTopic"
    val topics = Array(topic)

    // Read the offsets previously committed to Redis for this consumer group.
    // They are stored under the group id (see the hset below), so they must also
    // be looked up by group id, not by topic.
    val redisManage = RedisOffset(groupId)
    val result = if (redisManage.size > 0) {
      // Offsets exist: resume from where the last run stopped.
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, redisManage)
      )
    } else {
      // First run: no offsets in Redis, fall back to auto.offset.reset.
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
    }

    result.foreachRDD { rdd =>
      val jedis = Jpools.getJedis
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Compute the word counts and collect them to the driver.
      val reduced = rdd.map(t => (t.value(), 1)).reduceByKey(_ + _).collect()
      // Open a Redis transaction so counts and offsets are committed atomically.
      val transaction = jedis.multi()
      try {
        // Accumulate the counts in a hash keyed by the topic name.
        for (i <- reduced) {
          transaction.hincrBy(topic, i._1, i._2)
        }
        // Record the end offset of every partition: field "topic-partition", value untilOffset.
        for (i <- offsetRange) {
          println(i)
          transaction.hset(groupId, i.topic + "-" + i.partition, i.untilOffset.toString)
        }
        transaction.exec()
      } catch {
        case e: Exception =>
          println("Batch failed, discarding the Redis transaction: " + e.getMessage)
          transaction.discard()
      } finally {
        jedis.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
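To check what the job has committed, the two hashes can be dumped with the same connection-pool helper. A minimal sketch, assuming the Redis instance above is reachable (the object name InspectRedisState is only for illustration):

import scala.collection.JavaConverters._

object InspectRedisState {
  def main(args: Array[String]): Unit = {
    val jedis = day09.Jpools.getJedis
    // Committed offsets: hash keyed by the consumer group, fields like "helloTopic-0".
    jedis.hgetAll("day11_09").asScala.foreach { case (tp, off) => println(s"offset $tp -> $off") }
    // Accumulated word counts: hash keyed by the topic name.
    jedis.hgetAll("helloTopic").asScala.foreach { case (w, c) => println(s"count  $w -> $c") }
    jedis.close()
  }
}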
Utility classes
import java.util

import day09.Jpools
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._
import scala.collection.mutable

object RedisOffset {
  // Rebuild a Map[TopicPartition, Long] from the hash the streaming job writes,
  // where each field is "topic-partition" and each value is the next offset to read.
  def apply(groupId: String): mutable.Map[TopicPartition, Long] = {
    val redisOffset = mutable.Map[TopicPartition, Long]()
    // Borrow a Jedis connection from the pool.
    val jedis = Jpools.getJedis
    val tpOffset: util.Map[String, String] = jedis.hgetAll(groupId)
    for ((field, offset) <- tpOffset.asScala) {
      // Split on the last "-" so topic names containing a hyphen still parse correctly.
      val idx = field.lastIndexOf("-")
      val topic = field.substring(0, idx)
      val partition = field.substring(idx + 1).toInt
      redisOffset += (new TopicPartition(topic, partition) -> offset.toLong)
    }
    jedis.close()
    redisOffset
  }
}
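On the very first run the group's hash does not exist, hgetAll returns an empty map, and RedisOffset therefore returns an empty Map; that is exactly what the size check in the main object relies on to fall back to auto.offset.reset. A quick illustration (the offset values are hypothetical):

val fromOffsets = RedisOffset("day11_09")
// first run:  Map()                                        -> subscribe without offsets
// later runs: Map(helloTopic-0 -> 42, helloTopic-1 -> 37)  -> resume from these offsets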
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{Jedis, JedisPool}

object Jpools {
  private val poolConfig = new GenericObjectPoolConfig
  poolConfig.setMaxIdle(5)     // keep at most 5 idle connections in the pool
  poolConfig.setMaxTotal(2000) // allow at most 2000 connections in total
  private val jedisPool = new JedisPool(poolConfig, "hadoop01")

  def getJedis: Jedis = {
    val jedis = jedisPool.getResource
    jedis.select(1) // offsets and counts are kept in Redis database 1
    jedis
  }
}
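Jpools hands out connections from one shared JedisPool and selects database 1, so the offsets and counts land in db 1 rather than the default db 0. Callers are expected to close() the returned Jedis, which gives the connection back to the pool instead of closing the socket. A typical use, mirroring the foreachRDD above (assuming the Redis server on hadoop01 is reachable):

val jedis = Jpools.getJedis   // borrow a connection (db 1 already selected)
try {
  println(jedis.ping())       // any Jedis command
} finally {
  jedis.close()               // return the connection to the pool
}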