Canal subscription modes: 1. MySQL configuration, 2. Private-cloud RocketMQ, 3. TCP mode, 4. Apache RocketMQ
Overview
Download link
References
1. MySQL configuration
my.cnf
Configuration
[mysqld]
basedir=/root/mysql/mysql-5.7.31-el7-x86_64
datadir=/opt/mysql/data
log-bin=mysql-bin
server-id=1
binlog-format=ROW
/bin/sh bin/mysqld_safe --defaults-file=/root/mysql/mysql-5.7.31-el7-x86_64/my.cnf --user=mysql
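Canal reads the MySQL binlog, so the settings that matter in the file above are log-bin, server-id and binlog-format=ROW. The following is a minimal sketch for checking them before starting Canal, assuming the file can be read with Python's standard configparser. The helper name check_binlog_settings is made up for illustration, and it only recognises the hyphenated spelling used above (MySQL also accepts underscores).

```python
import configparser

# Settings Canal depends on; None means "must be present, any value".
REQUIRED = {"log-bin": None, "server-id": None, "binlog-format": "ROW"}


def check_binlog_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section (hypothetical helper)."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    section = parser["mysqld"]
    problems = []
    for key, expected in REQUIRED.items():
        if key not in section:
            problems.append(f"{key} is not set")
            continue
        value = section[key] or ""
        if expected is not None and value.upper() != expected:
            problems.append(f"{key} should be {expected}, found {value}")
    return problems


SAMPLE = """
[mysqld]
basedir=/root/mysql/mysql-5.7.31-el7-x86_64
datadir=/opt/mysql/data
log-bin=mysql-bin
server-id=1
binlog-format=ROW
"""

print(check_binlog_settings(SAMPLE))
```

An empty list means the three settings are in place; any other result names the setting to fix.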
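2. Private-cloud RocketMQ
The RocketMQ version deployed inside the private cloud is 3.8.2.
mq_http_sdk
The HTTP protocol is supported from version 3.16 onward, so version 3.8.2 does not meet the requirement. Sample code 1, sample code 2
2.1 Canal configuration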
# conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
canal.mq.topic=canal
canal.mq.partition=0
# conf/canal.properties
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.serverMode = rocketMQ
canal.aliyun.accessKey =xxxx
canal.aliyun.secretKey = xxxxx
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.servers = xxxx:9876
canal.mq.producerGroup =GID_Canal
canal.mq.namespace =
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
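The properties above use inconsistent spacing around "=", and some keys are left empty. The following sketch reads such a file into a dictionary so that the server mode and name server address can be checked before Canal is started. The load_properties helper is hypothetical and handles only simple key=value lines, not the full Java properties syntax (no line continuations or ":" separators).

```python
def load_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments (hypothetical helper)."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


SAMPLE = """
# aliyun ak/sk , support rds/mq
canal.serverMode = rocketMQ
canal.mq.flatMessage = true
canal.mq.servers = xxxx:9876
canal.mq.producerGroup =GID_Canal
canal.mq.namespace =
"""

props = load_properties(SAMPLE)
print(props["canal.serverMode"], props["canal.mq.servers"])
```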
2.2 sample
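import java.nio.charset.StandardCharsets;
import java.util.Properties;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_Canal");
        // AccessKey for Alibaba Cloud authentication, created in the Alibaba Cloud management console
        properties.put(PropertyKeyConst.AccessKey, "xxxx");
        // SecretKey for Alibaba Cloud authentication, created in the Alibaba Cloud management console
        properties.put(PropertyKeyConst.SecretKey, "xxxxx");
        // Send timeout in milliseconds
        // properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "6000");
        // TCP endpoint; see the instance's basic information in the console
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "xxxx:9876");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("canal", "*", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext consumeContext) {
                // Print the topic and the flat-message JSON body
                System.out.println("topic: " + message.getTopic()
                        + "\nbody: " + new String(message.getBody(), StandardCharsets.UTF_8));
                return Action.CommitMessage;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}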
{
    "data":[
        {
            "a":"5555",
            "b":"66666"
        }
    ],
    "database":"db1",
    "es":1654502921000,
    "id":3,
    "isDdl":false,
    "mysqlType":{
        "a":"int",
        "b":"int"
    },
    "old":null,
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "a":4,
        "b":4
    },
    "table":"tb1",
    "ts":1654502921482,
    "type":"INSERT"
}
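With canal.mq.flatMessage = true, each message body is a JSON document like the one above. The following sketch shows how a consumer might turn it into one record per changed row. The function name summarize_flat_message is made up for illustration, and the code assumes that "old", when it is not null, is a list aligned with "data".

```python
import json


def summarize_flat_message(body):
    """Return one dict per changed row from a flat-message body (hypothetical helper)."""
    msg = json.loads(body)
    rows = msg.get("data") or []
    # "old" is null for inserts; pad it so it can be zipped with the rows.
    old_rows = msg.get("old") or [None] * len(rows)
    return [
        {
            "database": msg["database"],
            "table": msg["table"],
            "type": msg["type"],
            "row": row,
            "old": old,
        }
        for row, old in zip(rows, old_rows)
    ]


SAMPLE = (
    '{"data":[{"a":"5555","b":"66666"}],"database":"db1","es":1654502921000,'
    '"id":3,"isDdl":false,"mysqlType":{"a":"int","b":"int"},"old":null,'
    '"pkNames":null,"sql":"","sqlType":{"a":4,"b":4},"table":"tb1",'
    '"ts":1654502921482,"type":"INSERT"}'
)

for record in summarize_flat_message(SAMPLE):
    print(record)
```

Column values arrive as strings even for int columns, so a consumer that needs typed values has to convert them itself, for example by consulting "mysqlType".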
3. TCP mode
A Python 3 environment is required.
3.1 yum installation
# yum repository
cd /etc/yum.repos.d/
# yum.repo
[CentOS-7-Base]
name=CentOS-7-Base
baseurl=xxxx/CentOS/base
enabled=1
gpgcheck=1
gpgkey=xxxx/CentOS/base
[CentOS-7-Updates]
name=CentOS-7-Updates
baseurl=xxxx/CentOS/base
enabled=1
gpgcheck=1
gpgkey=xxxx/CentOS/base
[CentOS-7-Extras]
name=CentOS-7-Extras
baseurl=xxxx/CentOS/base
enabled=1
gpgcheck=1
gpgkey=xxxx/CentOS/base
[CentOS-7-Epel]
name=CentOS-7-Epel
baseurl=xxxx/CentOS/base
enabled=1
gpgcheck=1
gpgkey=xxxx/CentOS/base
# Install dependencies
yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make
3.2 Python 3.6 installation
mkdir -p /root/python3/source
mkdir -p /root/python3/home
tar -zxvf Python-3.6.5.tgz -C /root/python3/source
cd /root/python3/source/Python-3.6.5
./configure --prefix=/root/python3/home
make
make install
ln -s /root/python3/home/bin/python3.6 /usr/local/bin/python3
ln -s /root/python3/home/bin/pip3.6 /usr/local/bin/pip3
3.3 Canal configuration
# conf/canal.properties
canal.ip = 10.1.74.212
canal.port = 11111
canal.user = canal
canal.serverMode = tcp
3.4 sample
pip3 install canal-python
pip3 install protobuf
import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2

client = Client()
client.connect(host='10.1.74.212', port=11111)
client.check_valid(username=b'canal')
# The filter is a regular expression; this one matches every table in every schema.
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

try:
    while True:
        message = client.get(100)
        entries = message['entries']
        for entry in entries:
            # Skip transaction boundary markers; only row changes carry data.
            if entry.entryType in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
                                   EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                continue
            row_change = EntryProtocol_pb2.RowChange()
            row_change.MergeFromString(entry.storeValue)
            header = entry.header
            database = header.schemaName
            table = header.tableName
            event_type = header.eventType
            for row in row_change.rowDatas:
                format_data = dict()
                # Collect every column. Assigning a fresh dict per column would keep
                # only the last one (the output in 3.5 was captured that way and
                # shows only 'b').
                if event_type == EntryProtocol_pb2.EventType.DELETE:
                    for column in row.beforeColumns:
                        format_data[column.name] = column.value
                elif event_type == EntryProtocol_pb2.EventType.INSERT:
                    for column in row.afterColumns:
                        format_data[column.name] = column.value
                else:
                    # Use two separate dicts so before and after values do not mix.
                    format_data['before'] = dict()
                    format_data['after'] = dict()
                    for column in row.beforeColumns:
                        format_data['before'][column.name] = column.value
                    for column in row.afterColumns:
                        format_data['after'][column.name] = column.value
                data = dict(
                    db=database,
                    table=table,
                    event_type=event_type,
                    data=format_data,
                )
                print(data)
        time.sleep(1)
finally:
    # Release the connection when the loop is interrupted.
    client.disconnect()
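The column handling in the loop above can be factored into small helpers and exercised without a running Canal server. The following is a sketch, assuming only that each column exposes name and value attributes and that each row exposes beforeColumns and afterColumns, as in the sample. SimpleNamespace objects stand in for the protobuf messages, and the helper names are made up for illustration.

```python
from types import SimpleNamespace


def columns_to_dict(columns):
    """Collect every column of a row into a name -> value dict."""
    return {column.name: column.value for column in columns}


def format_update(row):
    """Build separate before/after dicts for an UPDATE row."""
    return {
        "before": columns_to_dict(row.beforeColumns),
        "after": columns_to_dict(row.afterColumns),
    }


# Stand-in for a protobuf RowData object: an update of column b from 77 to 88.
row = SimpleNamespace(
    beforeColumns=[
        SimpleNamespace(name="a", value="55"),
        SimpleNamespace(name="b", value="77"),
    ],
    afterColumns=[
        SimpleNamespace(name="a", value="55"),
        SimpleNamespace(name="b", value="88"),
    ],
)

print(columns_to_dict(row.afterColumns))
print(format_update(row))
```

Building the two dictionaries independently matters: if "before" and "after" pointed at the same dict object, the after-values would silently overwrite the before-values.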
3.5 Verification
mysql> insert into tb1 values(55,77);
Query OK, 1 row affected (0.01 sec)
mysql> insert into tb1 values(55,88);
Query OK, 1 row affected (0.01 sec)
connected to 10.1.74.212:11111
Auth succed
Subscribe succed
{'db': 'db1', 'table': 'tb1', 'event_type': 1, 'data': {'b': '77'}}
{'db': 'db1', 'table': 'tb1', 'event_type': 1, 'data': {'b': '88'}}
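The event_type value 1 in the output is the numeric value of the protobuf enum, which is hard to read in logs. The following sketch maps it to a name, assuming the values INSERT = 1, UPDATE = 2 and DELETE = 3 from Canal's entry protocol; other event types are reported as OTHER here. The describe helper is hypothetical. With canal-python installed, the enum wrapper's own Name() method should give the same names.

```python
import ast

# Assumed subset of Canal's EventType enum; other values are not listed here.
EVENT_TYPE_NAMES = {1: "INSERT", 2: "UPDATE", 3: "DELETE"}


def describe(line):
    """Turn one printed record into a short readable summary (hypothetical helper)."""
    record = ast.literal_eval(line)
    name = EVENT_TYPE_NAMES.get(record["event_type"], "OTHER")
    return f'{name} {record["db"]}.{record["table"]} {record["data"]}'


print(describe("{'db': 'db1', 'table': 'tb1', 'event_type': 1, 'data': {'b': '77'}}"))
```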
4. Apache RocketMQ
CLI Admin Tool reference
4.1 Installation
# /root/rocketmq-all-4.8.0-bin-release/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# /root/rocketmq-all-4.8.0-bin-release/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
# conf/broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
brokerIP1=10.1.74.212
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
> nohup sh bin/mqbroker -n 10.1.74.212:9876 -c conf/broker.conf &
> tail -f ~/logs/rocketmqlogs/broker.log
> lsof -i tcp | grep 10911
> sh bin/mqshutdown broker
> sh bin/mqshutdown namesrv
[root@work2 rocketmq-all-4.8.0-bin-release]# bin/mqadmin clusterList
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
DefaultCluster work2 0 172.17.0.1:10911 V4_8_0 0.00(0,0ms) 0.00(0,0ms) 0 459584.55 0.2927
4.2 Create the topic and group
# topic
[root@work2 rocketmq-all-4.8.0-bin-release]# bin/mqadmin updateTopic -n localhost:9876 -b 10.1.74.212:10911 -r 4 -w 4 -t canal
create topic to 172.17.0.1:10911 success.
TopicConfig [topicName=canal, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
# group
[root@work2 rocketmq-all-4.8.0-bin-release]# bin/mqadmin updateSubGroup -n localhost:9876 -b 10.1.74.212:10911 -g GID_Canal
create subscription group to localhost:10911 success.
SubscriptionGroupConfig [groupName=GID_Canal, consumeEnable=true, consumeFromMinEnable=false, consumeBroadcastEnable=false, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
# List topics
[root@work2 rocketmq-all-4.8.0-bin-release]# bin/mqadmin topicList -n localhost:9876
work2
SCHEDULE_TOPIC_XXXX
RMQ_SYS_TRANS_HALF_TOPIC
DefaultCluster_REPLY_TOPIC
%RETRY%please_rename_unique_group_name_4
BenchmarkTest
OFFSET_MOVED_EVENT
TopicTest
canal
TBW102
SELF_TEST_TOPIC
DefaultCluster
4.3 Canal configuration
# conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
canal.mq.topic=canal
canal.mq.partition=0
# conf/canal.properties
canal.serverMode = rocketMQ
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = GID_Canal
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 10.1.74.212:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
4.4 Verification
[root@work2 rocketmq-all-4.8.0-bin-release]# bin/mqadmin topicStatus -n localhost:9876 -t canal
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Broker Name #QID #Min Offset #Max Offset #Last Updated
work2 0 0 5 2022-06-06 16:55:42,132
work2 1 0 0
work2 2 0 0
work2 3 0 0
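In the topicStatus output above, only queue 0 has a non-zero max offset, which is consistent with canal.mq.partition=0 sending every message to a single queue. The following sketch reads the retained message count per queue from that output, taking it as max offset minus min offset. The queue_backlog helper is hypothetical, and the column layout is assumed to match the output shown here.

```python
def queue_backlog(output):
    """Map (broker, queue id) to max offset minus min offset (hypothetical helper)."""
    totals = {}
    for line in output.splitlines():
        # Skip the column header and the logger warnings.
        if line.startswith(("#", "RocketMQLog")):
            continue
        parts = line.split()
        if len(parts) < 4:
            continue
        broker, qid, min_offset, max_offset = parts[:4]
        if not (qid.isdigit() and min_offset.isdigit() and max_offset.isdigit()):
            continue
        totals[(broker, int(qid))] = int(max_offset) - int(min_offset)
    return totals


SAMPLE = """\
RocketMQLog:WARN Please initialize the logger system properly.
#Broker Name #QID #Min Offset #Max Offset #Last Updated
work2 0 0 5 2022-06-06 16:55:42,132
work2 1 0 0
work2 2 0 0
work2 3 0 0
"""

backlog = queue_backlog(SAMPLE)
print(backlog)
print("total:", sum(backlog.values()))
```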
Data update
mysql> insert into tb1 values(99,101);
Query OK, 1 row affected (0.00 sec)
Subscription notifications
- Python client
rocketmq-client-cpp-2.0.0
wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
pip install rocketmq-client-python
#!/usr/bin/env python
# coding=utf-8
import time

from rocketmq.client import PushConsumer, ConsumeStatus


def callback(msg):
    # Print the message ID and body, then acknowledge the message.
    print(msg.id, msg.body)
    return ConsumeStatus.CONSUME_SUCCESS


consumer = PushConsumer('GID_Canal')
consumer.set_name_server_address('10.1.74.212:9876')
consumer.subscribe('canal', callback)
consumer.start()

try:
    # Keep the main thread alive while the callback handles messages.
    while True:
        time.sleep(3600)
finally:
    consumer.shutdown()
(u'7F0000012C536E1EC3182B80B1BE0003', '{"data":[{"a":"101","b":"102"}],"database":"db1","es":1654742654000,"id":5,"isDdl":false,"mysqlType":{"a":"int","b":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"a":4,"b":4},"table":"tb1","ts":1654742654397,"type":"INSERT"}')
- Java client
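import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class ApacheRocketMQConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_Canal");
        // Specify name server addresses.
        consumer.setNamesrvAddr("10.1.74.212:9876");
        // Subscribe to one or more topics to consume.
        consumer.subscribe("canal", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt message : msgs) {
                    System.out.println("Topic: " + message.getTopic()
                            + "\nMessage: " + new String(message.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}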
Consumer Started.
Topic: canal
Message: {"data":[{"a":"99","b":"101"}],"database":"db1","es":1654741086000,"id":4,"isDdl":false,"mysqlType":{"a":"int","b":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"a":4,"b":4},"table":"tb1","ts":1654741086362,"type":"INSERT"}