我是靠谱客的博主 高挑星星,最近开发中收集的这篇文章主要介绍Canal监控库表代码(新),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

网上的 Canal 监控代码很多,但是我运行了其中几个之后,发现它们输出的内容不太符合我们需要的 SQL 语句格式。

于是我自己写了一份,并且为后续扩展留出了很大的空间:
 

监控sql结果如下:

[sql]----> UPDATE test.user SET name = 'vv' WHERE id = 1
[sql]----> INSERT INTO test.user (id,name) VALUES (2,'aaa')
[sql]----> DELETE FROM user WHERE id=2
package com.example.demo3.dataTransmation.listen.canal;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;


import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.regex.Pattern;

public class CanalSynClient {

    //sql队列
    private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    @Resource
    private DataSource dataSource;

    /**
     * canal入库方法
     */
    public void run() throws InvalidProtocolBufferException {

        //自己的linux服务地址
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",11111), "example", "", "");
        int batchSize = 1000;

        try {
            connector.connect();
            connector.subscribe(".*\..*");
            connector.rollback();
            try {
                while (true) {
                    //尝试从master那边拉去数据batchSize条记录,有多少取多少
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);

                    //当队列里面堆积的sql大于一定数值的时候就模拟执行
                    if (SQL_QUEUE.size() >= 1) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }


    /**
     * 模拟执行队列里面的sql语句
     */
    public void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            System.out.println("[sql]----> " + sql);

            // this.execute(sql.toString());
        }
    }
    /**
     * 数据处理
     *
     * @param entrys
     */
    private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = null;
                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (com.google.protobuf.InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }
                // 获得当前的sql类型 eventType : UPDATE...
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * 保存更新语句
     *
     * @param entry
     */
    private void saveUpdateSql(Entry entry) throws InvalidProtocolBufferException {
        RowChange rowChange;
        rowChange = RowChange.parseFrom(entry.getStoreValue());
        List<RowData> rowDatasList = rowChange.getRowDatasList();
        String schemaName = entry.getHeader().getSchemaName();
        String tableName = entry.getHeader().getTableName();

        for (RowData rowData : rowDatasList) {

            StringBuilder sql = createUpdateSql(rowData, schemaName, tableName);

            sql.append(" WHERE ");
            List<Column> oldColumnList = rowData.getBeforeColumnsList();
            for (Column column : oldColumnList) {
                if (column.getIsKey()) {
                    //暂时只支持单一主键
                    sql.append(column.getName() + " = " + column.getValue());
                    break;
                }
            }

            SQL_QUEUE.add(sql.toString());
        }
    }


    private StringBuilder createUpdateSql(RowData rowData, String schemaName, String tableName){

        List<Column> newColumnList = rowData.getAfterColumnsList();

        // UPDATE test.user SET
        StringBuilder sql = new StringBuilder("UPDATE " + schemaName + "." + tableName + " SET");

        int next = 0;
        for(Column column: newColumnList){
            // 如果是没有被更新的列,则跳过
            if(!column.getUpdated()){
                continue;
            }
            // 1、解析type类型
            String sqlType = parseSqlType(column.getMysqlType());
            if(null == sqlType){
                throw new RuntimeException("未知类型数据!");
            }
            // 2、添加
            String addSql = andSql(sqlType, column, next);
            sql.append(addSql);
            // 添加and
            next++;
        }

        return sql;
    }

    private String parseSqlType(String mysqlType){
        String stringPattern = "varchar.*";
        String intPattern = "int.*";

        if(Pattern.matches(stringPattern, mysqlType)){
            return "varchar";
        }

        if(Pattern.matches(intPattern, mysqlType)){
            return "int";
        }
        return null;
    }

    private String andSql(String sqlType, Column column, int next){
        StringBuilder addSql = new StringBuilder();
        if(sqlType.equals("varchar")){
            if(next == 0){
                addSql.append(" " + column.getName() + " = '" + column.getValue() + "'");
            }else {
                addSql.append(" and " + column.getName() + " = '" + column.getValue() + "'");
            }
        }else if(sqlType.equals("int")){
            if(next == 0){
                addSql.append(" " + column.getName() + " = " + column.getValue());
            }else {
                addSql.append(" and " + column.getName() + " = " + column.getValue());
            }
        }
        return addSql.toString();
    }



    /**
     * 保存删除语句
     *
     * @param entry
     */
    private void saveDeleteSql(Entry entry) throws InvalidProtocolBufferException {
        RowChange rowChange ;
        rowChange = RowChange.parseFrom(entry.getStoreValue());
        List<RowData> rowDatasList = rowChange.getRowDatasList();
        for (RowData rowData : rowDatasList) {
            List<Column> columnList = rowData.getBeforeColumnsList();
            StringBuffer sql = new StringBuffer("DELETE FROM " + entry.getHeader().getTableName() + " WHERE ");
            for (Column column : columnList) {
                if (column.getIsKey()) {
                    //暂时只支持单一主键
                    sql.append(column.getName() + "=" + column.getValue());
                    break;
                }
            }
            SQL_QUEUE.add(sql.toString());
        }
    }

    /**
     * 保存插入语句
     *
     * @param entry
     */
    private void saveInsertSql(Entry entry) throws InvalidProtocolBufferException {
        RowChange rowChange;
        rowChange = RowChange.parseFrom(entry.getStoreValue());
        List<RowData> rowDatasList = rowChange.getRowDatasList();

        for (RowData rowData : rowDatasList) {
            List<Column> columnList = rowData.getAfterColumnsList();
            StringBuffer sql = new StringBuffer("INSERT INTO " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");
            for (int i = 0; i < columnList.size(); i++) {
                sql.append(columnList.get(i).getName());
                if (i != columnList.size() - 1) {
                    sql.append(",");
                }
            }
            sql.append(") VALUES (");
            for (int i = 0; i < columnList.size(); i++) {
                String sqlType = parseSqlType(columnList.get(i).getMysqlType());
                String parseSqlType = parseSqlType(sqlType);
                if(parseSqlType.equals("varchar")){
                    sql.append("'" + columnList.get(i).getValue() + "'");
                }else if(parseSqlType.equals("int")){
                    sql.append(columnList.get(i).getValue());
                }
                if (i != columnList.size() - 1) {
                    sql.append(",");
                }
            }
            sql.append(")");
            SQL_QUEUE.add(sql.toString());
        }
    }

    /**
     * 入库
     * @param sql
     */
    public void execute(String sql) {
        Connection con = null;
        try {
            if(null == sql) return;
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("update: "+ row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }

    public static void main(String[] args) throws InvalidProtocolBufferException {
        CanalSynClient canalClient = new CanalSynClient();
        canalClient.run();
    }
}

 

最后

以上就是高挑星星为你收集整理的Canal监控库表代码(新)的全部内容,希望文章能够帮你解决Canal监控库表代码(新)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部