我是靠谱客的博主 威武白猫,最近开发中收集的这篇文章主要介绍手动管理kafka的偏移量,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

package com.cartravel.kafka

import java.lang.Long

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, NoOffsetForPartitionException}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.zookeeper.data.Stat

import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.Try
import scala.collection.JavaConversions._

/**

  • Created by angel
    */
    class KafkaManager(zkHost:String , kafkaParams:Map[String , Object]) extends Serializable{
    //zk连接
    private val (zkClient,zkConnection) = ZkUtils.createZkClientAndConnection(zkHost , 10000 , 10000)
    //zk工具类
    private val zkUtils = new ZkUtils(zkClient,zkConnection , false)

/**
* def createDirectStream: 返回:InputDStream
* */

def createDirectStream[K:ClassTag , V:ClassTag](ssc:StreamingContext , topics:Seq[String]):InputDStream[ConsumerRecord[K, V]] = {
//1:readOffset
val groupId = kafkaParams(“group.id”).toString

val topicPartition: Map[TopicPartition, Long] = readOffset(topics , groupId)
//2:KafkaUtils.createDirectStream ---> InputDStream
val stream: InputDStream[ConsumerRecord[K, V]] = KafkaUtils.createDirectStream[K, V](
  ssc,
  PreferConsistent,
  ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, topicPartition)
)
stream

}

/**
* 读取偏移量
* @param topics 主题
* @param groupId 消费组
* @return Map[car-1 , car-2 , Long]
* */

private def readOffset(topics:Seq[String] , groupId:String):Map[TopicPartition , Long] = {
//维护一个TopicPartition 可变的
val topicPartitionMap = collection.mutable.HashMap.empty[TopicPartition , Long]

//拿topic和分区信息
val topicAndPartitionMaps: mutable.Map[String, Seq[Int]] = zkUtils.getPartitionsForTopics(topics)

topicAndPartitionMaps.foreach(topicPartitions =>{
  val zkGroupTopicsDirs: ZKGroupTopicDirs = new ZKGroupTopicDirs(groupId , topicPartitions._1)
  topicPartitions._2.foreach(partition =>{//迭代分区
    //维护offset的路径
    val offsetPath = s"${zkGroupTopicsDirs.consumerOffsetDir}/${partition}"

    val  tryGetTopicPartition = Try{
      //String --->offset
      val offsetTuples: (String, Stat) = zkUtils.readData(offsetPath)
      if(offsetTuples != null){
        topicPartitionMap.put(new TopicPartition(topicPartitions._1 , Integer.valueOf(partition)) , offsetTuples._1.toLong)
      }
    }
    //如果zk读取偏移量失败了,就不能用zk去读,就应该用kafka的API去读
    if(tryGetTopicPartition.isFailure){
      //维护一个kafka的消费者
      val consumer = new KafkaConsumer[String , Object](kafkaParams)
      val topicCollection = List(new TopicPartition(topicPartitions._1 , partition))
      //拉去数据
      consumer.assign(topicCollection)
      //拉取数据的位置
      val avaliableOffset: Long = consumer.beginningOffsets(topicCollection).values().head
      consumer.close()
      //返回一个map类型
      topicPartitionMap.put(new TopicPartition(topicPartitions._1 , Integer.valueOf(partition)) , avaliableOffset)

    }
  })
})

//currentoffset 、 earliestoffset  leatestOffset
//cur < ear || cur > leaty ==> 矫正-->  ear
//TODO 矫正   因为在生产环境中有可能抱一个错误是 offset out of range  在这里从最早去消费
val earliestOffsets = getEarliestOffsets(kafkaParams , topics)
val latestOffsets = getLatestOffsets(kafkaParams , topics)
for((k,v) <- topicPartitionMap){
  //当前offset
  val current = v
  //当前的earliest
  val earliest = earliestOffsets.get(k).get
  //当前的latest
  val latest = latestOffsets.get(k).get
  if(current < earliest || current > latest){
    //就会重新消费
    topicPartitionMap.put(k , earliest)
  }
}
topicPartitionMap.toMap

}

private def getEarliestOffsets(kafkaParams:Map[String , Object] , topics:Seq[String]) = {
//可变的数组
val newKafkaParams = mutable.MapString , Object
//当前的kafka叠加进来
newKafkaParams ++= kafkaParams
//在维护一个offset 表示从头消费
newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG , “earliest”)

//kafka api
val consumer = new KafkaConsumer[String , Array[Byte]](newKafkaParams)
//订阅
consumer.subscribe(topics)
val noOffsetForPartitionExceptionSet = mutable.Set()
try{
  //拉取数据
  consumer.poll(0)
}catch{
  case e:NoOffsetForPartitionException =>

// noOffsetForPartitionExceptionSet.add(e.partition())
//邮件报警
}
//获取 分区信息
val topicp = consumer.assignment().toSet
//暂定消费
consumer.pause(topicp)
//从头开始
consumer.seekToBeginning(topicp)
val toMap = topicp.map(line => line -> consumer.position(line)).toMap
val earliestOffsetMap = toMap
consumer.unsubscribe()
consumer.close()
earliestOffsetMap
}

