赞
踩
logstash可以将不同数据源,例如日志、文件、或jdbc等,同步到ElasticSearch中,本文利用logstash实现mysql数据库表之间的数据。(实例:数据库DB1中的表A有添加或者修改,数据库DB2中的表B也会自动同步)
数据源输入使用logstash中自带的logstash-input-jdbc
,无需额外安装,官网使用说明地址。
数据源输出需要使用logstash-output-jdbc
,但是在loastash官网中output plugins列表中并没有相关插件,需要额外安装,使用说明在Github地址。
安装logstash
将logstash下载后,放到/opt/elastic/
目录下,并将logstash目录重命名为logstash-test
。
安装logstash-output-jdbc
,在/opt/elastic/logstash-test
目录下执行:
bin/logstash-plugin install logstash-output-jdbc
安装成功:
在数据库DB1中创建表A,并添加数据如下:
在数据库DB2中创建表B,表结构与A一致,暂不添加数据:
logstash配置文件中必须包含两个元素input
和output
,分别是数据来源的配置和数据输出的配置。还有一个可选项filter
,用来处理数据源和数据输出的之间的适配,例如,需要将某个字段的值10以后再输出,这个10的动作就应该写在filter
模块;还有数据源和数据输出字段的编码不同,日期类型不同等情况的处理。(由于本文中A表和B表中数据结构都是一样的,只是实现简单的数据同步,暂时用不到filter
)
同步配置文件如下:后面对每个部分进行解释。
input { jdbc { jdbc_connection_string => "jdbc:mysql://IP:3306/DB1?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true" jdbc_user => "expert" jdbc_password => "123456" jdbc_driver_library => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" statement => "SELECT title,description FROM A" } } filter {} output { jdbc { connection_string => "jdbc:mysql://IP:3306/DB2?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true" username => "root" password => "root123" driver_jar_path => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar" driver_class => "com.mysql.jdbc.Driver" statement => ["insert into B (title,description) values (?,?)","[title]","[description]"] } stdout { codec => json_lines } }
input
配置
input的配置中有很多强大的功能,详细使用见官网地址,此处只介绍上面涉及到的参数:
output
配置
由于logstash-output-jdbc
是额外扩展的output插件,在配置参数的写法上也略有不同,比如:所有的参数前面都没有jdbc_前缀
jdbc_driver_library
一致jdbc_driver_class
一致将上述的配置文件命令为logstash_default.conf
,放在logstash-test/conf
文件夹下,在logstash-test
下执行:
logstash启动命令:
./bin/logstash -f /opt/elastic/logstash-test/config/logstash_default.conf --path.data=/opt/elastic/logstash-test/test
启动后发现报如下异常:
java.lang.IllegalAccessError: tried to access class com.mysql.jdbc.EscapeProcessor from class com.mysql.jdbc.ConnectionImpl
java.lang.IllegalAccessError: com/mysql/jdbc/EscapeProcessor
遗憾的是目前没有找到为什么会报这个异常,不过换了一种驱动的配置方式,这个异常就消失了。另外一种指定驱动jar包的方式也是官网给出的方式如下:
下面结合本例中给出解决办法。
启动异常处理方式:
在logstash目录下,创建目录vendor/jar/jdbc
(/opt/elastic/logstash-test/vendor/jar/jdbc),将驱动jar包放入该路径下。
将配置文件中的driver_jar_path
注释掉,
重新执行logstash启动命令
./bin/logstash -f /opt/elastic/logstash-test/config/logstash_default.conf --path.data=/opt/elastic/logstash-test/test
执行结果:
检查DB2中的表B:数据已经全部同步过去了。
按照上面的过程,能实现执行logstash命令以后,DB1中的表A和DB2中的表B数据同步,但是如果后续表A中的数据有新增或者修改,还需要再去启动logstash。logstash提供了一种定时任务的方式,定期去检查表A中的数据是否有变化,根据表A的最后修改时间(LastUpdateDate)将表A中新增和修改的数据修改新增到表B中。
将配置文件做如下修改:
input { jdbc { jdbc_connection_string => "jdbc:mysql://IP:3306/DB1?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true" jdbc_user => "expert" jdbc_password => "123456" jdbc_driver_library => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" statement => "SELECT title,description FROM A" schedule => "* * * * *" record_last_run => true use_column_value => true tracking_column => "LastUpdateDate" tracking_column_type => "timestamp" last_run_metadata_path => "/opt/elastic/logstash-test/last_record/logstash_default_last_time" clean_run => false lowercase_column_names => false } } filter {} output { jdbc { connection_string => "jdbc:mysql://IP:3306/DB2?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true" username => "root" password => "root123" driver_class => "com.mysql.jdbc.Driver" statement => ["insert into B (title,description) values(?,?) on duplicate key update title=values(title),description=values(description)","[title]","[description]"] } stdout { codec => json_lines } }
input
中新增的参数:
schedule => "* * * * *"
:每分钟检查一次lowercase_column_names => false
读取字段时是否区分大小写output
新增的参数:
stdout {
codec => json_lines
}
stdout
为可选字段,将输出数据的方式加一种,stdout
可以把input中statement 的select结果转为json字符串打印到logstash的log中,便于追踪检查哪些数据被更新了。
output中的statement
修改:
将表B中的title字段设置为唯一约束,将statement改为如下。即可实现如果title相同的时候,只修改记录,而不是新增。唯一约束可以根据实际需求去设置。
["insert into B (title,description) values(?,?) on duplicate key update title=values(title),description=values(description)","[title]","[description]"]
表A修改完只有需要1分钟以后才能在表B中看到同步效果,因为定时任务设置的每分钟执行一次。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。