Overview

This post shows how to manage Kafka offsets manually for a Spark Streaming job: a KafkaManager class stores and reads per-partition offsets in ZooKeeper (falling back to the Kafka API and correcting out-of-range offsets), and a StreamApp driver uses it to create a direct stream, write each batch to HBase, and persist the offsets only after the batch succeeds.
package com.cartravel.kafka
import java.lang.Long
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, NoOffsetForPartitionException}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.zookeeper.data.Stat
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.Try
import scala.collection.JavaConversions._
/**
* Created by angel
*/
class KafkaManager(zkHost:String , kafkaParams:Map[String , Object]) extends Serializable{
// ZooKeeper connection
private val (zkClient,zkConnection) = ZkUtils.createZkClientAndConnection(zkHost , 10000 , 10000)
// ZooKeeper utility wrapper
private val zkUtils = new ZkUtils(zkClient,zkConnection , false)
/**
* def createDirectStream: returns an InputDStream that starts from the offsets read from ZooKeeper
* */
def createDirectStream[K:ClassTag , V:ClassTag](ssc:StreamingContext , topics:Seq[String]):InputDStream[ConsumerRecord[K, V]] = {
//1:readOffset
val groupId = kafkaParams("group.id").toString
val topicPartition: Map[TopicPartition, Long] = readOffset(topics , groupId)
//2:KafkaUtils.createDirectStream ---> InputDStream
val stream: InputDStream[ConsumerRecord[K, V]] = KafkaUtils.createDirectStream[K, V](
ssc,
PreferConsistent,
ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, topicPartition)
)
stream
}
/**
* Read the stored offsets from ZooKeeper.
* @param topics topics to read
* @param groupId consumer group
* @return offsets keyed by TopicPartition, e.g. (car, 1) -> offset, (car, 2) -> offset
* */
private def readOffset(topics:Seq[String] , groupId:String):Map[TopicPartition , Long] = {
// mutable map of TopicPartition -> offset
val topicPartitionMap = collection.mutable.HashMap.empty[TopicPartition , Long]
// get the topics and their partition info
val topicAndPartitionMaps: mutable.Map[String, Seq[Int]] = zkUtils.getPartitionsForTopics(topics)
topicAndPartitionMaps.foreach(topicPartitions =>{
val zkGroupTopicsDirs: ZKGroupTopicDirs = new ZKGroupTopicDirs(groupId , topicPartitions._1)
topicPartitions._2.foreach(partition =>{// iterate over the partitions
// path where this partition's offset is stored
val offsetPath = s"${zkGroupTopicsDirs.consumerOffsetDir}/${partition}"
val tryGetTopicPartition = Try{
// stored value (String) ---> offset
val offsetTuples: (String, Stat) = zkUtils.readData(offsetPath)
if(offsetTuples != null){
topicPartitionMap.put(new TopicPartition(topicPartitions._1 , Integer.valueOf(partition)) , offsetTuples._1.toLong)
}
}
// if reading the offset from ZooKeeper fails, fall back to the Kafka API instead
if(tryGetTopicPartition.isFailure){
// create a Kafka consumer
val consumer = new KafkaConsumer[String , Object](kafkaParams)
val topicCollection = List(new TopicPartition(topicPartitions._1 , partition))
// assign the partition to this consumer
consumer.assign(topicCollection)
// earliest available position for this partition
val availableOffset: Long = consumer.beginningOffsets(topicCollection).values().head
consumer.close()
// record it in the map
topicPartitionMap.put(new TopicPartition(topicPartitions._1 , Integer.valueOf(partition)) , availableOffset)
}
})
})
//currentOffset, earliestOffset, latestOffset
//if current < earliest || current > latest ==> correct ---> earliest
//TODO correction: in production an "offset out of range" error can occur, so in that case consume from the earliest offset
val earliestOffsets = getEarliestOffsets(kafkaParams , topics)
val latestOffsets = getLatestOffsets(kafkaParams , topics)
for((k,v) <- topicPartitionMap){
//current offset
val current = v
//earliest offset for this partition
val earliest = earliestOffsets.get(k).get
//latest offset for this partition
val latest = latestOffsets.get(k).get
if(current < earliest || current > latest){
//out of range: re-consume from the earliest offset
topicPartitionMap.put(k , earliest)
}
}
topicPartitionMap.toMap
}
private def getEarliestOffsets(kafkaParams:Map[String , Object] , topics:Seq[String]) = {
//mutable map for the tweaked consumer config
val newKafkaParams = mutable.Map[String , Object]()
//copy the current kafka params into it
newKafkaParams ++= kafkaParams
//override auto.offset.reset so the consumer starts from the beginning
newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG , "earliest")
//kafka api
val consumer = new KafkaConsumer[String , Array[Byte]](newKafkaParams)
//subscribe to the topics
consumer.subscribe(topics)
val noOffsetForPartitionExceptionSet = mutable.Set()
try{
//poll once so the consumer gets its partition assignment
consumer.poll(0)
}catch{
case e:NoOffsetForPartitionException =>
// noOffsetForPartitionExceptionSet.add(e.partition())
//send an alert email here
}
//get the assigned partitions
val topicp = consumer.assignment().toSet
//pause consumption
consumer.pause(topicp)
//seek to the beginning
consumer.seekToBeginning(topicp)
val toMap = topicp.map(line => line -> consumer.position(line)).toMap
val earliestOffsetMap = toMap
consumer.unsubscribe()
consumer.close()
earliestOffsetMap
}
private def getLatestOffsets(kafkaParams:Map[String , Object] , topics:Seq[String]) = {
val newKafkaParams = mutable.Map[String , Object]()
newKafkaParams ++= kafkaParams
newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG , "latest")
//kafka api
val consumer = new KafkaConsumer[String , Array[Byte]](newKafkaParams)
//subscribe to the topics
consumer.subscribe(topics)
val noOffsetForPartitionExceptionSet = mutable.Set()
try{
consumer.poll(0)
}catch{
case e:NoOffsetForPartitionException =>
// noOffsetForPartitionExceptionSet.add(e.partition())
//send an alert email here
}
//get the assigned partitions
val topicp = consumer.assignment().toSet
//pause consumption
consumer.pause(topicp)
//seek to the end
consumer.seekToEnd(topicp)
val toMap = topicp.map(line => line -> consumer.position(line)).toMap
val latestOffsetMap = toMap
consumer.unsubscribe()
consumer.close()
latestOffsetMap
}
def persistOffset[K,V](rdd:RDD[ConsumerRecord[K,V]] , storeOffset:Boolean = true) = {
//get the consumer group
val groupId = kafkaParams("group.id").toString
//the offset ranges carry topic, partition, and offset info for this batch
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(or =>{
val zKGroupTopicDirs = new ZKGroupTopicDirs(groupId , or.topic)
val offsetPath = zKGroupTopicDirs.consumerOffsetDir+"/"+or.partition
val data = if(storeOffset) or.untilOffset else or.fromOffset
zkUtils.updatePersistentPath(offsetPath , data+"")
})
}
}
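For reference, here is a minimal sketch of the ZooKeeper layout that readOffset and persistOffset rely on. The group and topic names are just the sample values from the driver arguments below, and the path follows the classic /consumers/<group>/offsets/<topic>/<partition> convention that ZKGroupTopicDirs builds:
import kafka.utils.ZKGroupTopicDirs
// consumerOffsetDir resolves to /consumers/test_consumer_car/offsets/test_car
val dirs = new ZKGroupTopicDirs("test_consumer_car", "test_car")
// persistOffset appends the partition id, so the offset of partition 0 is stored as a
// string at /consumers/test_consumer_car/offsets/test_car/0
val offsetPathForPartition0 = s"${dirs.consumerOffsetDir}/0"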
import com.cartravel.hbase.conn.HbaseConnections
import com.cartravel.kafka.KafkaManager
import com.cartravel.loggings.Logging
import com.cartravel.spark.SparkEngine
import com.cartravel.tools.{JsonParse, StructInterpreter, TimeUtils}
import com.cartravel.utils.GlobalConfigUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.Try
/**
* Created by angel
*/
object StreamApp extends Logging{
//example arguments: 5 node01:9092,node02:9092,node03:9092,node04:9092,node05:9092,node06:9092 test_car test_consumer_car node01:2181,node02:2181,node03:2181,node04:2181,node05:2181,node06:2181
def main(args: Array[String]): Unit = {
//1. pull data from Kafka
if (args.length < 5) {
System.err.println("Usage: KafkaDirectStream \n" +
" <batch-duration-in-seconds> \n" +
" <kafka-bootstrap-servers> \n" +
" <topics, comma separated> \n" +
" <consumer-group-id> \n" +
" <zookeeper-quorum> "
)
System.exit(1)
}
//TODO
val startTime = TimeUtils.getNowDataMin
val batchDuration = args(0)
val bootstrapServers = args(1).toString
val topicsSet = args(2).toString.split(",").toSet
val consumerGroupID = args(3)
val zkQuorum = args(4)
val sparkConf = SparkEngine.getSparkConf()
val session = SparkEngine.getSparkSession(sparkConf)
val sc = session.sparkContext
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
val topics = topicsSet.toArray
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> bootstrapServers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> consumerGroupID,
"auto.offset.reset" -> GlobalConfigUtils.getProp("auto.offset.reset"),
"enable.auto.commit" -> (false: java.lang.Boolean) //禁用自动提交Offset,否则可能没正常消费完就提交了,造成数据错误
)
//For reference, the plain API call without ZooKeeper-managed offsets would be:
/**
* val stream:InputDStream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
* */
val kafkaManager = new KafkaManager(zkQuorum , kafkaParams)
val inputDStream: InputDStream[ConsumerRecord[String, String]] = kafkaManager.createDirectStream(ssc, topics)
inputDStream.print()
inputDStream.foreachRDD(rdd =>{
if(!rdd.isEmpty()){
val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
ranges.foreach(line =>{
//fromOffset is where this batch starts reading
println(s" reading topic:${line.topic} , partition:${line.partition} , fromOffset:${line.fromOffset} , untilOffset: ${line.untilOffset}")
})
val doElse = Try{
val data: RDD[String] = rdd.map(line => line.value())
data.foreachPartition(partition =>{
//get an HBase connection
val conn = HbaseConnections.getHbaseConn
//business logic: parse each record and write it to HBase
partition.foreach(d =>{
//(String, Any): the String is the table name, the Any is the parsed record
val parse: (String , Any) = JsonParse.parse(d)
StructInterpreter.interpreter(parse._1 , parse , conn)
})
//release the connection
HbaseConnections.closeConn(conn)
})
}
if(doElse.isSuccess){
//persist the offsets
kafkaManager.persistOffset(rdd)
}
}
})
//start the streaming job and block until it terminates
ssc.start()
ssc.awaitTermination()
}
}