我是靠谱客的博主 忧虑热狗,最近开发中收集的这篇文章主要介绍使用logstash全量同步MySQL到ElasticSearch,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

上一篇文章我们讨论了ELK框架日志系统的搭建,这次我们通过Logstash和ES实现MySQL数据全量增量同步。

MySQL数据同步与日志同步唯一不同的地方在于logstash配置文件的不同而已,下面我们开始介绍详细配置过程:

一、下载和配置ES和logstash,详细请参考上一篇文章

二、下载mysql-connector-java的jar包

大家可根据自己的mysql版本下载对应的包(Maven包下载地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java)

三、创建logstash配置文件和sql文件

  1. 在Logstash的bin 目录下创建jdbc.conf配置文件,文件内容如下:
input {
   stdin {
   }
   jdbc {
   	   # mysql数据库链接
       jdbc_connection_string => "jdbc:mysql://192.168.1.143:3306/icbc-ccpm_0?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
       jdbc_user => "root"
       jdbc_password => "P@ssw0rd"
       jdbc_driver_library => "E:/logstash-6.4.0/logstash-6.4.0/lib/mysqldriver/mysql-connector-java-5.1.44-bin.jar"
       jdbc_driver_class => "com.mysql.jdbc.Driver"
       jdbc_paging_enabled => "true"
       jdbc_page_size => "50000"
       # 执行的sql
       statement_filepath => "E:/logstash-6.4.0/logstash-6.4.0/logs/mysql/jdbc.sql"
       # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
       schedule => "* * * * *"
       # 处理中文乱码问题
       codec => plain { charset => "UTF-8"}
       # 索引类型
       type => "order"
       lowercase_column_names => false
       # 是否记录最后一次运行内容
       record_last_run => true
       use_column_value => true
       # 追踪的元素名,对应保存到es上面的字段名而不是数据库字段名
       tracking_column => "CREATE_DATETIME"
       # 默认为number,如果为日期必须声明为timestamp
       tracking_column_type => "timestamp"
       # 设置记录的路径
       last_run_metadata_path => "E:/logstash-6.4.0/logstash-6.4.0/logs/mysql/order_last_time"
       # 每次运行是否清除,如果为true,则每次启动都从头开始同步
       clean_run => false
     }
}

filter {
   json {
       source => "message"
       remove_field => ["message"]
   }
}
output {
  if [type]=="order"{
       elasticsearch {
       	   # ES地址和端口
           hosts => ["192.168.9.26:9200"]
           # ES索引名称
           index => "order"
           # 文档类型
           document_type => "orderInfo"
   		   # 文档ID,不设置则由ES自动生成20位的编号
           #document_id => "%{id}"
       }
       stdout {
           codec => json_lines
       }
  }
}
  1. 创建sql文件
SELECT
	A.ORDER_INFO_ID,
    A.OUT_TRADE_NO,
    A.TRADE_NO,
    A.CREATE_DATETIME,
    A.ORDER_AMT,
    A.PAYMENT_AMT,
    A.TOTAL_AMT
FROM
    ORDER_INFO_3 A 
WHERE
		A.CREATE_DATETIME > :sql_last_value 
order by A.CREATE_DATETIME

通过命令行进入bin目录,运行logstash -f jdbc.conf,启动logstash服务,同步成功后,

多个表无非就是在input里面多加几个类型,在output中多加基础
类型判定。

input里的type和output if判定的type保持一致,该type对应ES中的type。

最后

以上就是忧虑热狗为你收集整理的使用logstash全量同步MySQL到ElasticSearch的全部内容,希望文章能够帮你解决使用logstash全量同步MySQL到ElasticSearch所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部