我是靠谱客的博主 踏实舞蹈,最近开发中收集的这篇文章主要介绍elsticsearch+logstash +Logstash-input-jdbc使用mysql数据库导入数据,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

ES 6.3.0 

logstash  6.3. 0

contOS7   各版本号请对号入座,否则坑更多

安装过程是非常痛苦的,遇到很多坑,因为数据库里的一张表数据量大概在八百万多条数据,想直接导入到elasticsearch里,

 最开始想写程序,多搞几个线程去写入,感觉太LOW了,网上也有其它工具,首选 logstash-input-jdbc 

logstash5.X开始,已经至少集成了logstash-input-jdbc插件。所以,你如果使用的是logstash5.X,可以不必再安装,可以直接跳过这一步。

安装

Elasticsearch-jdbc工具包(废弃),虽然是官方推荐的,但是已经几年不更新了。所以选择安装logstash-input-jdbc,

由于,logstash-input-jdbc是基于 logstash 的,而logstash又是基于 ruby的,所以,要依次安装,命令如下:

1 安装Logstash 很简单,只需要把下载的ZIP包,解压就可以用

CentOS的软件安装工具不是apt-get 是yum

(遇坑) 请下载与ES对应的版本号 否则报错(注意)

看需要安装 yum install ruby 与否

cd /search sudo mkdir logstash cd logstash sudo wget http://download.elasticsearch.org/logstash/logstash/logstash-6.3.0.zip

官网地址: https://artifacts.elastic.co/downloads/logstash/logstash-6.3.0.zip

下载解压

unzip logstash-6.3.0.zip

 

进入解压后的logstash 目录下面,安装插件

logstash-6.3.0]# bin/logstash-plugin install logstash-input-jdbc

等待之后,如图显示,安装成功

 

 

 

 

linux系统启动,进入bin目录 ./logstash 再下一级目录

./logstash -e 'input { stdin { } } output { stdout {} }'

初始化出现如下输出 然后输入hello word,显示一下内容说明安装成功

 

 

两个文件路径 自己创建 mysql.conf 配置文件里的索引名称和类型是ES库里存在

 

# 1 .jdbc.sql 路径 为:/opt/logstash-6.3.0/my_logstash #内容是: select * from version   

# 2 .jdbc.conf 路径为: /opt/logstash-6.3.0/my_logstash #内容是:
 

input {

stdin {

}

jdbc {

# mysql 数据库链接,shop为数据库名

jdbc_connection_string => "jdbc:mysql://192.168.11.40:3306/tms_1.4.2"

# 用户名和密码

jdbc_user => "root"

jdbc_password => "123456"

# 驱动

jdbc_driver_library => "/opt/logstash-6.3.0/my_logstash/mysql-connector-java-5.1.44-bin.jar"

# 驱动类名

jdbc_driver_class => "com.mysql.jdbc.Driver"

jdbc_paging_enabled => "true"

jdbc_page_size => "50000"

#处理中文乱码问题

codec => plain { charset => "UTF-8"}

# 执行的sql 文件路径+名称

statement_filepath => "/opt/logstash-6.3.0/my_logstash/show.sql"

# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新

schedule => "* * * * *"

# 索引类型

type => "logstash_type"

}

}

output {

elasticsearch {

#ESIP地址与端口

hosts => ["192.168.11.237:9200"]

# protocol => "http"

#ES索引名称(自己定义的)

index => "logstsh_index"

#自增ID编号

document_id => "%{id}"

# cluster => "elasticsearch"

}

stdout {

#以JSON格式输出

codec => json_lines

}



} ######################

配置文件里, 有的人配置了过滤条件,但是我配置后就报错, 所以把这段删除了

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }

第一 前提:

1, 我有mysql数据库,我有一张logstash_test表,), 里面有两条数据。

2,已经启动 elasticsearch . 地址是 http://192.168.11.237 端口:9200.

3,已经安装 logstash, 地址在 /opt/logstash

