概述
前言:在使用flink-cdc
采集mysql
数据时,会遇到各种问题,本文记录了使用flink-cdc
采集mysql
的流程操作。
1.版本选择:
本人使用的是flink
1.15.0 和 flink-connector-mysql-cdc
2.2.0
2.冲突问题:
直接引用会有版本冲突:flink-shaded-guava
30和flink-shaded-guava
18冲突,因为flink15使用的是guava30,而flink-connector-mysql-cdc
的2.2版本用的是guava18
3.解决冲突:
这里我选用重新编译flink-cdc-connectors
,在git上下载2.2版本的flink-cdc-connectors
源码,修改代码中用到flink-shaded-guava18
的代码。
3.1 修改项目pom
把flink-shaded-guava
的18.0-13.0
版本升级到30.1.1-jre-15.0
3.2 修改源码
把import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder
改成import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder
用到guava18的地方均改为guava30
3.3 重新编译:
mvn spotless:apply
mvn clean install -Dmaven.test.skip=true
3.4 其他:
最新master的2.3版本的flink-connector-mysql-cdc
直接使用的就是flink-shaded-guava:30.1.1-jre-15.0
,也可以替换成2.3版本
4.开发流程
1.导入重新编译的flink-connector-mysql-cdc
依赖
2.直接上完整代码
public class FlinkMysqlCDC {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("hc")
.tableList("hc.student")
.username("flinkuser")
.password("123456")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"MySql Source")
.setParallelism(4)
.print().setParallelism(1);
env.execute("MySql Source Reader");
}
}
3.mysql权限和设置
# 创建mysql用户:
CREATE USER 'flinkuser'@'localhost' IDENTIFIED BY '123456';
# 授予用户所需权限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser' IDENTIFIED BY '123456';
#最终确定用户的权限
FLUSH PRIVILEGES;
# 设置server id (目前没发现需要的实际操作)
SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;
#mysql会话超时(大型作业可能会用到)
interactive_timeout
wait_timeout
#增量快照原理
#将表拆分成块(chunk),并行读取chunk;
#1.记录binlog的low_offset,2.读取快照,3.binlog为high_offset,4.读取low-high的binlog合到chunk中输出,最后单个读取high后的binlog
欢迎分享讨论。
最后
以上就是称心小土豆为你收集整理的使用flink-cdc采集mysql数据的全部内容,希望文章能够帮你解决使用flink-cdc采集mysql数据所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复