When syncing data from a relational database to Elasticsearch, the first run has to pull the full dataset; after that, only incremental data needs to be synced on a schedule.
Full sync:
input {
  jdbc {
    # MySQL connection string; "dataassets" is the database name
    jdbc_connection_string => "jdbc:mysql://192.168.1.185:3306/dataassets"
    # Username and password
    jdbc_user => "root"
    jdbc_password => "zhbr@2020"
    # Path to the JDBC driver jar
    jdbc_driver_library => "/opt/bigdata/logstash-6.6.2/sql/mysql-connector-java-5.1.32-bin.jar"
    # Driver class name
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Enable paging
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # SQL statement to execute directly
    statement => "SELECT * FROM datadictionary"
    # Polling schedule in cron syntax; the fields (left to right) are
    # minute, hour, day of month, month, day of week. All "*" means run every minute.
    schedule => "* * * * *"
    type => "jdbc"
  }
  beats {
    # Listen address
    host => "192.168.1.185"
    port => 5400
  }
}

# Shift the auto-generated @timestamp and the MySQL "time" field forward by 8 hours:
# MySQL stores CST (UTC+8) while Elasticsearch defaults to UTC, a difference of 8 hours.
filter {
  ruby {
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('@timestamp', event.get('timestamp'))"
  }
  mutate {
    remove_field => ["timestamp"]
  }
  ruby {
    code => "event.set('timestamps', event.get('time').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('time', event.get('timestamps'))"
  }
  mutate {
    remove_field => ["timestamps"]
  }
}

output {
  if [type] == "jdbc" {
    elasticsearch {
      # Elasticsearch address
      hosts => ["192.168.1.183:9200"]
      # Target _index; any name works
      index => "dataassets"
      # Target _type; any name works
      document_type => "dataassets_type"
      # Use the table's primary key column as the document _id
      document_id => "%{id}"
    }
  }
}
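To run the full sync, point Logstash at this pipeline file. A minimal sketch, assuming the config above is saved as config/full-sync.conf under the install directory used throughout this post (the file name is hypothetical; substitute whatever you named it):

cd /opt/bigdata/logstash-6.6.2
# Start Logstash with the pipeline; the jdbc input then fires every minute per the schedule
bin/logstash -f config/full-sync.conf

Because the statement is a plain SELECT * with no tracking column, every scheduled run re-reads the entire table. The document_id => "%{id}" setting keeps this idempotent (each row overwrites its own document in ES), but this config is best run once for the initial import.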
Incremental sync:
Based on a timestamp:
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.1.185:3306/dataassets"
    jdbc_user => "root"
    jdbc_password => "zhbr@2020"
    jdbc_driver_library => "/opt/bigdata/logstash-6.6.2/sql/mysql-connector-java-5.1.32-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # Only pull rows whose "time" is newer than the last recorded value
    statement => "SELECT * FROM datadictionary WHERE time > :sql_last_value"
    use_column_value => true
    tracking_column => "time"
    record_last_run => true
    tracking_column_type => "timestamp"
    # File where the last-run value is persisted between runs
    last_run_metadata_path => "/opt/bigdata/logstash-6.6.2/config/station_parameter.txt"
    # Six-field cron (seconds first): run every 60 seconds
    schedule => "*/60 * * * * *"
    type => "jdbc"
  }
  beats {
    host => "192.168.1.183"
    port => 5400
  }
}

filter {
  ruby {
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('@timestamp', event.get('timestamp'))"
  }
  mutate {
    remove_field => ["timestamp"]
  }
  ruby {
    code => "event.set('timestamps', event.get('time').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('time', event.get('timestamps'))"
  }
  mutate {
    remove_field => ["timestamps"]
  }
}

output {
  if [type] == "jdbc" {
    elasticsearch {
      hosts => ["192.168.1.183:9200"]
      index => "abcd1"
      document_type => "abcd_type"
      document_id => "%{id}"
    }
  }
}
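After each run, the jdbc input persists the newest "time" value it has seen to last_run_metadata_path and substitutes it back in as :sql_last_value on the next run. To inspect the current sync position, just read that file; the plugin serializes the value as YAML, so on this setup the contents would look roughly like the following (the timestamp is illustrative and the exact formatting may vary by plugin version):

cat /opt/bigdata/logstash-6.6.2/config/station_parameter.txt
# --- 2020-05-12 02:30:00.000000000 Z

Note that this approach picks up updated rows as well as new ones, provided the table refreshes its time column on every UPDATE.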
Based on a unique primary key id:
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.1.185:3306/dataassets"
    jdbc_user => "root"
    jdbc_password => "zhbr@2020"
    jdbc_driver_library => "/opt/bigdata/logstash-6.6.2/sql/mysql-connector-java-5.1.32-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # Only pull rows with an id greater than the last recorded value
    statement => "SELECT * FROM datadictionary WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    record_last_run => true
    tracking_column_type => "numeric"
    # File where the last-run value is persisted between runs
    last_run_metadata_path => "/opt/bigdata/logstash-6.6.2/config/station_parameter.txt"
    # Six-field cron (seconds first): run every 60 seconds
    schedule => "*/60 * * * * *"
    type => "jdbc"
  }
  beats {
    host => "192.168.1.183"
    port => 5400
  }
}

filter {
  ruby {
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('@timestamp', event.get('timestamp'))"
  }
  mutate {
    remove_field => ["timestamp"]
  }
  ruby {
    code => "event.set('timestamps', event.get('time').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('time', event.get('timestamps'))"
  }
  mutate {
    remove_field => ["timestamps"]
  }
}

output {
  if [type] == "jdbc" {
    elasticsearch {
      hosts => ["192.168.1.183:9200"]
      index => "abcd1"
      document_type => "abcd_type"
      document_id => "%{id}"
    }
  }
}
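With a numeric tracking column, the metadata file simply stores the largest id synced so far, which makes it easy to check whether the pipeline has caught up with MySQL. A rough verification sketch, assuming the same paths and credentials as above (the id value shown is illustrative):

# Largest id Logstash has already recorded
cat /opt/bigdata/logstash-6.6.2/config/station_parameter.txt
# --- 1024
# Compare against the current maximum in the source table
mysql -h192.168.1.185 -uroot -p -e "SELECT MAX(id) FROM dataassets.datadictionary;"

The trade-off versus the timestamp approach: "id > :sql_last_value" only catches newly inserted rows, so updates to existing rows are never re-synced, and neither approach propagates deletes.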