Managing Kafka Offsets Yourself When Connecting Flink to Kafka

Overview

When Flink consumes from Kafka, duplicate consumption and missed records are hard to avoid around failures and restarts.

There is still not much good documentation online about managing Flink's Kafka offsets yourself.

Choosing your own place to store offsets

Here ZooKeeper is used as the storage location, and the offsets are updated in real time as records are consumed. After the job dies, the saved offsets are pulled back on restart, so consumption resumes where it left off. True exactly-once consumption, however, also has to be built together with the business logic, for example by using transactions.
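
Concretely, the demo below keeps one ZooKeeper node per partition and stores the offset as that node's data:

/offsets/{topic}/{group}/{partition}   ->   offset as a string, e.g. "160231645"

(If the Curator client is built with a namespace, as the code below does with "super", raw ZooKeeper paths are additionally prefixed with it.)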

Enough preamble; here is a small demo I built locally.

  1. First, add the necessary dependencies to the pom (the Flink artifacts below carry no explicit <version>; see the note after the dependency list).

<dependency>
<groupId>com.wehotel</groupId>
<artifactId>commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- Logging based on scala-logging and logback; logback is a more efficient framework that is gradually replacing log4j. The following dependencies set up log printing: -->
<dependency>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_2.11</artifactId>
<version>3.7.2</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
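The Flink artifacts above have no explicit <version>, so the versions are presumably supplied by dependency management in a parent pom. Below is only a sketch of what that could look like; the 1.7.2 value is an assumption that merely fits the 0.11 Kafka connector and the _2.11 Scala suffix, so substitute whatever version your project really uses:
<properties>
<flink.version>1.7.2</flink.version> <!-- assumption: any Flink release that still ships the 0.11 connector -->
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- repeat for the other org.apache.flink artifacts listed above -->
</dependencies>
</dependencyManagement>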
  2. Once the Maven project imports successfully, you can implement the functionality below.
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.zookeeper.CreateMode;
import scala.Tuple2;
import java.io.IOException;
import java.util.*;
  3. We use Apache Curator to work with ZooKeeper. First, create a simple POJO that carries each Kafka record (an optional toString() sketch follows the class).
/**
* <Description>
*
* @author enjian
* @version 1.0
* @taskId:
* @createDate 2020/04/01 10:10
* @see ""
*/
public class KafkaSource {
private String topic;
private String message;
private Integer partition;
private Long offset;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Integer getPartition() {
return partition;
}
public void setPartition(Integer partition) {
this.partition = partition;
}
public Long getOffset() {
return offset;
}
public void setOffset(Long offset) {
this.offset = offset;
}
}
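The demo later just calls print() on the stream, and the default Object.toString() would only show a class name and hash. An optional toString() override (my own addition, not part of the original post) can be dropped into the class above to make the printed records readable:
// readable output for DataStream.print(); the fields match the POJO above
@Override
public String toString() {
return "KafkaSource{topic=" + topic
+ ", partition=" + partition
+ ", offset=" + offset
+ ", message=" + message + "}";
}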
  4. The main class logic. I have hidden the Kafka and ZooKeeper addresses here; fill in your own (a note about running this outside a single JVM follows the class).
/**
* <Description>
*
* @author enjian
* @version 1.0
* @taskId:
* @createDate 2020/03/31 11:35
* @see ""
*/
public class ZKUtils {
// session timeout
private static final int SESSION_TIMEOUT = 30 * 1000;
// connection timeout
private static final int CONNECTION_TIMEOUT = 3 * 1000;
// ZooKeeper service address (hidden; fill in your own)
private static final String CONNECT_ADDR = "xxxxx";
// checkpoint directory URI (hidden like the address above; e.g. an hdfs:// or file:// path)
private static final String chkPointPath = "xxxxx";
// the Curator client instance
private static CuratorFramework client;
public static void main(String[] args) throws Exception {
// 1. retry policy: initial wait of 1 s, retry up to 10 times
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
// 2. build the connection through the factory
client = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR).connectionTimeoutMs(CONNECTION_TIMEOUT)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(retryPolicy)
.namespace("super") // namespace: all paths are created under /super
.build();
// 3. start the connection
client.start();
StreamExecutionEnvironment flinkEnv = changeEnv();
Tuple2<HashMap<KafkaTopicPartition, Long>, Boolean> kafkaOffset = getFromOffsets("tripGoodsCA00001", "test");
FlinkKafkaConsumer011<KafkaSource> ds = createKafkaSource("tripGoodsCA00001", "test");
// default: start from the latest records
FlinkKafkaConsumerBase flinkKafkaConsumerBase = ds.setStartFromLatest();
// if offsets were found in ZooKeeper, start from them instead
if (kafkaOffset._2()) {
System.out.println("----------------------zookeeper manager offsets-----------------------------------");
Map<KafkaTopicPartition, Long> specificStartOffsets = kafkaOffset._1();
flinkKafkaConsumerBase = ds.setStartFromSpecificOffsets(specificStartOffsets);
}
DataStreamSource<KafkaSource> testds = flinkEnv.addSource(flinkKafkaConsumerBase);
testds.print();
flinkEnv.execute("test");
}
public static void ensureZKExists(String zkTopicPath) {
try {
if (client.checkExists().forPath(zkTopicPath) == null) { // nothing written at this path yet: create it, recursively creating any missing parent nodes
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) // node type: persistent
.forPath(zkTopicPath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
 * Write the given partition -> offset map into ZooKeeper under /offsets/{topic}/{group}/{partition}.
 */
public static void storeOffsets(HashMap<String, Long> offsetRange, String topic, String group) {
String zkTopicPath = String.format("/offsets/%s/%s", topic,group);
Iterator<Map.Entry<String, Long>> setoffsetrange = offsetRange.entrySet().iterator();
while (setoffsetrange.hasNext()) {
Map.Entry<String, Long> offsethas = setoffsetrange.next();
//partition
String path = String.format("%s/%s", zkTopicPath, offsethas.getKey());
ensureZKExists(path);
try {
client.setData().forPath(path, (offsethas.getValue() + "").getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
 * Read the Kafka offsets for the given topic/group back from ZooKeeper.
 * @param topic
 * @param group
 * @return Tuple2<HashMap<KafkaTopicPartition, Long>, Boolean> - the offsets plus a flag saying whether any were found
 */
public static Tuple2<HashMap<KafkaTopicPartition, Long>, Boolean> getFromOffsets(String topic, String group) {
///xxxxx/offsets/topic/group/partition/
String zkTopicPath = String.format("/offsets/%s/%s", topic,group);
ensureZKExists(zkTopicPath);
HashMap<KafkaTopicPartition, Long> offsets = new HashMap<KafkaTopicPartition, Long>();
try {
List<String> partitions = client.getChildren().forPath(zkTopicPath);
for (String partition : partitions) {
// print the raw offset string stored for this partition (handy for debugging)
System.out.println(new String(client.getData().forPath(String.format("%s/%s", zkTopicPath,partition))));
Long offset = Long.valueOf(new String(client.getData().forPath(String.format("%s/%s", zkTopicPath,partition))));
KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, Integer.valueOf(partition));
offsets.put(topicPartition, offset);
}
if (offsets.isEmpty()) {
return new Tuple2<>(offsets, false);
} else {
return new Tuple2<>(offsets, true);
}
} catch (Exception e) {
e.printStackTrace();
}
// if reading from ZooKeeper failed, fall back to "no stored offsets"
return new Tuple2<>(offsets, false);
}
public static Properties getKafkaProperties(String groupId) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "");
//
properties.setProperty("zookeeper.connect", getStrValue(new Constants().KAFKA_ZOOKEEPER_LIST));
properties.setProperty("group.id", groupId);
return properties;
}
public static Properties getProduceKafkaProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
public static StreamExecutionEnvironment changeEnv() throws IOException {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().enableForceKryo();
// use RocksDB as the state backend; chkPointPath is the checkpoint URI defined above
env.setStateBackend(new RocksDBStateBackend(chkPointPath));
// enable checkpointing with a 600000 ms (10 minute) interval
env.enableCheckpointing(600000);
// exactly-once checkpointing mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoint timeout: discard a checkpoint if it has not finished within 60 s (the default is 10 minutes)
env.getCheckpointConfig().setCheckpointTimeout(60000);
// keep the job running even if a checkpoint fails
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
// allow at most one checkpoint in flight at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
return env;
}
/**
* Create the Kafka consumer source. The KeyedDeserializationSchema below exposes the topic, partition and offset of every record.
* @param topic
* @param groupid
* @return
*/
public static FlinkKafkaConsumer011<KafkaSource> createKafkaSource(String topic, String groupid){
// Kafka consumer configuration; the deserialization schema carries partition and offset into each KafkaSource record
FlinkKafkaConsumer011<KafkaSource> dataStream = new FlinkKafkaConsumer011<KafkaSource>(topic, new KeyedDeserializationSchema<KafkaSource>() {
@Override
public TypeInformation<KafkaSource> getProducedType() {
return
TypeInformation.of(new TypeHint<KafkaSource>() {
});
}
@Override
public KafkaSource deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
KafkaSource kafkasource = new KafkaSource();
kafkasource.setTopic(topic);
kafkasource.setMessage(new String(message, "UTF-8")); // decode the value bytes (message.toString() would only yield the array reference)
kafkasource.setPartition(partition);
kafkasource.setOffset(offset);
// store this record's offset in ZooKeeper; note that setStartFromSpecificOffsets()
// starts reading *at* the stored offset, so on restart the last processed record is
// consumed once more (store offset + 1 if you want to resume right after it)
HashMap<String,Long> partitionAndOffset = new HashMap<>();
partitionAndOffset.put(String.valueOf(partition),offset);
storeOffsets(partitionAndOffset,topic,groupid);
return kafkasource;
}
@Override
public boolean isEndOfStream(KafkaSource s) {
return false;
}
}, getKafkaProperties(groupid));
// default start position: the latest records (main() overrides this when ZooKeeper already has offsets)
dataStream.setStartFromLatest();
// also commit offsets back to Kafka when checkpoints complete
dataStream.setCommitOffsetsOnCheckpoints(true);
return dataStream;
}
}
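One caveat when taking this demo beyond a single JVM: the static Curator client is only created in main(), which runs on the Flink client side, while deserialize() runs inside the TaskManagers, where the static field would still be null. The following is only a sketch of a lazy-init helper that could be added to ZKUtils (my own assumption, not part of the original demo) to keep the same pattern working in a cluster:
// build (or reuse) the Curator client on whichever JVM actually calls the ZooKeeper helpers
private static synchronized CuratorFramework getOrCreateClient() {
if (client == null) {
client = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.connectionTimeoutMs(CONNECTION_TIMEOUT)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.namespace("super")
.build();
client.start();
}
return client;
}
ensureZKExists() and storeOffsets() would then call getOrCreateClient() instead of touching the client field directly.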
  5. Log in to ZooKeeper and you can see the stored offset:
[zk: localhost:2181(CONNECTED) 36] get /offsets/tripGoodsCA00001/test/0
160231645
cZxid = 0x300049e7e
ctime = Wed Apr 01 11:34:51 CST 2020
mZxid = 0x30004ac85
mtime = Wed Apr 01 13:41:05 CST 2020
pZxid = 0x300049e7e
cversion = 0
dataVersion = 449
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0

Summary

When I started using Flink for this project, feeling my way through a brand-new framework on my own, I inevitably hit some walls. I went back to the official documentation again and again, adjusted the state handling, and debugged step by step until, bit by bit, every problem was solved. All of this is groundwork for my Flink anti-crawler project, since the old Storm approach had become dated. After about a month of exploration and iterative tuning I ended up with both a Scala version and a Java version. Why two versions? Partly to improve my coding skills and to compare the two implementation styles; I was also fortunate enough to try embedding Flink into a microservice application.
I don't blog much and am still learning. See you around!
