
Integrating with a message queue (Kafka): modifying the JSON format output by canal

Introduction

At present, various asset data are spread across different systems. To meet current management needs, the asset data from each system must be collected into a big-data platform for unified storage and management. A message queue (Kafka) is used for the integration; data must be sent incrementally, and for CUD operations (create, update, delete) the entire affected row has to be delivered to the message channel.

  • Set up a test environment on a CentOS 7 server according to the requirements and verify message pushing.
  • Adjust the message output format so that it matches the data-integration format required by the project.

I. Test environment deployment

```
JDK = 1.8
MySQL = 5.7.0
zookeeper = 3.7.0
canal = 1.1.5
```

II. Configuring the runtime environment

1. MySQL configuration

1. Run vim /etc/my.cnf to edit the MySQL configuration file my.cnf and enable log_bin:

```ini
[mysqld]
log-bin=mysql-bin
server-id=246
```
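
One requirement not shown above: canal parses the binlog in row format, so if the server is not already running with row-based logging, the following line should also go into the [mysqld] section (my addition, not part of the original configuration):

```ini
binlog_format=ROW
```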

2. After modifying the configuration file, restart the MySQL service:

```bash
systemctl restart mysqld.service
```

3. Check whether log-bin was enabled successfully. Log in to MySQL:

```bash
mysql -uroot -p'<your MySQL password>'
```

Then verify that the setting took effect:

```
mysql> SHOW VARIABLES LIKE '%bin%';
```
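
If binary logging is enabled, the result should contain a row like the following (other bin-related variables omitted):

```
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
```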


2. Installing ZooKeeper

1. Both canal and Kafka rely on ZooKeeper for service coordination, so a ZooKeeper registry needs to be deployed and configured; version 3.7.0 is used here. (Make sure to download apache-zookeeper-3.7.0-bin.tar.gz, the package with "bin" in the name.)

```bash
cd /root/software
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
tar -zvxf apache-zookeeper-3.7.0-bin.tar.gz
mv ./apache-zookeeper-3.7.0-bin /usr/local/zookeeper
cd /usr/local/zookeeper
mkdir data
mkdir logs
cd ./conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg
```

2. Edit the configuration file: set dataDir=/usr/local/zookeeper/data and add dataLogDir=/usr/local/zookeeper/logs. Since this is only for testing, a single instance is used and no cluster nodes are configured.

```properties
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
```

Also create the myid file:

```bash
echo "1" > /usr/local/zookeeper/data/myid
```

3. Start the ZooKeeper registry; the default port is 2181:

```bash
/usr/local/zookeeper/bin/zkServer.sh start
```

Check the startup status:

```bash
/usr/local/zookeeper/bin/zkServer.sh status
```

(For this single-node setup the status output should report Mode: standalone.)

3. Installing Kafka

1. Kafka is a high-performance distributed message queue middleware whose deployment depends on ZooKeeper; version kafka_2.12-2.6.2 is used here.

```bash
mkdir /usr/local/kafka
cd /usr/local/kafka
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -zxvf kafka_2.12-2.6.2.tgz
rm -rf kafka_2.12-2.6.2.tgz
```

2. Edit the configuration file: vim /usr/local/kafka/kafka_2.12-2.6.2/config/server.properties. Note that host.name and listeners=PLAINTEXT://:9092 (together with advertised.listeners) need to be set, otherwise the broker can only be accessed locally:

```properties
zookeeper.connect=localhost:2181
host.name=192.168.110.244
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.110.244:9092
```
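
Canal will publish to the topic configured later in instance.properties (example in this walkthrough). If automatic topic creation is disabled on the broker, the topic can be created up front; a sketch using the bundled CLI, where the topic name and partition count are simply the values used here:

```bash
/usr/local/kafka/kafka_2.12-2.6.2/bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 1 \
  --topic example
```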

3. Start Kafka:

```bash
/usr/local/kafka/kafka_2.12-2.6.2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.12-2.6.2/config/server.properties &
```

List all topics:

```bash
bin/kafka-topics.sh --list --zookeeper localhost:2181
```

Consume the messages in a given topic:

```bash
/usr/local/kafka/kafka_2.12-2.6.2/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic example
```


4. Installing canal for data synchronization

1. Download the canal v1.1.5 release, canal.deployer-1.1.5.tar.gz:

```bash
mkdir /usr/local/canal
cd /usr/local/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz
rm -rf canal.deployer-1.1.5.tar.gz
```

After extraction the directory layout is as follows:

```
- bin            # operations scripts
- conf           # configuration files
    canal_local.properties   # canal local configuration, normally no need to change
    canal.properties         # canal server configuration
    logback.xml              # logback logging configuration
    metrics                  # metrics configuration
    spring                   # spring instance configurations, mainly related to binlog position
                             # calculation and strategy settings; any one of them can be selected
                             # in canal.properties
    example                  # instance configuration folder; by convention one database instance
                             # corresponds to one instance configuration folder
        instance.properties  # instance configuration, normally for a single database
- lib            # service dependency jars
- logs           # log output directory
```

2. In development and test environments, change the log level in logback.xml to DEBUG to make troubleshooting easier. The two configuration files that mainly need changes are canal.properties and instance.properties.
In canal.properties, modify the following:

  • Uncomment canal.instance.parser.parallelThreadSize = 16 to enable it. It controls the number of threads used by the instance parser; if it is left unset, parsing may block or not happen at all.
  • Set canal.serverMode to kafka. The possible values are tcp, kafka, rocketmq and rabbitmq; the default is tcp.
  • In version 1.1.5, the Kafka address is configured as kafka.bootstrap.servers = 192.168.110.244:9092.

```properties
canal.instance.parser.parallelThreadSize = 16
canal.serverMode = kafka
kafka.bootstrap.servers = 192.168.110.244:9092
```
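
Since this integration relies on canal's flat JSON messages (FlatMessage), it is also worth confirming that the flat-message switch in canal.properties is on; in 1.1.5 it should already default to true, so this is only a sanity check (property name taken from the default canal.properties, verify against your version):

```properties
canal.mq.flatMessage = true
```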

instance.properties is normally the configuration for a single database instance; the canal architecture allows one canal server instance to handle asynchronous binlog parsing for multiple database instances. The configuration items that mainly need changes are:

```properties
canal.instance.mysql.slaveId = 2460                 # must be different from the server ID of the master node
canal.instance.master.address=192.168.110.246:3306  # database address
canal.instance.dbUsername=root                      # account
canal.instance.dbPassword=Abcde12345!@#$%           # password
canal.instance.defaultDatabaseName = test           # default database (of little apparent use)
canal.instance.filter.regex=test.B,test.C           # only listen for changes on these two tables
canal.mq.topic=example                              # parsed binlog data is sent to the Kafka topic named "example" (change to your own)
```
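
The article does not show the start command. With the deployer layout above, canal is normally started with its bundled script, and the instance log is the first place to look if no messages arrive (paths assume the install directory used here):

```bash
/usr/local/canal/bin/startup.sh
# server log and per-instance log
tail -f /usr/local/canal/logs/canal/canal.log
tail -f /usr/local/canal/logs/example/example.log
```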

III. Modifying canal's output format

1. Default output format

```json
{
    "data": [
        {
            "id": "2",
            "order_id": "10086",
            "amount": "10087.0",
            "create_time": "2021-08-04 18:05:05"
        }
    ],
    "database": "test",
    "es": 1628071686000,
    "id": 3,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(20)",
        "order_id": "varchar(64)",
        "amount": "decimal(10,2)",
        "create_time": "datetime"
    },
    "old": [
        {
            "amount": "999.0"
        }
    ],
    "ts": 1628071686131,
    "type": "UPDATE",
    "pkNames": ["id"],
    "sql": "",
    "sqlType": {
        "id": -5,
        "order_id": 12,
        "amount": 3,
        "create_time": 93
    },
    "table": "order"
}
```

2. Output format required by the project

```
Message format (JSON):
{
    "table": "table name",
    "op_type": "operation type",
    "op_ts": "operation time",
    "data": {
        "field name": "field value"
    }
}

Field, value and type notes:
- table: table names are all uppercase, with underscore-separated naming;
- op_type: I (insert), U (update), D (delete); if there is no operation type, the field may be sent without a value;
- time field format: yyyy-MM-dd HH24:mm:ss (example: 2021-07-20 16:11:26);
- data field names: all uppercase, with underscore-separated naming;
- data field values: everything except numeric types is sent as a string.
```
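
For the UPDATE event shown in the default format above, a message in the required format would look roughly like this (illustrative only; op_ts here is the time the converter builds the message, and any extra fields depend on how much of FlatMessage you keep):

```json
{
    "table": "ORDER",
    "op_type": "U",
    "op_ts": "2021-08-04 18:05:05",
    "data": {
        "ID": "2",
        "ORDER_ID": "10086",
        "AMOUNT": "10087.0",
        "CREATE_TIME": "2021-08-04 18:05:05"
    }
}
```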

3. Since the default output format does not meet the project requirements, the source code has to be modified to adjust the output message format.

Download the source code from the canal releases page.

4. After downloading the source code and importing it into the project, modify the code in the FlatMessage and MQMessageUtils classes.

Changes to the FlatMessage class:

  • Rename ts to op_ts and change its type to String; convert the value with toString() when it is assigned.
  • Rename type to op_type.
  • Change List<Map<String, String>> to Map<String, String>, i.e. turn the arrays into objects.

The modified source is as follows:

```java
public class FlatMessage implements Serializable {

    private static final long   serialVersionUID = -3386650678735860050L;
    private long                 id;
    private String               database;
    private String               table;
    private List<String>         pkNames;
    private Boolean              isDdl;
    private String               op_type;
    // binlog executeTime
    private Long                 es;
    // dml build timeStamp
    private String               op_ts;
    private String               sql;
    private Map<String, Integer> sqlType;
    private Map<String, String>  mysqlType;
    private Map<String, String>  data;
    private Map<String, String>  old;

    public FlatMessage() {
    }

    public FlatMessage(long id) {
        this.id = id;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getDatabase() {
        return database;
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public List<String> getPkNames() {
        return pkNames;
    }

    public void addPkName(String pkName) {
        if (this.pkNames == null) {
            this.pkNames = Lists.newArrayList();
        }
        this.pkNames.add(pkName);
    }

    public void setPkNames(List<String> pkNames) {
        this.pkNames = pkNames;
    }

    public Boolean getIsDdl() {
        return isDdl;
    }

    public void setIsDdl(Boolean isDdl) {
        this.isDdl = isDdl;
    }

    public String getOp_type() {
        return op_type;
    }

    public void setOp_type(String op_type) {
        this.op_type = op_type;
    }

    public String getOp_ts() {
        return op_ts;
    }

    public void setOp_ts(String op_ts) {
        this.op_ts = op_ts;
    }

    public String getSql() {
        return sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }

    public Map<String, Integer> getSqlType() {
        return sqlType;
    }

    public void setSqlType(Map<String, Integer> sqlType) {
        this.sqlType = sqlType;
    }

    public Map<String, String> getMysqlType() {
        return mysqlType;
    }

    public void setMysqlType(Map<String, String> mysqlType) {
        this.mysqlType = mysqlType;
    }

    public Map<String, String> getData() {
        return data;
    }

    public void setData(Map<String, String> data) {
        this.data = data;
    }

    public Map<String, String> getOld() {
        return old;
    }

    public void setOld(Map<String, String> old) {
        this.old = old;
    }

    public Long getEs() {
        return es;
    }

    public void setEs(Long es) {
        this.es = es;
    }

    @Override
    public String toString() {
        return "FlatMessage [id=" + id + ", database=" + database + ", table=" + table + ", isDdl=" + isDdl
               + ", op_type=" + op_type + ", es=" + es + ", op_ts=" + op_ts + ", sql=" + sql + ", sqlType=" + sqlType
               + ", mysqlType=" + mysqlType + ", data=" + data + ", old=" + old + "]";
    }
}
```

Changes to the MQMessageUtils class, which basically means modifying the messageConverter function:

  • Change the field names output in data to uppercase (toUpperCase() on the column name).

  • Change the op_type operation codes (INSERT/UPDATE/DELETE mapped to I/U/D).

  • Change the types of data and old from lists to maps.

Because the arrays become objects, the data result has to be produced inside the per-row loop.

  • Change the type of op_ts and add a custom date helper class; the output format is yyyy-MM-dd HH24:mm:ss (a sketch of such a helper follows).
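
The modified messageConverter below calls DateUtils.getTime(), but the article does not show that class. Here is a minimal sketch of what it could look like, assuming the name and method used in the snippet; note that the HH24:mm:ss notation in the requirement corresponds to HH:mm:ss in Java's SimpleDateFormat:

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical helper backing the DateUtils.getTime() call in the modified
// messageConverter; the original article does not show its implementation.
public class DateUtils {

    // SimpleDateFormat is not thread-safe, so keep one instance per thread.
    private static final ThreadLocal<SimpleDateFormat> FORMAT =
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    // Current time formatted as yyyy-MM-dd HH:mm:ss (24-hour clock).
    public static String getTime() {
        return FORMAT.get().format(new Date());
    }

    // Convenience overload for formatting a millisecond timestamp.
    public static String getTime(long millis) {
        return FORMAT.get().format(new Date(millis));
    }
}
```

If op_ts should reflect the binlog event time rather than the conversion time, the overload could instead be called with entry.getHeader().getExecuteTime().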

The modified source is as follows:

public class MQMessageUtils { private static Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder() .softValues(), pkHashConfigs -> { List<PartitionData> datas = Lists.newArrayList(); String[] pkHashConfigArray = StringUtils.split(StringUtils.replace(pkHashConfigs, ",", ";"), ";"); // schema.table:id^name for (String pkHashConfig : pkHashConfigArray) { PartitionData data = new PartitionData(); int i = pkHashConfig.lastIndexOf(":"); if (i > 0) { String pkStr = pkHashConfig.substring(i + 1); if (pkStr.equalsIgnoreCase("$pk$")) { data.hashMode.autoPkHash = true; } else { data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr, '^')); } pkHashConfig = pkHashConfig.substring(0, i); } else { data.hashMode.tableHash = true; } if (!isWildCard(pkHashConfig)) { data.simpleName = pkHashConfig; } else { data.regexFilter = new AviaterRegexFilter(pkHashConfig); } datas.add(data); } return datas; }); private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder() .softValues(), pkHashConfigs -> { List<DynamicTopicData> datas = Lists.newArrayList(); String[] dynamicTopicArray = StringUtils.split(StringUtils.replace(pkHashConfigs, ",", ";"), ";"); // schema.table for (String dynamicTopic : dynamicTopicArray) { DynamicTopicData data = new DynamicTopicData(); if (!isWildCard(dynamicTopic)) { data.simpleName = dynamicTopic; } else { if (dynamicTopic.contains("\.")) { data.tableRegexFilter = new AviaterRegexFilter(dynamicTopic); } else { data.schemaRegexFilter = new AviaterRegexFilter(dynamicTopic); } } datas.add(data); } return datas; }); private static Map<String, List<TopicPartitionData>> topicPartitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder() .softValues(), tPConfigs -> { List<TopicPartitionData> datas = Lists.newArrayList(); String[] tPArray = StringUtils.split(StringUtils.replace(tPConfigs, ",", ";"), ";"); for (String tPConfig : tPArray) { TopicPartitionData data = new TopicPartitionData(); int i = tPConfig.lastIndexOf(":"); if (i > 0) { String tStr = tPConfig.substring(0, i); String pStr = tPConfig.substring(i + 1); if (!isWildCard(tStr)) { data.simpleName = tStr; } else { data.regexFilter = new AviaterRegexFilter(tStr); } if (!StringUtils.isEmpty(pStr) && StringUtils.isNumeric(pStr)) { data.partitionNum = Integer.valueOf(pStr); } datas.add(data); } } return datas; }); /** * 按 schema 或者 schema+table 将 message 分配到对应topic * * @param message 原message * @param defaultTopic 默认topic * @param dynamicTopicConfigs 动态topic规则 * @return 分隔后的message map */ public static Map<String, Message> messageTopics(Message message, String defaultTopic, String dynamicTopicConfigs) { List<CanalEntry.Entry> entries; if (message.isRaw()) { List<ByteString> rawEntries = message.getRawEntries(); entries = new ArrayList<>(rawEntries.size()); for (ByteString byteString : rawEntries) { CanalEntry.Entry entry; try { entry = CanalEntry.Entry.parseFrom(byteString); } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } entries.add(entry); } } else { entries = message.getEntries(); } Map<String, Message> messages = new HashMap<>(); for (CanalEntry.Entry entry : entries) { // 如果有topic路由,则忽略begin/end事件 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } String schemaName = entry.getHeader().getSchemaName(); String tableName = entry.getHeader().getTableName(); if 
(StringUtils.isEmpty(schemaName) || StringUtils.isEmpty(tableName)) { put2MapMessage(messages, message.getId(), defaultTopic, entry); } else { Set<String> topics = matchTopics(schemaName + "." + tableName, dynamicTopicConfigs); if (topics != null) { for (String topic : topics) { put2MapMessage(messages, message.getId(), topic, entry); } } else { topics = matchTopics(schemaName, dynamicTopicConfigs); if (topics != null) { for (String topic : topics) { put2MapMessage(messages, message.getId(), topic, entry); } } else { put2MapMessage(messages, message.getId(), defaultTopic, entry); } } } } return messages; } /** * 多线程构造message的rowChanged对象,比如为partition/flastMessage转化等处理 </br> * 因为protobuf对象的序列化和反序列化是cpu密集型,串行执行会有代价 */ public static EntryRowData[] buildMessageData(Message message, ThreadPoolExecutor executor) { ExecutorTemplate template = new ExecutorTemplate(executor); if (message.isRaw()) { List<ByteString> rawEntries = message.getRawEntries(); final EntryRowData[] datas = new EntryRowData[rawEntries.size()]; int i = 0; for (ByteString byteString : rawEntries) { final int index = i; template.submit(() -> { try { Entry entry = Entry.parseFrom(byteString); RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); datas[index] = new EntryRowData(); datas[index].entry = entry; datas[index].rowChange = rowChange; } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } }); i++; } template.waitForResult(); return datas; } else { final EntryRowData[] datas = new EntryRowData[message.getEntries().size()]; int i = 0; for (Entry entry : message.getEntries()) { final int index = i; template.submit(() -> { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); datas[index] = new EntryRowData(); datas[index].entry = entry; datas[index].rowChange = rowChange; } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } }); i++; } template.waitForResult(); return datas; } } /** * 将 message 分区 * * @param partitionsNum 分区数 * @param pkHashConfigs 分区库表主键正则表达式 * @param databaseHash 是否取消根据database进行hash * @return 分区message数组 */ @SuppressWarnings("unchecked") public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum, String pkHashConfigs, boolean databaseHash) { if (partitionsNum == null) { partitionsNum = 1; } Message[] partitionMessages = new Message[partitionsNum]; List<Entry>[] partitionEntries = new List[partitionsNum]; for (int i = 0; i < partitionsNum; i++) { // 注意一下并发 partitionEntries[i] = Collections.synchronizedList(Lists.newArrayList()); } for (EntryRowData data : datas) { CanalEntry.Entry entry = data.entry; CanalEntry.RowChange rowChange = data.rowChange; // 如果有分区路由,则忽略begin/end事件 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } if (rowChange.getIsDdl()) { partitionEntries[0].add(entry); } else { if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) { String database = entry.getHeader().getSchemaName(); String table = entry.getHeader().getTableName(); HashMode hashMode = getPartitionHashColumns(database + "." 
+ table, pkHashConfigs); if (hashMode == null) { // 如果都没有匹配,发送到第一个分区 partitionEntries[0].add(entry); } else if (hashMode.tableHash) { int hashCode = table.hashCode(); int pkHash = Math.abs(hashCode) % partitionsNum; pkHash = Math.abs(pkHash); // tableHash not need split entry message partitionEntries[pkHash].add(entry); } else { // build new entry Entry.Builder builder = Entry.newBuilder(entry); RowChange.Builder rowChangeBuilder = RowChange.newBuilder(rowChange); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { int hashCode = 0; if (databaseHash) { hashCode = database.hashCode(); } CanalEntry.EventType eventType = rowChange.getEventType(); List<CanalEntry.Column> columns = null; if (eventType == CanalEntry.EventType.DELETE) { columns = rowData.getBeforeColumnsList(); } else { columns = rowData.getAfterColumnsList(); } if (hashMode.autoPkHash) { // isEmpty use default pkNames for (CanalEntry.Column column : columns) { if (column.getIsKey()) { hashCode = hashCode ^ column.getValue().hashCode(); } } } else { for (CanalEntry.Column column : columns) { if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) { hashCode = hashCode ^ column.getValue().hashCode(); } } } int pkHash = Math.abs(hashCode) % partitionsNum; pkHash = Math.abs(pkHash); // clear rowDatas rowChangeBuilder.clearRowDatas(); rowChangeBuilder.addRowDatas(rowData); builder.clearStoreValue(); builder.setStoreValue(rowChangeBuilder.build().toByteString()); partitionEntries[pkHash].add(builder.build()); } } } else { // 针对stmt/mixed binlog格式的query事件 partitionEntries[0].add(entry); } } } for (int i = 0; i < partitionsNum; i++) { List<Entry> entriesTmp = partitionEntries[i]; if (!entriesTmp.isEmpty()) { partitionMessages[i] = new Message(id, entriesTmp); } } return partitionMessages; } /** * 将Message转换为FlatMessage * * @return FlatMessage列表 * @author agapple 2018年12月11日 下午1:28:32 */ public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id) { List<FlatMessage> flatMessages = new ArrayList<>(); for (EntryRowData entryRowData : datas) { CanalEntry.Entry entry = entryRowData.entry; CanalEntry.RowChange rowChange = entryRowData.rowChange; // 如果有分区路由,则忽略begin/end事件 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } // build flatMessage CanalEntry.EventType eventType = rowChange.getEventType(); if (!rowChange.getIsDdl()) { Set<String> updateSet = new HashSet<>(); boolean hasInitPkNames = false; for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { FlatMessage flatMessage = new FlatMessage(id); flatMessages.add(flatMessage); flatMessage.setDatabase(entry.getHeader().getSchemaName()); flatMessage.setTable(entry.getHeader().getTableName()); flatMessage.setIsDdl(rowChange.getIsDdl()); String opType = eventType.toString(); switch (opType) { case "INSERT": opType = "I"; break; case "UPDATE": opType = "U"; break; case "DELETE": opType = "D"; break; } flatMessage.setOp_type(opType); flatMessage.setEs(entry.getHeader().getExecuteTime()); //flatMessage.setOp_ts(System.currentTimeMillis()); flatMessage.setOp_ts(DateUtils.getTime()); flatMessage.setSql(rowChange.getSql()); if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE && eventType != CanalEntry.EventType.DELETE) { continue; } Map<String, String> row = new LinkedHashMap<>(); List<CanalEntry.Column> columns; if (eventType == CanalEntry.EventType.DELETE) { columns = rowData.getBeforeColumnsList(); } else { columns = 
rowData.getAfterColumnsList(); } Map<String, Integer> sqlType = new LinkedHashMap<>(); Map<String, String> mysqlType = new LinkedHashMap<>(); Map<String, String> data = new LinkedHashMap<>(); Map<String, String> old = new LinkedHashMap<>(); for (CanalEntry.Column column : columns) { if (!hasInitPkNames && column.getIsKey()) { flatMessage.addPkName(column.getName()); } sqlType.put(column.getName(), column.getSqlType()); mysqlType.put(column.getName(), column.getMysqlType()); if (column.getIsNull()) { row.put(column.getName().toUpperCase(), null); } else { row.put(column.getName().toUpperCase(), column.getValue()); } // 获取update为true的字段 if (column.getUpdated()) { updateSet.add(column.getName()); } } hasInitPkNames = true; if (!row.isEmpty()) { data = row; } if (eventType == CanalEntry.EventType.UPDATE) { Map<String, String> rowOld = new LinkedHashMap<>(); for (CanalEntry.Column column : rowData.getBeforeColumnsList()) { if (updateSet.contains(column.getName())) { if (column.getIsNull()) { rowOld.put(column.getName(), null); } else { rowOld.put(column.getName(), column.getValue()); } } } // update操作将记录修改前的值 if (!rowOld.isEmpty()) { old = rowOld; } } if (!sqlType.isEmpty()) { flatMessage.setSqlType(sqlType); } if (!mysqlType.isEmpty()) { flatMessage.setMysqlType(mysqlType); } if (!data.isEmpty()) { flatMessage.setData(data); } if (!old.isEmpty()) { flatMessage.setOld(old); } } } } return flatMessages; } /** * 将FlatMessage按指定的字段值hash拆分 * * @param flatMessage flatMessage * @param partitionsNum 分区数量 * @param pkHashConfigs hash映射 * @param databaseHash 是否取消根据database进行hash * @return 拆分后的flatMessage数组 */ public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs, boolean databaseHash) { if (partitionsNum == null) { partitionsNum = 1; } FlatMessage[] partitionMessages = new FlatMessage[partitionsNum]; if (flatMessage.getIsDdl()) { partitionMessages[0] = flatMessage; } else { if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) { String database = flatMessage.getDatabase(); String table = flatMessage.getTable(); HashMode hashMode = getPartitionHashColumns(database + "." 
+ table, pkHashConfigs); if (hashMode == null) { // 如果都没有匹配,发送到第一个分区 partitionMessages[0] = flatMessage; } else if (hashMode.tableHash) { int hashCode = table.hashCode(); int pkHash = Math.abs(hashCode) % partitionsNum; // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序 pkHash = Math.abs(pkHash); partitionMessages[pkHash] = flatMessage; } else { List<String> pkNames = hashMode.pkNames; if (hashMode.autoPkHash) { pkNames = flatMessage.getPkNames(); } int idx = 0; Map<String, String> row = flatMessage.getData(); int hashCode = 0; if (databaseHash) { hashCode = database.hashCode(); } if (pkNames != null) { for (String pkName : pkNames) { String value = row.get(pkName); if (value == null) { value = ""; } hashCode = hashCode ^ value.hashCode(); } } int pkHash = Math.abs(hashCode) % partitionsNum; // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序 pkHash = Math.abs(pkHash); FlatMessage flatMessageTmp = partitionMessages[pkHash]; if (flatMessageTmp == null) { flatMessageTmp = new FlatMessage(flatMessage.getId()); partitionMessages[pkHash] = flatMessageTmp; flatMessageTmp.setDatabase(flatMessage.getDatabase()); flatMessageTmp.setTable(flatMessage.getTable()); flatMessageTmp.setIsDdl(flatMessage.getIsDdl()); flatMessageTmp.setOp_type(flatMessage.getOp_type()); flatMessageTmp.setSql(flatMessage.getSql()); flatMessageTmp.setSqlType(flatMessage.getSqlType()); flatMessageTmp.setMysqlType(flatMessage.getMysqlType()); flatMessageTmp.setEs(flatMessage.getEs()); flatMessageTmp.setOp_ts(flatMessage.getOp_ts()); flatMessageTmp.setPkNames(flatMessage.getPkNames()); } Map<String, String> data = flatMessageTmp.getData(); if (data == null) { data = new LinkedHashMap<>(); flatMessageTmp.setData(data); } data = row; if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) { Map<String, String> old = flatMessageTmp.getOld(); if (old == null) { old = new LinkedHashMap<>(); flatMessageTmp.setOld(old); } old = flatMessage.getOld(); } idx++; } } else { // 针对stmt/mixed binlog格式的query事件 partitionMessages[0] = flatMessage; } } return partitionMessages; } /** * match return List , not match return null */ public static HashMode getPartitionHashColumns(String name, String pkHashConfigs) { if (StringUtils.isEmpty(pkHashConfigs)) { return null; } List<PartitionData> datas = partitionDatas.get(pkHashConfigs); for (PartitionData data : datas) { if (data.simpleName != null) { if (data.simpleName.equalsIgnoreCase(name)) { return data.hashMode; } } else { if (data.regexFilter.filter(name)) { return data.hashMode; } } } return null; } private static Set<String> matchTopics(String name, String dynamicTopicConfigs) { String[] router = StringUtils.split(StringUtils.replace(dynamicTopicConfigs, ",", ";"), ";"); Set<String> topics = new HashSet<>(); for (String item : router) { int i = item.indexOf(":"); if (i > -1) { String topic = item.substring(0, i).trim(); String topicConfigs = item.substring(i + 1).trim(); if (matchDynamicTopic(name, topicConfigs)) { topics.add(topic); // 匹配了一个就退出 break; } } else if (matchDynamicTopic(name, item)) { // 匹配了一个就退出 topics.add(name.toLowerCase()); break; } } return topics.isEmpty() ? 
null : topics; } public static boolean matchDynamicTopic(String name, String dynamicTopicConfigs) { if (StringUtils.isEmpty(dynamicTopicConfigs)) { return false; } boolean res = false; List<DynamicTopicData> datas = dynamicTopicDatas.get(dynamicTopicConfigs); for (DynamicTopicData data : datas) { if (data.simpleName != null) { if (data.simpleName.equalsIgnoreCase(name)) { res = true; break; } } else if (name.contains(".")) { if (data.tableRegexFilter != null && data.tableRegexFilter.filter(name)) { res = true; break; } } else { if (data.schemaRegexFilter != null && data.schemaRegexFilter.filter(name)) { res = true; break; } } } return res; } public static boolean checkPkNamesHasContain(List<String> pkNames, String name) { for (String pkName : pkNames) { if (pkName.equalsIgnoreCase(name)) { return true; } } return false; } public static Integer parseDynamicTopicPartition(String name, String tPConfigs) { if (!StringUtils.isEmpty(tPConfigs)) { List<TopicPartitionData> datas = topicPartitionDatas.get(tPConfigs); for (TopicPartitionData data : datas) { if (data.simpleName != null) { if (data.simpleName.equalsIgnoreCase(name)) { return data.partitionNum; } } else { if (data.regexFilter.filter(name)) { return data.partitionNum; } } } } return null; } private static boolean isWildCard(String value) { // not contaiins '.' ? return StringUtils.containsAny(value, new char[]{'*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\', '$', '^'}); } private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName, CanalEntry.Entry entry) { Message message = messageMap.get(topicName); if (message == null) { message = new Message(messageId, new ArrayList<>()); messageMap.put(topicName, message); } message.getEntries().add(entry); } public static class PartitionData { public String simpleName; public AviaterRegexFilter regexFilter; public HashMode hashMode = new HashMode(); } public static class HashMode { public boolean autoPkHash = false; public boolean tableHash = false; public List<String> pkNames = Lists.newArrayList(); } public static class DynamicTopicData { public String simpleName; public AviaterRegexFilter schemaRegexFilter; public AviaterRegexFilter tableRegexFilter; } public static class TopicPartitionData { public String simpleName; public AviaterRegexFilter regexFilter; public Integer partitionNum; } public static class EntryRowData { public Entry entry; public RowChange rowChange; } }

5. Rebuild the source code, then deploy the canal directory found under the deployer module's target directory.
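
The exact build command is not given in the article; canal is a Maven project, so something along these lines should produce the deployable package under deployer/target (the flags are my assumption, adjust to your environment):

```bash
mvn clean install -Dmaven.test.skip -Denv=release
```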
