我是靠谱客的博主 整齐麦片,最近开发中收集的这篇文章主要介绍Flink中实现自定义反序列化消费Kafka中的二进制数据,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1.简介

kafka中的数据通常以二进制或者字符串存储,本文是针对kafka中的二进制数据,自定义反序列化类,利用flink读取消费,将二进制数据反序列成指定的实体类对象。

2.代码实现

2.1编写自定义反序列化类继承AbstractDeserializationSchema,重写deserializer方法

package com.bigdata.deserializer;
import com.bigdata.bean.User;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import java.io.IOException;
/**
* @ description: 将二进制数据反序列化为对应实体类
* @ author: spencer
* @ date: 2021/1/13 13:43
*/
public class UserDesrializationSchema extends AbstractDeserializationSchema<User> {
@Override
public User deserialize(byte[] bytes) throws IOException {
return new User(bytes);
}
}

2.2 Flink启动类

package com.bigdata.app;
import com.bigdata.bean.CanalRowData;
import com.bigdata.bean.User;
import com.bigdata.constant.Constant;
import com.bigdata.deserializer.CanalRowDataDeserializationSchema;
import com.bigdata.deserializer.UserDesrializationSchema;
import com.bigdata.util.FlinkUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
/**
* @ description: 启动类
* @ author: spencer
* @ date: 2021/1/13 9:53
*/
public class FlinkApp {
public static void main(String[] args) throws Exception {
/**
* 实现步骤:
* 1.创建flink执行环境并设置参数
* 2.获取kafka中的数据源并设置watermark
* 3.根据业务需求执行etl
* 4.执行execute
*/
// 1.创建flink执行环境并设置参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStateBackend(new FsStateBackend("hdfs://flink101:9000/checkpoint/user"));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 2.获取kafka中的数据源并设置watermark
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, FlinkUtil.getParam().get(Constant.KAFKA_SERVERS));
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "user_consumer_id");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<User>(
"user",
new UserDesrializationSchema(), // 此处使用自定义的反序列化方式
properties
);
//
FlinkKafkaConsumer<CanalRowData> kafkaConsumer = new FlinkKafkaConsumer<>(
//
FlinkUtil.getParam().get(Constant.KAFKA_TOPIC),
//
new CanalRowDataDeserializationSchema(),
//
properties
//
);
DataStreamSource streamSource = env.addSource(kafkaConsumer);
streamSource.print();
env.execute("FlinkApp");
}
}

最后

以上就是整齐麦片为你收集整理的Flink中实现自定义反序列化消费Kafka中的二进制数据的全部内容,希望文章能够帮你解决Flink中实现自定义反序列化消费Kafka中的二进制数据所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(50)

评论列表共有 0 条评论

立即
投稿
返回
顶部