概述
使用版本
logstash 5.5.3
es 5.5.3
kibana 5.5.3
更新时间 2019/1/5
elasticsearch 的出现使得我们的存储、检索数据更快捷、方便。但很多情况下,我们的需求是:现在的数据存储在mysql、oracle等关系型传统数据库中,如何尽量不改变原有数据库表结构,将这些数据的insert,update,delete操作结果实时同步到elasticsearch(简称ES)呢?
delete
我的业务中没有物理删除 ,所以只需要添加一个标记字段,实现更新操作就可以了,比如 status0/1 isdel 0/1
insert&&update
使用一个last_update_date 字段作为更新时间,在创建时也要插入默认为 sysdate / now() 当前时间,通过判断上次同步时间与更新时间字段作对比如果变大就进行更新或者插入操作
进入正题
logstash-input-jdbc安装
获取 logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-5.5.3.zip
unzip logstash-5.5.3.zip && cd logstash-5.5.3
安装 jdbc 和 elasticsearch 插件
bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-output-elasticsearch
获取 jdbc mysql 驱动(项目中使用了5.1.14 但未找到5.1.14 , 5.1.46也可以正常使用)
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
unzip mysql-connector-java-5.1.46.zip
需要两个文件:1)mysql.conf; 2)job.sql 个人创建在config文件夹
mysql.conf
input {
stdin {
}
jdbc {
# mysql jdbc connection string to our backup databse 后面的test对应mysql中的test数据库
jdbc_connection_string => "jdbc:mysql://192.168.71.1:3306/test"
# the user we wish to excute our statement as
jdbc_user => "root"
jdbc_password => "******"
# the path to our downloaded jdbc driver
jdbc_driver_library => "/usr/local/elk/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
#每次同步50000条
jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
statement_filepath => "/usr/local/elk/logstash/config/job.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)设置1分钟更新
schedule => "*/1 * * * *"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
#ESIP地址与端口
hosts => "192.168.1.1:9200"
#ES索引名称(自己定义的)
index => "product"
#自增ID编号填充到_id 可注释如果注释自动es生成_id
document_id => "%{id}"
#索引类型
document_type => job
}
stdout {
#以JSON格式输出
codec => json_lines
}
}
job.sql
#要执行的sql 可以是多表联合查询的结果
select
*
from job
where last_update_date >= :sql_last_value (此为增量同步,如果要全量同步请屏蔽此行)
保存 后执行 logstash ./logstash -f ./config/mysql.conf &
控制台会自动更新打印出更新日志
同步速度800w 条 大概80000/s
2019/1/11 追加
由于业务需求,发现两个问题:
1.logstash-input-jdbc 只支持最快一分钟进行一次同步,不支持秒级同步;
2.logstash-input-jdbc 增量同步必须设置主键或唯一键,否则会重复同步数据,造成数据重复问题;
3.由于公司想将kibana 开发给BI使用,发现易用性很差,考虑方案:由es提供查询的默认查询方法,前端页面重新开发,来适应业务需求;
业务解决方案,使用阿里canal 开源工具+kafka实现实时同步功能,
参考方案:https://www.cnblogs.com/chanshuyi/p/6669006.html
canal 项目地址:https://github.com/alibaba/canal
通过升级ELK版本到6.3+和新版本的kibana页面直接使用sql 语句来增加kibana易用性。
最后
以上就是帅气钥匙为你收集整理的logstash-input-jdbc实现mysql 与elasticsearch实时同步的全部内容,希望文章能够帮你解决logstash-input-jdbc实现mysql 与elasticsearch实时同步所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复