概述
在将关系型数据库的数据同步到 ES 时,第一次同步需要全量数据,之后则需要定时同步增量数据。
全量更新:
input {
jdbc {
# MySQL JDBC URL; "dataassets" is the database name
jdbc_connection_string => "jdbc:mysql://192.168.1.185:3306/dataassets"
# database credentials (NOTE(review): plaintext password in config — consider externalizing)
jdbc_user => "root"
jdbc_password => "zhbr@2020"
# path to the MySQL Connector/J driver jar on the Logstash host
jdbc_driver_library => "/opt/bigdata/logstash-6.6.2/sql/mysql-connector-java-5.1.32-bin.jar"
# JDBC driver class name
jdbc_driver_class => "com.mysql.jdbc.Driver"
# enable result-set paging so large tables are fetched in 50000-row chunks
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# SQL executed on each scheduled run; a full table scan, i.e. a full sync every time
statement => "SELECT * FROM datadictionary"
# cron-style schedule, fields (left to right): minute, hour, day-of-month, month, day-of-week;
# all "*" means the statement runs every minute
schedule => "* * * * *"
# tag events from this input so the output block can route them
type => "jdbc"
}
beats {
# address and port this Beats input listens on
host => "192.168.1.185"
port => 5400
}
}
# Logstash's auto-generated @timestamp is UTC, while the MySQL "time" column stores
# CST (UTC+8); both fields are shifted by +8 hours below so they line up in ES.
filter {
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
# drop the temporary helper field
remove_field => ["timestamp"]
}
ruby {
code => "event.set('timestamps', event.get('time').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('time',event.get('timestamps'))"
}
mutate {
# drop the temporary helper field
remove_field => ["timestamps"]
}
}
output {
if [type] == "jdbc" {
elasticsearch {
# Elasticsearch cluster address
hosts => ["192.168.1.183:9200"]
# target _index (arbitrary name)
index => "dataassets"
# target _type (arbitrary name; deprecated in later ES versions)
document_type => "dataassets_type"
# use the table's primary-key column as the document _id so repeated
# full syncs overwrite existing documents instead of duplicating them
document_id => "%{id}"
}
}
}
增量更新:
根据时间戳:
# Incremental sync keyed on the "time" timestamp column: only rows newer than the
# last recorded value are fetched on each run.
input {
jdbc {
# MySQL JDBC URL; "dataassets" is the database name
jdbc_connection_string => "jdbc:mysql://192.168.1.185:3306/dataassets"
jdbc_user => "root"
jdbc_password => "zhbr@2020"
# path to the MySQL Connector/J driver jar
jdbc_driver_library => "/opt/bigdata/logstash-6.6.2/sql/mysql-connector-java-5.1.32-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# :sql_last_value is substituted with the last saved tracking-column value,
# so only rows with a newer "time" are selected
statement => "SELECT * FROM datadictionary where time > :sql_last_value"
# track a column value (rather than the run's wall-clock time)
use_column_value => true
# the column whose maximum seen value becomes the next :sql_last_value
tracking_column => "time"
# persist the last value across Logstash restarts
record_last_run => true
tracking_column_type => timestamp
# file where the last tracking value is stored
last_run_metadata_path => "/opt/bigdata/logstash-6.6.2/config/station_parameter.txt"
# 6-field cron (seconds first): fires at every 60th second, i.e. once per minute
schedule => "*/60 * * * * *"
type => "jdbc"
}
beats {
host => "192.168.1.183"
port => 5400
}
}
# Shift @timestamp and the MySQL "time" field by +8 hours (UTC -> CST), as in the
# full-sync pipeline.
filter {
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
ruby {
code => "event.set('timestamps', event.get('time').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('time',event.get('timestamps'))"
}
mutate {
remove_field => ["timestamps"]
}
}
output {
if [type] == "jdbc" {
elasticsearch {
hosts => ["192.168.1.183:9200"]
index => "abcd1"
document_type => "abcd_type"
# primary-key column as _id so incremental batches upsert
document_id => "%{id}"
}
}
}
根据唯一主键id:
# Incremental sync keyed on the numeric primary key "id": only rows with an id
# greater than the last recorded value are fetched on each run.
input {
jdbc {
# MySQL JDBC URL; "dataassets" is the database name
jdbc_connection_string => "jdbc:mysql://192.168.1.185:3306/dataassets"
jdbc_user => "root"
jdbc_password => "zhbr@2020"
# path to the MySQL Connector/J driver jar
jdbc_driver_library => "/opt/bigdata/logstash-6.6.2/sql/mysql-connector-java-5.1.32-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# :sql_last_value is substituted with the last saved id, so only newer rows match
statement => "SELECT * FROM datadictionary where id> :sql_last_value"
# track a column value (rather than the run's wall-clock time)
use_column_value => true
# the column whose maximum seen value becomes the next :sql_last_value
tracking_column => "id"
# persist the last value across Logstash restarts
record_last_run => true
tracking_column_type => numeric
# file where the last tracking value is stored
last_run_metadata_path => "/opt/bigdata/logstash-6.6.2/config/station_parameter.txt"
# 6-field cron (seconds first): fires at every 60th second, i.e. once per minute
schedule => "*/60 * * * * *"
type => "jdbc"
}
beats {
host => "192.168.1.183"
port => 5400
}
}
# Shift @timestamp and the MySQL "time" field by +8 hours (UTC -> CST), as in the
# full-sync pipeline.
filter {
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
ruby {
code => "event.set('timestamps', event.get('time').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('time',event.get('timestamps'))"
}
mutate {
remove_field => ["timestamps"]
}
}
output {
if [type] == "jdbc" {
elasticsearch {
hosts => ["192.168.1.183:9200"]
index => "abcd1"
document_type => "abcd_type"
# primary-key column as _id so incremental batches upsert
document_id => "%{id}"
}
}
}
最后
以上就是悦耳柜子为你收集整理的logstash jdbc全量更新与增量更新的全部内容,希望文章能够帮你解决logstash jdbc全量更新与增量更新所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复