赞
踩
上两篇文章介绍过EL的安装和使用,接下来,我们面临的问题是怎么把数据同步到es里,比如,商城的商品数据,商品需要搜索,所以在添加商品的同时,除了往数据库添加一份,同时还要把这些数据同步到es里才行。
解决方案,目前比较流行的可分为以下几种:
1. 程序同步
直接在代码里写逻辑,数据在增删改查进数据库的同时,也往es里同步一份。
优点:方便,无需集成其他的技术;
缺点:代码耦合性太高,增加接口的处理时间;
2. logstash
定时查询数据库,查询到数据有变化就发送到es中。
优点:解耦,官方推荐。
缺点:
1、不支持同步删除,当然可以在数据库用逻辑删除代替物理删除,对于logstash来说就是更新操作了;
2、大数据量有性能问题,在对数据库的压力上,logstash的原理是定时扫描变动的表,所以对数据库有一定压力,并且如果有其他程序在进行某条语句更新,锁住了这条行数据,那logstash读取数据时,就会被“卡住”,如果这个时间过长,可能会影响服务器卡死。
3、无法做到实时同,只能秒级同步。
如果实时性要求不高,并且定时时间内数据变化量不大,推荐使用这个,学习维护成本比较低,毕竟是官方推荐,ELK全家桶。
3. canal
利用数据库的binlog同步变化数据,然后将数据发送给es,当然也可以通过java代码监听拿到数据,再发送到es或做其他处理。
优点:解耦,实时同步,没有大数据性能压力。
缺点:学习和维护成本高,要求对数据库有创建用户的权限,毕竟要用到数据库同步功能,开启binlog数据库的压力也会增加。
4. MQ中间件
有数据变化的时候,就通知mq,然后监听mq实现数据同步到mq。
优点:解耦,适合高并发。
缺点:如何保证消息可靠性,需要在业务代码中加入发送消息到MQ的代码。
演示同步mysql数据到es。
官网下载
下载之后解压即可。
本文安装的版本是logstash-7.12.1,准备测试的数据库表结构
mkdir mysqlnote
创建核心配置文件,名字自定义,主要用来配置mysql和logstash的数据映射和数据转换等等信息。
touch logstash-sync.conf
配置详情
input {
jdbc {
# 设置 MySql/MariaDB 数据库url以及数据库名称
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&allowMultiQuerie=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root"
# 数据库驱动mysql-connector-java-8.0.19.jar所在位置,可以是绝对路径或者相对路径
jdbc_driver_library => "/usr/local/logstash/logstash-7.12.1/mysqlnote/mysql-connector-java-8.0.27.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 是否开启分页,ture为开启
jdbc_paging_enabled => false
# 分页每页数量
jdbc_page_size => "50"
# 设置时区
jdbc_default_timezone =>"Asia/Shanghai"
# 执行的sql文件路径
statement_filepath => "/usr/local/logstash/logstash-7.12.1/mysqlnote/mysql.sql"
#使用这个可以直接写sql语句,但是复杂的语句最好是写在文件内
#statement =>""
# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务
schedule => "*/5 * * * * *"
#是否需要记录某个字段值,如果为true,我们可以自定义要记录的数据库某个字段值,例如id或date字段。如果为false,记录的是上次执行的标记,默认是一个timestamp
use_column_value => true
#记录上次执行字段值路径。我们可以在sql语句中这么写:WHERE ID > :last_sql_value。其中 :sql_last_value 取得就是该文件中的值,这个last_time会以文件形式存在
last_run_metadata_path => "/usr/local/logstash/logstash-7.12.1/mysqlnote/last_time"
#如果use_column_value为真,需配置此参数. 指定增量更新的字段名。当然该字段必须是递增的,比如id或date字段。
tracking_column => "updateTime"
# tracking_column 对应字段的类型,只能选择timestamp或者numeric(数字类型),默认numeric。
tracking_column_type => "timestamp"
#如果为true,每次会记录所更新的字段的值,并保存到 last_run_metadata_path 指定的文件中
record_last_run => true
# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录
clean_run => false
# 是否将字段名称转小写。默认是true。这里注意Elasticsearch是区分大小写的
lowercase_column_names => false
}
}
output {
elasticsearch {
# es地址 集群数组hosts => ["127.0.0.1:9200","127.0.0.1:9201"]
hosts => ["127.0.0.1:9200"]
# 同步的索引名必须要有@timestamp 不然yyyyMM不起效
index => "user"
# 设置_docID和数据相同
document_id => "%{id}"
#自定的模板名称
#template_name => "ps_seal_log"
#自定义的模板配置文件
#template => "/usr/local/logstash/logstash-7.12.1/mysqlnote/ps_test_log_template.json"
#是否重写模板
#template_overwrite => true
}
# 日志输出形式设置
stdout {
codec => json_lines
#codec => rubydebug
}
}
注意:output 中的映射模板注释掉了,原因是找了一圈不知道怎么写这个文件,网上的案例也是众说纷纭,所以,最好是直接用kibana直接先定义好,例如下边:
PUT /user
{
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"userName": {
"type": "text",
"analyzer": "ik_max_word"
},
"age": {
"type": "integer",
"index" : false
},
"sex": {
"type": "integer"
},
"address": {
"type": "text"
},
"price": {
"type": "double",
"index" : false
},
"createTime": {
"type": "date",
"index" : false
},
"updateTime": {
"type": "date",
"index" : false
},
"status": {
"type": "keyword"
}
}
}
}
这个配置满足一般的需求了,如果有更多的需求,此配置无法满足,可以使用logstash的过滤配置,input和output中间还有一个filter配置,可以用来过滤和转换数据,有需求的可以自行研究,这里因为暂时没需要,就没有去研究了。
接下来,准备上边配置所需要的东西,本次配置需要下边的文件和jar包。
创建时间记录文件,文件内容会在logstash启动连上数据库之后自动创建。
touch last_time
创建sql文件,查询出你要映射到es中的数据,我这里演示最简单的查询全部
vim mysql.sql
文件内容
select id,user_name as userName,age,sex,address,price,create_time as createTime,update_time as updateTime,status
from user
where update_time > :sql_last_value;
这个sql_last_value就是上边last_time文件中记录的值。
下载数据库驱动jar包。
官网地址
注意,5版本和8版本设置驱动类名(jdbc_driver_class)是有区别的:
5版本:com.mysql.jdbc.Driver。
8版本:com.mysql.cj.jdbc.Driver。
进入logstash文件夹
./bin/logstash -f ./mysqlnote/logstash-sync.conf
往数据库里添加数据,每5秒之后logstash就会查询数据库,增加添加数据到es中。
由此可见,因为同步数据是依赖查询mysql,所以logstash不支持同步删除操作,当然,就像我开头说的,可以设置逻辑删除字段status,以更新代替删除。
如果实在不想用逻辑删除,可以用下边两种方式。
canal分为三个组件构成,deployer服务端,adapter客户端、admin web管理端,有集群高可用的一套解决方案,官方文档也比较详细,大项目推荐使用。
MQ有很多种,此处采用kafka来实现。
利用springboot集成kafka,随后接口收到消息,直接将增删改的消息扔进kafka,在消费端处理信息,之后同步到ES即可,缺点就是需要维护一套kafka以及消息处理的逻辑代码。
kafka的安装和整合springboot,参考下方的文章即可:
Linux安装kafka
springboot集成整合kafka
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。