第二 准备

两个文件: myjdbc.conf  

                   show.sql 。名字随便起啦。

  一个 mysql 的java 驱动包  : mysql-connector-java-5.1.36-bin.jar

myjdbc.conf 内容:

注意 statement_filepath => "show.sql" 这个名字要跟下面的sql文件的名字对应上。

(遇坑)驱动包是有 bin 的包, 不是数据层的JAR包,

执行mysql 导入 jdbc 

./logstash -f ../my_logstash/myjdbc.conf 安装路径

启动成功

最后成功遇到(坑)

hosts => ["192.168.11.237:9200"] 必须加中括号,

绝对路径名和包名写全

jdbc_driver_library => "/opt/logstash-6.3.0/my_logstash/mysql-connector-java-5.1.44-bin.jar"

(坑2) 索引类型 配置文件 type => "logstash_type" 类型不用先创建

之前一直同步不成功,数据过不去报异常,然后只建索引,搞定

WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch

 

遇到数据 导入重复的问题,

处理与MYSQL数据库同步数据重复问题 mysql.conf

  #处理中文乱码问题
      codec => plain { charset => "UTF-8"}
      #使用其它字段追踪,而不是用时间
      use_column_value => true
      #追踪的字段
      tracking_column => id
      record_last_run => true
     #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
     last_run_metadata_path => "/opt/logstash-6.3.0/my_logstash/station_parameter.txt"
     #开启分页查询
     jdbc_paging_enabled => true
     jdbc_page_size => 300
	  
      # 执行的sql 文件路径+名称
      statement_filepath => "/opt/logstash-6.3.0/my_logstash/jdbc.sql"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
	  schedule => "* * * * *"
      # 索引类型
	  type => "jdbc"

参数介绍:

//是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run => true

//是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
use_column_value => true

//如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的.比如:ID.
tracking_column => MY_ID

//指定文件,来记录上次执行到的 tracking_column 字段的值
//比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.
//我们只需要在 SQL 语句中 WHERE MY_ID > :sql_last_value即可. 其中 :sql_last_value取得就是该文件中的值(10000).
last_run_metadata_path => "/opt/logstash-6.3.0/my_logstash/station_parameter.txt"


station_parameter.txt  文件里, 我默认加了0 初始化

//是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false

//是否将 column 名称转小写
lowercase_column_names => false

//存放需要执行的 SQL 语句的文件位置
statement_filepath => "/opt/logstash-6.3.0/my_logstash/jdbc.sql"

所有同步的文件如下图, 之前看网上一位哥们写的,把sql_last_value 写错了,导致无法同步 改回来后,重启一切OK

同步完成后不能中断服务器,否则没数据,插入后自动同步到elasticsearh上去

下面就是自增ID的序号,标记行,必须是有序的

 

停掉服务后再次启动就是会取到最后一次读取的变量,这样就避免重复去插入之前调用过的数据,这步应该很重要,在监听表时

另外,可以配置多张表数据跟踪和监听 就是在myjdbc.conf 配置文件里 多配置mysql数据库,写上不同的索引类型就OK

output {
 if [type] == "gjhz"{
elasticsearch {
        hosts => "192.168.42.190:9200"
        index => "zkr"

document_id => "%{a_id}"
    }
 }else{
elasticsearch {
        hosts => "192.168.42.190:9200"
        index => "zkr"
document_id => "%{d_id}"
    }
 }

搞了几天时间, 真是身心疲惫,遇到问题资料都 不算多,后面还启动报日志错误没有解决,等有时间后一并解决掉,先整合下数据玩玩,

最后

以上就是踏实舞蹈为你收集整理的elsticsearch+logstash +Logstash-input-jdbc使用mysql数据库导入数据的全部内容,希望文章能够帮你解决elsticsearch+logstash +Logstash-input-jdbc使用mysql数据库导入数据所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(50)

评论列表共有 0 条评论

立即
投稿
返回
顶部