private def getLatestOffsets(kafkaParams:Map[String , Object] , topics:Seq[String]) = {
val newKafkaParams = mutable.MapString , Object
newKafkaParams ++= kafkaParams
newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG , “latest”)

//kafka api
val consumer = new KafkaConsumer[String , Array[Byte]](newKafkaParams)
//订阅
consumer.subscribe(topics)
val noOffsetForPartitionExceptionSet = mutable.Set()
try{
  consumer.poll(0)
}catch{
  case e:NoOffsetForPartitionException =>

// noOffsetForPartitionExceptionSet.add(e.partition())
//邮件报警
}
//获取 分区信息
val topicp = consumer.assignment().toSet
//暂定消费
consumer.pause(topicp)
//从尾开始
consumer.seekToEnd(topicp)
val toMap = topicp.map(line => line -> consumer.position(line)).toMap
val earliestOffsetMap = toMap
consumer.unsubscribe()
consumer.close()
earliestOffsetMap
}

def persistOffset[K,V](rdd:RDD[ConsumerRecord[K,V]] , storeOffset:Boolean = true) = {
//获取消费组
val groupId = kafkaParams(“group.id”).toString
//所有的信息都在这个里面
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

offsetRanges.foreach(or =>{
  val zKGroupTopicDirs = new ZKGroupTopicDirs(groupId , or.topic)
  val offsetPath = zKGroupTopicDirs.consumerOffsetDir+"/"+or.partition

  val  data = if(storeOffset) or.untilOffset else or.fromOffset
  zkUtils.updatePersistentPath(offsetPath , data+"")
})

}
}

import com.cartravel.hbase.conn.HbaseConnections
import com.cartravel.kafka.KafkaManager
import com.cartravel.loggings.Logging
import com.cartravel.spark.SparkEngine
import com.cartravel.tools.{JsonParse, StructInterpreter, TimeUtils}
import com.cartravel.utils.GlobalConfigUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.Try

/**

  • Created by angel
    */
    object StreamApp extends Logging{

//5 node01:9092,node02:9092,node03:9092,node04:9092,node05:9092,node06:9092 test_car test_consumer_car node01:2181,node02:2181,node03:2181,node04:2181,node05:2181,node06:2181
def main(args: Array[String]): Unit = {
//1、从kafka拿数据
if (args.length < 5) {
System.err.println(“Usage: KafkaDirectStream n” +
" n" +
" n" +
" n" +
" n" +
" "
)
System.exit(1)
}
//TODO
val startTime = TimeUtils.getNowDataMin

val batchDuration = args(0)
val bootstrapServers = args(1).toString
val topicsSet = args(2).toString.split(",").toSet
val consumerGroupID = args(3)
val zkQuorum = args(4)
val sparkConf = SparkEngine.getSparkConf()
val session = SparkEngine.getSparkSession(sparkConf)
val sc = session.sparkContext
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))

val topics = topicsSet.toArray
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> bootstrapServers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> consumerGroupID,
  "auto.offset.reset" -> GlobalConfigUtils.getProp("auto.offset.reset"),
  "enable.auto.commit" -> (false: java.lang.Boolean) //禁用自动提交Offset,否则可能没正常消费完就提交了,造成数据错误
)

//1:
/**
  * val stream:InputDStream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
  * */

val kafkaManager = new KafkaManager(zkQuorum , kafkaParams)
val inputDStream: InputDStream[ConsumerRecord[String, String]] = kafkaManager.createDirectStream(ssc, topics)

inputDStream.print()

inputDStream.foreachRDD(rdd =>{
  if(!rdd.isEmpty()){
    val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    ranges.foreach(line =>{
      //fromOffset从那个开始
      println(s" 当前读取的topic:${line.topic} , partition:${line.partition} , fromOffset:${line.fromOffset} , untilOffset: ${line.untilOffset}")
    })

    val doElse = Try{
      val data: RDD[String] = rdd.map(line => line.value())
      data.foreachPartition(partition =>{
        //构建Hbase连接
        val conn = HbaseConnections.getHbaseConn

        //写业务
        partition.foreach(d =>{
          //string  表    Any指的是类
          val parse: (String  , Any) = JsonParse.parse(d)
          StructInterpreter.interpreter(parse._1 , parse , conn)
        })

        //注销连接
        HbaseConnections.closeConn(conn)

      })

    }

    if(doElse.isSuccess){
      //提交偏移量
      kafkaManager.persistOffset(rdd)
    }


  }
})

最后

以上就是威武白猫为你收集整理的手动管理kafka的偏移量的全部内容,希望文章能够帮你解决手动管理kafka的偏移量所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(67)

评论列表共有 0 条评论

立即
投稿
返回
顶部