The data lives in MySQL, which is awkward for analysis and fairly slow to query, so the plan is to sync it to Elasticsearch and use ES's fast querying for the analysis. The source table schema:
CREATE TABLE `t_ex_deal` (
  `deal_id` varchar(50) NOT NULL,
  `back` decimal(36,18) DEFAULT NULL,
  `created_date` datetime DEFAULT NULL,
  `fee` decimal(36,18) DEFAULT NULL,
  `fee_rate` double DEFAULT NULL,
  `fee_coin` varchar(50) DEFAULT NULL,
  `gain_coin` varchar(50) DEFAULT NULL,
  `gain_volume` decimal(36,18) DEFAULT NULL,
  `member_id` int(11) DEFAULT NULL,
  `order_id` varchar(30) DEFAULT NULL,
  `order_price` decimal(36,18) DEFAULT NULL,
  `pay_coin` varchar(50) DEFAULT NULL,
  `pay_volume` decimal(36,18) DEFAULT NULL,
  `price` decimal(36,18) DEFAULT NULL,
  `side` varchar(20) DEFAULT NULL,
  `symbol` varchar(50) DEFAULT NULL,
  `trade_id` varchar(50) DEFAULT NULL,
  `transaction_id` varchar(50) DEFAULT NULL,
  `updated_date` datetime DEFAULT NULL,
  `volume` decimal(36,18) DEFAULT NULL,
  `is_archived` bit(1) DEFAULT NULL,
  PRIMARY KEY (`deal_id`),
  UNIQUE KEY `idx_unique` (`trade_id`,`side`),
  UNIQUE KEY `idx_transaction` (`transaction_id`),
  KEY `idx_symbol` (`symbol`),
  KEY `idx_member` (`member_id`),
  KEY `idx_order` (`order_id`),
  KEY `idx_archived` (`is_archived`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC;
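To sanity-check the pipeline end to end it helps to have at least one row in the table. A minimal sketch via the mysql client; every value below is invented for illustration:

mysql -h 10.2.2.128 -uroot -p exchange <<'SQL'
-- hypothetical test row: deal_id begins with the date (20190505) so the
-- grok pattern in the Logstash filter shown later can extract it
INSERT INTO t_ex_deal (deal_id, created_date, updated_date, member_id,
                       symbol, side, price, volume)
VALUES ('201905051234567890', NOW(), NOW(), 1001,
        'BTCUSDT', 'BUY', 5000.0, 0.5);
SQL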
The MySQL JDBC driver is needed for the Logstash jdbc input. Download it from the page below; extracting the archive yields the connector jar:
https://dev.mysql.com/downloads/connector/j/
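On a headless server the jar can be fetched and unpacked roughly like this (a sketch: the archive URL pattern and version are assumptions, so match them to what the download page actually serves):

cd /opt/logstash-6.6.2/plugin-self
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.16.tar.gz
tar -xzf mysql-connector-java-8.0.16.tar.gz
# the driver jar sits inside the extracted directory
cp mysql-connector-java-8.0.16/mysql-connector-java-8.0.16.jar .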
# deal_id is a snowflake-style ID whose leading digits encode the date
# document_id is set to deal_id so repeated polls don't insert duplicates
# Logstash polls on the schedule, page by page, and starts over after each
#   full pass; with a large existing dataset, updates to later rows lag
# MySQL stores datetimes in CST (UTC+8) while ES assumes UTC on ingest, so
#   the filter subtracts 8 hours (alternatively, set
#   jdbc_default_timezone => "Asia/Shanghai" in the jdbc input)

# vim logstash.conf
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://10.2.2.128:3306/exchange"
    jdbc_user => "root"
    jdbc_password => "Bituan@2018"
    jdbc_driver_library => "/opt/logstash-6.6.2/plugin-self/mysql-connector-java-8.0.16.jar"
    # Connector/J 8.x renamed the driver class to com.mysql.cj.jdbc.Driver;
    # the old name still works as a deprecated alias
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # statement_filepath => "filename.sql"
    statement => "SELECT * FROM t_ex_deal"
    schedule => "* * * * *"
    type => "ex_deal"
  }
  beats {
    host => "10.2.2.129"
    port => 5400
  }
}

filter {
  if [type] == "ex_deal" {
    # capture the date parts into @metadata so they can be used in the
    # output index name without being indexed as document fields
    grok {
      match => ["deal_id", "^%{YEAR:[@metadata][dealyear]}%{MONTHNUM:[@metadata][dealmonth]}%{MONTHDAY:[@metadata][dealday]}"]
    }
    # shift both datetime fields from CST to UTC (-8 hours)
    ruby {
      code => "event.set('created_date', event.get('created_date').time.utc-8*60*60)"
    }
    ruby {
      code => "event.set('updated_date', event.get('updated_date').time.utc-8*60*60)"
    }
  }
}

output {
  if [type] == "ex_deal" {
    elasticsearch {
      hosts => ["10.2.2.129:9200"]
      #manage_template => true
      document_id => "%{deal_id}"
      index => "logstash-ex-deal-%{[@metadata][dealyear]}-%{[@metadata][dealmonth]}-%{[@metadata][dealday]}"
    }
  }
}
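With the config saved, Logstash can be pointed at it directly, and the result checked against ES with the standard _cat and _count APIs (paths assume the install layout used above):

cd /opt/logstash-6.6.2
bin/logstash -f logstash.conf

# after a poll cycle the daily indices should appear
curl -s '10.2.2.129:9200/_cat/indices/logstash-ex-deal-*?v'
# and the document count should converge on the MySQL row count
curl -s '10.2.2.129:9200/logstash-ex-deal-*/_count?pretty'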
To avoid re-reading the whole table on every pass, the jdbc input can also run incremental updates by tracking either a timestamp column or an auto-increment ID column. The two variants below are mutually exclusive, so only one should be enabled at a time:

# vim logstash.conf
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://10.2.2.128:3306/exchange"
    jdbc_user => "root"
    jdbc_password => "Bituan@2018"
    jdbc_driver_library => "/opt/logstash-6.6.2/plugin-self/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"

    # Variant 1: incremental updates tracked by a timestamp column
    # use_column_value => false
    # tracking_column_type => "timestamp"
    # tracking_column => "created_date"

    # Variant 2: incremental updates tracked by an auto-increment ID column
    use_column_value => true
    tracking_column => "member_id"

    record_last_run => true
    last_run_metadata_path => "./config/station_parameter.txt"
    clean_run => false
    statement => "SELECT * FROM t_account where member_id > :sql_last_value"
    schedule => "* * * * *"
    type => "ex_deal"
  }
}
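After the first scheduled run the tracked value can be inspected in the metadata file; Logstash serializes it as YAML (the value shown here is hypothetical):

cat ./config/station_parameter.txt
# typical content for an ID-tracked run:
# --- 10023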
Detailed explanation of the parameters added above:
# use_column_value: whether to track the value of a specific column. If
# record_last_run is true, setting this to true lets you pick which
# column to track; if false, the timestamp of the last run is tracked
# instead.
use_column_value => true

# tracking_column: required when use_column_value is true. The database
# column to track; it must be monotonically increasing, e.g. an ID.
tracking_column => member_id

# record_last_run: whether to remember the last run. If true, the last
# value of tracking_column seen is written to the file given by
# last_run_metadata_path.
record_last_run => true
last_run_metadata_path => "./config/station_parameter.txt"

# clean_run: whether to discard the last_run_metadata_path record. If
# true, every run effectively starts from scratch and queries all
# database records again.
clean_run => false
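One way to confirm the incremental sync is keeping up is to compare the highest tracked ID on both sides. A sketch using a max aggregation, assuming the incremental pipeline writes to the same logstash-ex-deal-* index pattern as above:

# highest member_id that reached ES
curl -s '10.2.2.129:9200/logstash-ex-deal-*/_search?pretty' \
  -H 'Content-Type: application/json' -d '{
    "size": 0,
    "aggs": { "max_member_id": { "max": { "field": "member_id" } } }
  }'

# highest member_id in MySQL
mysql -h 10.2.2.128 -uroot -p -e 'SELECT MAX(member_id) FROM exchange.t_account;'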