概述
项目中用到elasticsearch,初始化数据时时写的程序从数据库里面查询出来,然后多线程往elasticsearch里面写入的。
今天试了一下Logstash-input-jdbc插件,发现高效又方便,而且可以设置定时任务。
1、安装插件
在logstash的bin目录下执行命令: logstash-plugin install logstash-input-jdbc
在H:softwarelogstash-6.1.2logstash-6.1.2lib 加入mysql的驱动 mysql-connector-java-5.1.38.jar
mysql.conf的内容如下:
input {
stdin {
}
jdbc {
# 数据库
jdbc_connection_string => "jdbc:mysql://39.107.60.74:3306/db_plat3"
# 用户名密码
jdbc_user => "root"
jdbc_password => "letsgo123QWE"
# jar包的位置
jdbc_driver_library => "H:softwarelogstash-6.1.2logstash-6.1.2libmysql-connector-java-5.1.38.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#statement_filepath => "config-mysql/test02.sql"
statement => "select * from information"
schedule => "* * * * *"
#索引的类型
type => "information"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => "192.168.1.70:9200"
# index名
index => "information"
# 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
3、启动logstash
然后通过以下命令启动logstash
.logstash.bat -f .config-mysqlmysql.conf过一会他就会自动的往ES里添加数据。
4、增量索引
但是现在有一个问题是:往elasticsearch里面写入是全量的,需要改成增量。
修改mysql.conf的内容如下
input {
stdin {
}
jdbc {
# 数据库
jdbc_connection_string => "jdbc:mysql://39.107.60.74:3306/db_plat3"
# 用户名密码
jdbc_user => "root"
jdbc_password => "letsgo123QWE"
# jar包的位置
jdbc_driver_library => "H:softwarelogstash-6.1.2logstash-6.1.2libmysql-connector-java-5.1.38.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
#使用其它字段追踪,而不是用时间
use_column_value => true
#追踪的字段
tracking_column => id
record_last_run => true
#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path=>"H:softwarelogstash-6.1.2logstash-6.1.2binconfig-mysqlstation_parameter.txt"
#开启分页查询
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "config-mysql/information.sql"
#statement => "select * from information where id > :sql_last_value "
schedule => "* * * * *"
#索引的类型
type => "information"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => "192.168.1.70:9200"
# index名
index => "information"
# 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
修改的内容见红色。
station_parameter.txt中的内容如下:
logstash执行时打印出来的sql如下:
其中遇到的一个问题是用:
statement => "select * from information where id > :sql_last_value "
时会报:sql_last_value 的错 ,暂时不知道怎么解决。于是改用文件config-mysql/information.sql存在sql语句。
ps :
上述问题是因为从人家那里copy过来的时候是错的 把 :last_sql_value 改成 :sql_last_value 就可以了。
5、用时间来实现增量:
默认 use_column_value => false, 这样 :sql_last_value为上一次更新的最后时刻值。
也就是说,对于新增的值,才会更新。这样就实现了增量更新的目的。
参考文章:https://www.cnblogs.com/a-du/p/7611620.html
最后
以上就是复杂哈密瓜为你收集整理的Elasticsearch使用Logstash-input-jdbc同步mysql数据(全量和增量)(windows)的全部内容,希望文章能够帮你解决Elasticsearch使用Logstash-input-jdbc同步mysql数据(全量和增量)(windows)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复