我是靠谱客的博主 称心小土豆,最近开发中收集的这篇文章主要介绍使用flink-cdc采集mysql数据,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前言:在使用flink-cdc采集mysql数据时,会遇到各种问题,本文记录了使用flink-cdc采集mysql的流程操作。

1.版本选择:

​ 本人使用的是flink 1.15.0 和 flink-connector-mysql-cdc 2.2.0

2.冲突问题:

​ 直接引用会有版本冲突:flink-shaded-guava30和flink-shaded-guava18冲突,因为flink15使用的是guava30,而flink-connector-mysql-cdc的2.2版本用的是guava18

3.解决冲突:

​ 这里我选用重新编译flink-cdc-connectors,在git上下载2.2版本的flink-cdc-connectors源码,修改代码中用到flink-shaded-guava18的代码。

3.1 修改项目pom

​ 把flink-shaded-guava18.0-13.0版本升级到30.1.1-jre-15.0

3.2 修改源码

import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder改成import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder用到guava18的地方均改为guava30

3.3 重新编译:
mvn spotless:apply
mvn clean install -Dmaven.test.skip=true
3.4 其他:

​ 最新master的2.3版本的flink-connector-mysql-cdc直接使用的就是flink-shaded-guava:30.1.1-jre-15.0,也可以替换成2.3版本

4.开发流程

​ 1.导入重新编译的flink-connector-mysql-cdc依赖

​ 2.直接上完整代码

public class FlinkMysqlCDC {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("hc")
.tableList("hc.student")
.username("flinkuser")
.password("123456")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"MySql Source")
.setParallelism(4)
.print().setParallelism(1);
env.execute("MySql Source Reader");
}
}

3.mysql权限和设置

# 创建mysql用户:
CREATE USER 'flinkuser'@'localhost' IDENTIFIED BY '123456';
# 授予用户所需权限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser' IDENTIFIED BY '123456';
#最终确定用户的权限
FLUSH PRIVILEGES;
# 设置server id (目前没发现需要的实际操作)
SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;
#mysql会话超时(大型作业可能会用到)
interactive_timeout
wait_timeout
#增量快照原理
#将表拆分成块(chunk),并行读取chunk;
#1.记录binlog的low_offset,2.读取快照,3.binlog为high_offset,4.读取low-high的binlog合到chunk中输出,最后单个读取high后的binlog

欢迎分享讨论。

最后

以上就是称心小土豆为你收集整理的使用flink-cdc采集mysql数据的全部内容,希望文章能够帮你解决使用flink-cdc采集mysql数据所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部