This post walks through syncing MySQL data to Kafka with Canal: the Canal and Kafka configuration, a Canal client that converts binlog row changes to JSON, and the Kafka producer and consumer code.

canal.properties configuration

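The original screenshots of canal.properties did not survive. As a rough reference only, here is a minimal sketch of what a matching configuration could look like, inferred from the client code below (TCP mode, port 11111, destination example); it is an assumption, and a real file has many more entries:

# Canal server address and the TCP port the Java client connects to
canal.ip =
canal.port = 11111
# Instance name; must match the destination used by the client code
canal.destinations = example
# The client below speaks the Canal TCP protocol directly, so tcp rather than kafka mode
canal.serverMode = tcp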

instance.properties configuration

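The instance.properties screenshot is likewise lost. A typical file for a local MySQL source might look like the following sketch; the address, username, and password are placeholders, not values from the original post:

# MySQL master whose binlog Canal tails
canal.instance.master.address = 127.0.0.1:3306
# This account needs REPLICATION SLAVE and REPLICATION CLIENT privileges
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8
# Capture every table in every schema, mirroring the client-side subscribe
canal.instance.filter.regex = .*\\..*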

Kafka server.properties configuration

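The Kafka server.properties screenshot is also missing. A minimal single-broker setup consistent with the code (bootstrap server localhost:9092) could look like this sketch; log.dirs and the ZooKeeper address are assumptions:

broker.id = 0
listeners = PLAINTEXT://localhost:9092
log.dirs = /tmp/kafka-logs
num.partitions = 1
# Kafka releases of this vintage still register brokers in ZooKeeper
zookeeper.connect = localhost:2181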

Canal client code

The client connects to the Canal server over TCP, pulls batches of binlog entries, skips transaction markers, and forwards each inserted row to Kafka as a JSON string.

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.demo.kafka.KafkaClient;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CannalClient {

    private static final int BATCH_SIZE = 1000;

    public void canalService() {
        // Connect to the Canal server (destination "example", empty credentials)
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111), "example", "", "");
        try {
            connector.connect();
            // Subscribe to every table in every database; the dot needs a double
            // backslash (one escape for Java, one for the regex)
            connector.subscribe(".*\\..*");
            // Roll back to the last position that was never acked, so the next
            // fetch resumes from the first unacknowledged entry
            connector.rollback();
            while (true) {
                // Fetch up to BATCH_SIZE entries without auto-acknowledging
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // Nothing to read: back off for two seconds
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    List<CanalEntry.Entry> entries = message.getEntries();
                    for (CanalEntry.Entry entry : entries) {
                        // Only row-data entries carry changes; this also skips
                        // the TRANSACTIONBEGIN and TRANSACTIONEND markers
                        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                            continue;
                        }
                        // Deserialize the store value into a RowChange
                        CanalEntry.RowChange rowChange;
                        try {
                            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        } catch (Exception e) {
                            throw new RuntimeException(
                                    "ERROR ## parser of eromanga-event has an error, data:" + entry, e);
                        }
                        // Which table this entry changed
                        String tableName = entry.getHeader().getTableName();
                        handle(tableName, rowChange);
                    }
                }
                // Acknowledge the whole batch so Canal can advance its cursor
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    private static void handle(String tableName, CanalEntry.RowChange rowChange) {
        String canalTopic = "canaltopic";
        // Passed to Kafka as the message key, not an actual partition number
        String key = "partition10";
        // Operation type: INSERT / UPDATE / DELETE
        CanalEntry.EventType eventType = rowChange.getEventType();
        if (eventType.equals(CanalEntry.EventType.INSERT)) {
            List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDataList) {
                // Serialize the after-image columns into one JSON object per row
                JSONObject jsonObject = new JSONObject();
                List<CanalEntry.Column> afterColumnList = rowData.getAfterColumnsList();
                for (CanalEntry.Column column : afterColumnList) {
                    jsonObject.put(column.getName(), column.getValue());
                }
                // Ship the row to Kafka
                KafkaClient.send(canalTopic, key, jsonObject.toString());
            }
        }
    }
}
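The post never shows how canalService() is invoked. One minimal way to start it, assuming a Spring Boot application (suggested by the @Component annotations), is a CommandLineRunner; this runner class is not in the original post:

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class CanalRunner implements CommandLineRunner {

    private final CannalClient cannalClient;

    public CanalRunner(CannalClient cannalClient) {
        this.cannalClient = cannalClient;
    }

    @Override
    public void run(String... args) {
        // Blocks forever; in a real service this belongs on its own thread
        cannalClient.canalService();
    }
}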

Kafka producer code

A small static wrapper around KafkaProducer that the Canal client calls.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;

import java.util.Properties;

@Component
public class KafkaClient {

    private static final Producer<String, String> producer = createProducer();

    // Build a producer with string keys and string values
    public static Producer<String, String> createProducer() {
        // Kafka connection settings
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(properties);
    }

    // Write one record to Kafka; the second argument is the record key,
    // which Kafka hashes to choose a partition
    public static void send(String topic, String key, String value) {
        producer.send(new ProducerRecord<>(topic, key, value));
    }
}
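One thing worth flagging: the string "partition10" the Canal client passes in is a message key, not a partition number; ProducerRecord(topic, key, value) hashes the key to pick the partition. To pin records to a fixed partition, Kafka's four-argument constructor does that explicitly. A hypothetical extra method on KafkaClient, not in the original post:

    // Hypothetical variant: write to an explicit partition number
    public static void sendToPartition(String topic, int partition, String key, String value) {
        producer.send(new ProducerRecord<>(topic, partition, key, value));
    }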

Kafka consumer code

A manual-commit consumer that reads the JSON rows back off the canaltopic topic.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerOps {

    private static final Consumer<String, String> consumer = createConsumer();

    // Build the consumer; auto-commit is off, so offsets are committed manually
    public static Consumer<String, String> createConsumer() {
        // Kafka connection settings
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "logGroup");
        properties.put("enable.auto.commit", "false");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(properties);
    }

    // Consume messages from Kafka
    public static void consumerOps() {
        // Subscribe to the topic the Canal client writes to
        consumer.subscribe(Collections.singletonList("canaltopic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(300));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic ==>> " + record.topic()
                            + ", partition ==>> " + record.partition()
                            + ", offset ==>> " + record.offset()
                            + ", key ==>> " + record.key()
                            + ", value ==>> " + record.value());
                }
                // Commit once per poll: asynchronously on the hot path...
                consumer.commitAsync();
            }
        } finally {
            try {
                // ...and one final synchronous commit before closing,
                // so the last offsets are not lost
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}
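Since the Canal client serializes each row with fastjson, a consumer will usually want to parse the record value back into a JSONObject before using it. A small sketch of that step; the helper class and the "id" column are illustrative, not from the original post:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class RowParser {

    // Turn the JSON string the Canal client produced back into a column value
    public static String columnValue(String recordValue, String columnName) {
        JSONObject row = JSON.parseObject(recordValue);
        return row.getString(columnName);
    }
}

Inside the record loop above, you would call something like RowParser.columnValue(record.value(), "id").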

Wrapping up

That is the whole pipeline: Canal tails the MySQL binlog, the client turns each inserted row into a JSON message, and the Kafka producer and consumer move it downstream. Extending handle() to cover UPDATE and DELETE events follows the same pattern, using getBeforeColumnsList() for the pre-image where needed.
