我是靠谱客的博主 刻苦白羊,这篇文章主要介绍canal客户端使用多线程消费消息,现在分享给大家,希望可以做个参考。

由于canal消费时是单线程阻塞的,大大降低了程序对于线程的利用率
canal客户端实现官方demo
以下是通过线程池去消费canal订阅的消息代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
//存放操作完后的batchId和结果 private static ConcurrentHashMap<Long,Boolean> batchIdMap = new ConcurrentHashMap<>(); //将需要消费的batchId按顺序存放队列中 private volatile static LinkedBlockingQueue<Long> batchIds = new LinkedBlockingQueue<>(); protected void process() { while (running) { connector.connect(); connector.subscribe(canalInstance.getFilter()); //创建线程去响应canal //定时通知canal消费是否成功 Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(() -> { try { while (true) { if (CollectionUtils.isEmpty(batchIdMap) || batchIds.isEmpty()) { return; } Long peek = batchIds.peek(); Boolean aBoolean = batchIdMap.get(peek); if (null == aBoolean) { return; } if (aBoolean) { logger.info("canal ack batch-----{}", peek); connector.ack(batchIds.poll()); } else { logger.info("canal rollback batch-----{}", peek); connector.rollback(batchIds.poll()); } batchIdMap.remove(peek); } } catch (Exception e) { logger.error("scheduleWithFixedDelay", e); } }, 0, 10, TimeUnit.SECONDS); try { MDC.put("destination", canalInstance.getDestination()); //定时通知canal消费是否成功 while (running) { // 获取指定数量的数据 Message message = connector.getWithoutAck(canalInstance.getBatchsize()); long batchId = message.getId(); int size = message.getEntries().size(); if ((batchId == -1 || size == 0)) { continue; } logger.info("canal client execute batchId:{}, size:{}", batchId, size); //往队列中添加消费的batchId batchIds.add(message.getId()); //消费message readMessage(message); } } catch (Exception e) { if(e instanceof CanalClientException){ CanalClientException canalClientException = (CanalClientException) e; if(canalClientException.getCause() instanceof IOException || canalClientException.getCause() instanceof ConnectException){ logger.error("connector error!retry connector", e); connector.connect(); } } logger.error("process error!", e); } finally { connector.disconnect(); logger.info("connector destination"); MDC.remove("destination"); } } } /** * 读取message信息 * @param message * @throws Exception */ private void readMessage(Message message) throws Exception { //启动线程池 threadPoolExecutorUtil.createThreadPoolExecutor().execute(() -> { try { //具体的处理方法ing....返回是否成功 boolean b = canalEntry(message.getEntries()); logger.info("完成消费batchId========={}---Finished====={}---Size====={}",message.getId(),b,message.getEntries().size()); //存放已经消费完成的batchId和是否成功 batchIdMap.put(message.getId(),b); } catch (Exception e) { logger.error("consumeMessage", e); try { Thread.sleep(1000); } catch (Exception e2) {} } }); }

参考资料:canal支持多线程消费吗?
参考资料:canal客户端针对同一个batchId重复ack确认
参考资料:canal客户端确认batchId乱序

最后

以上就是刻苦白羊最近收集整理的关于canal客户端使用多线程消费消息的全部内容,更多相关canal客户端使用多线程消费消息内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(69)

评论列表共有 0 条评论

立即
投稿
返回
顶部