当前位置:   article > 正文

logstash实现mysql数据库表实时同步_logstash 同步mysql

logstash 同步mysql

logstash可以将不同数据源,例如日志、文件、或jdbc等,同步到ElasticSearch中,本文利用logstash实现mysql数据库表之间的数据。(实例:数据库DB1中的表A有添加或者修改,数据库DB2中的表B也会自动同步)

一、准备:

数据源输入使用logstash中自带的logstash-input-jdbc,无需额外安装,官网使用说明地址
数据源输出需要使用logstash-output-jdbc,但是在loastash官网中output plugins列表中并没有相关插件,需要额外安装,使用说明在Github地址

安装logstash

将logstash下载后,放到/opt/elastic/目录下,并将logstash目录重命名为logstash-test

安装logstash-output-jdbc,在/opt/elastic/logstash-test目录下执行:

	bin/logstash-plugin install logstash-output-jdbc
  • 1

安装成功:在这里插入图片描述

二、数据库表

在数据库DB1中创建表A,并添加数据如下:
在这里插入图片描述
在数据库DB2中创建表B,表结构与A一致,暂不添加数据:
在这里插入图片描述

三、logstash配置文件

logstash配置文件中必须包含两个元素inputoutput,分别是数据来源的配置和数据输出的配置。还有一个可选项filter,用来处理数据源和数据输出的之间的适配,例如,需要将某个字段的值10以后再输出,这个10的动作就应该写在filter模块;还有数据源和数据输出字段的编码不同,日期类型不同等情况的处理。(由于本文中A表和B表中数据结构都是一样的,只是实现简单的数据同步,暂时用不到filter

同步配置文件如下:后面对每个部分进行解释。

input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://IP:3306/DB1?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"
        jdbc_user => "expert"
        jdbc_password => "123456"
        jdbc_driver_library => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        statement => "SELECT title,description FROM A"
   }
}

filter {}

output {
    jdbc {
        connection_string => "jdbc:mysql://IP:3306/DB2?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"
        username => "root"
        password => "root123"
        driver_jar_path => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar"
        driver_class => "com.mysql.jdbc.Driver"
        statement => ["insert into B (title,description) values (?,?)","[title]","[description]"]
    }
    stdout {
        codec => json_lines
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

input配置

input的配置中有很多强大的功能,详细使用见官网地址,此处只介绍上面涉及到的参数:

  • jdbc_connection_string :数据库连接配置
  • jdbc_user :用户名(连接DB1的用户名)
  • jdbc_password:密码(连接DB1的密码)
  • jdbc_driver_library:数据驱动的jar位置
  • jdbc_driver_class :数据库驱动类名(类似jdbc中的Class.forName(“com.mysql.jdbc.Driver”))
  • statement :查询数据源的sql语句(output中就是将此处的查询结果insert到数据库DB2中)

output配置

由于logstash-output-jdbc是额外扩展的output插件,在配置参数的写法上也略有不同,比如:所有的参数前面都没有jdbc_前缀

  • connection_string :数据库连接配置
  • username :用户名(连接DB2的用户名
  • password :密码(连接DB2的密码)
  • driver_jar_path :与input参数中的jdbc_driver_library一致
  • driver_class :与input参数中的jdbc_driver_class一致
  • statement :向DB2中添加数据的insert语句。

四、执行启动命令

将上述的配置文件命令为logstash_default.conf,放在logstash-test/conf文件夹下,在logstash-test下执行:

logstash启动命令

./bin/logstash -f /opt/elastic/logstash-test/config/logstash_default.conf --path.data=/opt/elastic/logstash-test/test
  • 1

启动后发现报如下异常:

java.lang.IllegalAccessError: tried to access class com.mysql.jdbc.EscapeProcessor from class com.mysql.jdbc.ConnectionImpl
在这里插入图片描述
java.lang.IllegalAccessError: com/mysql/jdbc/EscapeProcessor在这里插入图片描述

遗憾的是目前没有找到为什么会报这个异常,不过换了一种驱动的配置方式,这个异常就消失了。另外一种指定驱动jar包的方式也是官网给出的方式如下:
在这里插入图片描述
下面结合本例中给出解决办法。

启动异常处理方式:

在logstash目录下,创建目录vendor/jar/jdbc(/opt/elastic/logstash-test/vendor/jar/jdbc),将驱动jar包放入该路径下。
在这里插入图片描述
将配置文件中的driver_jar_path注释掉,
在这里插入图片描述

重新执行logstash启动命令

./bin/logstash -f /opt/elastic/logstash-test/config/logstash_default.conf --path.data=/opt/elastic/logstash-test/test
  • 1

执行结果:
在这里插入图片描述
检查DB2中的表B:数据已经全部同步过去了。
在这里插入图片描述

五、定时自动同步数据

按照上面的过程,能实现执行logstash命令以后,DB1中的表A和DB2中的表B数据同步,但是如果后续表A中的数据有新增或者修改,还需要再去启动logstash。logstash提供了一种定时任务的方式,定期去检查表A中的数据是否有变化,根据表A的最后修改时间(LastUpdateDate)将表A中新增和修改的数据修改新增到表B中。

将配置文件做如下修改:

input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://IP:3306/DB1?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"
        jdbc_user => "expert"
        jdbc_password => "123456"
        jdbc_driver_library => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        statement => "SELECT title,description FROM A"
        schedule => "* * * * *"
        record_last_run => true
        use_column_value => true
        tracking_column => "LastUpdateDate"
        tracking_column_type => "timestamp"
        last_run_metadata_path => "/opt/elastic/logstash-test/last_record/logstash_default_last_time"
        clean_run => false
        lowercase_column_names => false
   }
}

filter {}

output {
    jdbc {
        connection_string => "jdbc:mysql://IP:3306/DB2?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"
        username => "root"
        password => "root123"
        driver_class => "com.mysql.jdbc.Driver"
        statement => ["insert into B (title,description) values(?,?) on duplicate key update title=values(title),description=values(description)","[title]","[description]"]
    }
    stdout {
        codec => json_lines
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

input中新增的参数:

  • schedule => "* * * * *":每分钟检查一次
  • ` record_last_run => true``:是否记录上一下执行的时间
  • ` use_column_value => true``:是否使用数据源的字段
  • `tracking_column => “LastUpdateDate”``:数据源的最后修改时间字段
  • `tracking_column_type => “timestamp”``:字段类型
  • ` last_run_metadata_path => “/opt/elastic/logstash-test/last_record/logstash_default_last_time”``:存放最后修改时间的文件位置
  • ` clean_run => false``:这个参数表示你在开启Logstash同步数据时需不需要clean掉上次的记录
  • lowercase_column_names => false读取字段时是否区分大小写

output新增的参数:

stdout {
    codec => json_lines
}
  • 1
  • 2
  • 3

stdout为可选字段,将输出数据的方式加一种,stdout可以把input中statement 的select结果转为json字符串打印到logstash的log中,便于追踪检查哪些数据被更新了。

六、补充

output中的statement修改:

将表B中的title字段设置为唯一约束,将statement改为如下。即可实现如果title相同的时候,只修改记录,而不是新增。唯一约束可以根据实际需求去设置。

["insert into B (title,description) values(?,?) on duplicate key update title=values(title),description=values(description)","[title]","[description]"]
  • 1

表A修改完只有需要1分钟以后才能在表B中看到同步效果,因为定时任务设置的每分钟执行一次。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/312678
推荐阅读
相关标签
  

闽ICP备14008679号