赞
踩
Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。详细说明可以看官方文档。
下载地址:https://www.elastic.co/cn/logstash
功能:数据同步,把mysql里面的数据同步到elasticsearch中
logstash有两种同步方式一种是以id作为同步边界,一种是以update_time作为同步边界。logstash有一个定时任务,定时任务到期后就会进行同步。
这里采用的是update_time方式进行同步。
把下载好的包上传到服务器(logstash-6.4.3.tar.gz),与elasticsearch的版本一致,解压到/usr/local/目录下
由于要连接mysql所以需要用到mysql驱动包,用的版本是mysql-connector-java-5.1.41.jar
创建一个foodie-items索引
到时候需要把驱动包,Logstash同步配置文件,还有同步的数据库语句放到这个目录下。
cd /usr/local/logstash-6.4.3
mkdir sync
把驱动包放到sync目录下
vim foodie-items.sql
需要注意的是表中的updated_time时间要比当前的时间还要大,才可以同步过去
SELECT
i.id AS itemId,
i.item_name AS itemName,
i.sell_counts AS sellCounts,
ii.url AS imgUrl,
tempSpec.price_discount AS price,
i.updated_time AS updated_time
FROM
items i
LEFT JOIN items_img ii ON i.id = ii.item_id
LEFT JOIN ( SELECT item_id, MIN( price_discount ) AS price_discount FROM items_spec GROUP BY item_id ) tempSpec ON i.id = tempSpec.item_id
WHERE
ii.is_main = 1
AND
i.updated_time >= :sql_last_value
sql_last_value是Logstash每次在同步完以后的一个边界值,logstash会自动加上
一个输入,一个输出
input { jdbc { # 设置 MySql/MariaDB 数据库url以及数据库名称 jdbc_connection_string => "jdbc:mysql://192.168.1.14:3306/foodie-shop-dev?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true" # 用户名和密码 jdbc_user => "root" jdbc_password => "671354" # 数据库驱动所在位置,可以是绝对路径或者相对路径 jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-5.1.41.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" # 开启分页 jdbc_paging_enabled => "true" # 分页每页数量,可以自定义 jdbc_page_size => "1000" # 执行的sql文件路径 statement_filepath => "/usr/local/logstash-6.4.3/sync/foodie-items.sql" # 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务 schedule => "* * * * *" # 索引类型 type => "_doc" # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件 use_column_value => true # 记录上一次追踪的结果值(不需要手动创建) last_run_metadata_path => "/usr/local/logstash-6.4.3/sync/track_time" # 如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间 tracking_column => "updated_time" # tracking_column 对应字段的类型 tracking_column_type => "timestamp" # 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录 clean_run => false # 数据库字段名称大写转小写 lowercase_column_names => false } } output { elasticsearch { # es地址 hosts => ["192.168.233.132:9200"] # 同步的索引名 index => "foodie-items" # 设置_docID和数据相同 # document_id => "%{id}" document_id => "%{itemId}" } # 日志输出 stdout { codec => json_lines } }
./logstash -f /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf
第一次没有track_time这个文件,所以系统会有一个默认的时间。
后面就会生成一个track_time的文件,里面记录着这次同步的时间。
@timestamp是logstash自己记录的时间,后面同步的话只有数据库的update_time时间大于@timestamp才会同步过来。
@timestamp表示的是本次同步的时间,如果下次同步数据的时候就会把update和该列的时间进行比较不一样就会进行同步
并且会在sync目录下记录本次同步的时间,下次只有update_time大于该时间的才会进行同步
效果
logstash默认的模版是没有配置中文分词的,可以自己定义一个。
查看默认模版
http://192.168.233.132:9200/_template/logstash/
myik为自定义模版的名称
http://192.168.233.132:9200/_template/myik/
{ "order": 0, "version": 1, "index_patterns": [ "*" ], "settings": { "index": { "refresh_interval": "5s" } }, "mappings": { "_default_": { "dynamic_templates": [ { "message_field": { "path_match": "message", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false } } }, { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false, "analyzer": "ik_max_word", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } ], "properties": { "@timestamp": { "type": "date" }, "@version": { "type": "keyword" }, "geoip": { "dynamic": true, "properties": { "ip": { "type": "ip" }, "location": { "type": "geo_point" }, "latitude": { "type": "half_float" }, "longitude": { "type": "half_float" } } } } } }, "aliases": {} }
把上面自定义的模版进行提交,同步数据就会使用这个模版了
创建一个foodie-items-ik索引,并修改logstash-db-sync.conf配置文件,输出的索引需要改下index => “foodie-items-ik”
重新进行同步
./logstash -f /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf
只要是text类型的都使用上了中文分词器
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。