赞
踩
Logstash 是 Elastic 技术栈中的一个技术,它是一个数据采集引擎,可以从数据库采集数据到 ES 中。可以通过设置 自增 ID 主键 或 更新时间 来控制数据的自动同步:
官网 安装 Logstash 与安装分词器一样,都要与 ES 版本号保持一致,解压:
tar -zxvf logstash-6.4.3.tar.gz
sync
目录用于存放数据同步文件mysql-connector-java-8.0.11.jar
foodie-items.sql
用于获取数据库数据,内容如下,其中要注意的是 :sql_last_value
,它是一个变量,表示上一次同步时记录下的时间。SELECT i.id as itemId, i.item_name as itemName, i.sell_counts as sellCounts, i.updated_time as updated_time, ii.url as imgUrl, tempSpec.price_discount as price from items i left JOIN items_img ii on i.id = ii.item_id left JOIN (select item_id, MIN(price_discount) as price_discount from items_spec GROUP BY item_id) tempSpec on i.id = tempSpec.item_id where ii.is_main = 1 and i.updated_time >= :sql_last_value
Get /_template/logstash
,在 sync
目录下创建 logstash-ik.json
文件,将 String
类型的分词器设置为 ik_max_word
:{ "order": 0, "version": 1, "index_patterns": ["*"], "settings": { "index": { "refresh_interval": "5s" } }, "mappings": { "_default_": { "dynamic_templates": [ { "message_field": { "path_match": "message", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false } } }, { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false, "analyzer": "ik_max_word", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } ], "properties": { "@timestamp": { "type": "date" }, "@version": { "type": "keyword" }, "geoip": { "dynamic": true, "properties": { "ip": { "type": "ip" }, "location": { "type": "geo_point" }, "latitude": { "type": "half_float" }, "longitude": { "type": "half_float" } } } } } }, "aliases": {} }
不过这样子去修改模板很容易不生效,起码我这么做是不生效的,后来我参考了这边文章 《ES创建自定义模板,实现中文分词 》 的做法,使用 API 去设置模板
myik
,即使用PUT /_template/myik
请求体是上面的内容,然后使用GUT /_template/myik
可以查看到设置的模板信息。
logstash-db-sync.conf
,其内容如下:input { jdbc { # 设置 MySql/MariaDB 数据库url以及数据库名称 jdbc_connection_string => "jdbc:mysql://localhost:3306/foodie?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true" # 用户名和密码 jdbc_user => "root" jdbc_password => "123456" # 数据库驱动所在位置,可以是绝对路径或者相对路径 jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-8.0.11.jar" # 驱动类名 jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 开启分页 jdbc_paging_enabled => "true" # 分页每页数量,可以自定义 jdbc_page_size => "1000" # 执行的sql文件路径 statement_filepath => "/usr/local/logstash-6.4.3/sync/foodie-items.sql" # 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务 schedule => "* * * * *" # 索引类型 type => "_doc" # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件 use_column_value => true # 记录上一次追踪的结果值 last_run_metadata_path => "/usr/local/logstash-6.4.3/sync/track_time" # 如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间 tracking_column => "updated_time" # tracking_column 对应字段的类型 tracking_column_type => "timestamp" # 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录 clean_run => false # 数据库字段名称大写转小写 lowercase_column_names => false } } output { elasticsearch { # es地址,如果是集群,则需要配置多个地址 hosts => ["127.0.0.1:9200"] # 同步的索引名 index => "foodie-items" # 设置_docID和数据相同 document_id => "%{itemId}" # 定义模板名称 template_name => "myik" # 模板所在位置,如果使用 API 的方式生成模板,可以注释掉 template => "/usr/local/logstash-6.4.3/sync/logstash-ik.json" # 重写模板,如果使用 API 的方式生成模板,可以注释掉 template_overwrite => true # Logstash 自动管理模板功能,默认为 true,如果使用 API 的方式生成模板,可以注释掉 manage_template => false } # 日志输出 stdout { codec => json_lines } }
_Mapping
)bin
目录下启动 Logstash./logstash -f /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。