我是靠谱客的博主 魁梧灯泡,最近开发中收集的这篇文章主要介绍阿里开源Canal--④投递数据到Kafka,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

基本说明

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

  • kafka: (https://github.com/apache/kafka)
  • RocketMQ ?https://github.com/apache/rocketmq)

##1.配置修改
基于前面第二章节内容我们搭建的canal server(Billow自编译版本为1.1.3,支持动态Topic配置),我们在instance的配置文件中做配置的修改。

###1.1.修改instance 配置文件

vi conf/example/instance.properties
#
按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考前面Billow发的文章:

dynamicTopic规则: 表达式如果只有库名则匹配库名的数据都会发送到对应名称topic, 如果是库名.表名则匹配的数据会发送到以’库名_表名’为名称的topic。如要指定topic名称,则可以配置:

canal.mq.dynamicTopic=examp2:.*;exmaple3:mytest\..*,mytest2\..*;example4:mytest3.user

以topic名 ‘:’ 正则规则作为配置, 多个topic配置之间以 ';'隔开, message会发送到所有符合规则的topic

###1.2.修改canal 配置文件

vi /usr/local/canal/conf/canal.properties
# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false

mq相关参数说明


###canal.mq.dynamicTopic 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号分隔

  • 例子1:test.test 指定匹配的单表,发送到以 test_test为名字的topic上
  • 例子2:.…* 匹配所有表,每个表都会发送到各自表名的topic上
  • 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
  • 例子4:test.* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
  • 例子5:test,test1.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值

支持指定topic名称匹配, 配置格式:topicName:schema 或 schema.table,多个配置之间使用逗号分隔, 多组之间使用 ; 分隔

  • 例子:test:test,test1.test1;test2:test2,test3.test1 针对匹配的表会发送到指定的topic上

大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力

###表达式说明

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

  • 例子1:test.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
  • 例子2:.…*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
  • 例子3:.…*: p k pk pk 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
  • 例子4: 匹配规则啥都不写,则默认发到0这个partition上
  • 例子5:.…* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
    • 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
  • 例子6: test.test:id,.…* , 针对test的表按照id散列,其余的表按照table散列

注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

###mq顺序性问题

binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答

  1. canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
  2. canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
  • canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
  • canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
  1. canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
  • 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
  • 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
  • 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意

###1.4.启动

cd /usr/local/canal/
sh bin/startup.sh

1.5.查看日志

a.查看 logs/canal/canal.log

vi logs/canal/canal.log

b. 查看instance的日志:

vi logs/example/example.log

1.6 关闭

cd /usr/local/canal/
sh bin/stop.sh

1.7.MQ数据消费

canal源码中有实例代码;如下

public class CanalKafkaClientExample {
protected final static Logger
logger
= LoggerFactory.getLogger(CanalKafkaClientExample.class);
private KafkaCanalConnector
connector;
private static volatile boolean
running = false;
private Thread
thread
= null;
private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
logger.error("parse events has an error", e);
}
};
public CanalKafkaClientExample(String zkServers, String servers, String topic, Integer partition, String groupId){
connector = new KafkaCanalConnector(servers, topic, partition, groupId, null, false);
}
public static void main(String[] args) {
try {
final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(
AbstractKafkaTest.zkServers,
AbstractKafkaTest.servers,
AbstractKafkaTest.topic,
AbstractKafkaTest.partition,
AbstractKafkaTest.groupId);
logger.info("## start the kafka consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
kafkaCanalClientExample.start();
logger.info("## the canal kafka consumer is running now ......");
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
logger.info("## stop the kafka consumer");
kafkaCanalClientExample.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping kafka consumer:", e);
} finally {
logger.info("## kafka consumer is down.");
}
}
});
while (running)
;
} catch (Throwable e) {
logger.error("## Something goes wrong when starting up the kafka consumer:", e);
System.exit(0);
}
}
public void start() {
Assert.notNull(connector, "connector is null");
thread = new Thread(new Runnable() {
public void run() {
process();
}
});
thread.setUncaughtExceptionHandler(handler);
thread.start();
running = true;
}
public void stop() {
if (!running) {
return;
}
running = false;
if (thread != null) {
try {
thread.join();
} catch (InterruptedException e) {
// ignore
}
}
}
private void process() {
while (!running) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
while (running) {
try {
connector.connect();
connector.subscribe();
while (running) {
try {
List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // 获取message
if (messages == null) {
continue;
}
for (Message message : messages) {
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
// printSummary(message, batchId, size);
// printEntry(message.getEntries());
logger.info(message.toString());
}
}
connector.ack(); // 提交确认
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
try {
connector.unsubscribe();
} catch (WakeupException e) {
// No-op. Continue process
}
connector.disconnect();
}
}

最后

以上就是魁梧灯泡为你收集整理的阿里开源Canal--④投递数据到Kafka的全部内容,希望文章能够帮你解决阿里开源Canal--④投递数据到Kafka所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部