赞
踩
1.分库分表后多关联或者多条件查找效率低下,例如2b场景的查询,导出等需要多条件查询,继续用分库分表话效率低下。
2.数据量太多需要转移非关系型数据库elasticsearch存储
3.其他数据转移场景等
这两种场景都涉及到mysql数据同步到es数据解决方案,解决起来分总体两步走,一是存量数据的同步,二是增量数据的同步。这里利用的是canal的方案去同步数据,方案如下图所示
这个是不停机的方案,首先同时开启存量的数据的导入和增量数据的监听,待存量数据导入完成,开启java服务消费mq消息,对数据进行更新或者插入,若数据存在则进行更新,若数据不存在,是新插入则插入,是更新则保存到定时任务重试。这里只是理想方案,实际过程中和存量数据的大小,数据的增长率等有关系,具体实施肯定较为复杂。
若要执行停机方案,则比较简单,数据不再更新后,将存量数据插入到es后,再开启增量数据监听服务以及消费服务,这样es就能实时同步数据了,下面实践下canal adapter的mysql存量数据导入elasticsearch中。
实践版本:
elasticsearch &kibana:7.12.1
canal.client-adapter:1.1.7-SNAPSHOT
mysql :8.0 主从
以这个分表的数据为例
CREATE TABLE `pay_parent_1` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_id` int NOT NULL,
`status` varchar(50) COLLATE utf8mb4_bin DEFAULT NULL,
`creator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '创建者',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updater` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '更新者',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否删除',
`tenant_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '租户id',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1571152425171070978 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
在kibana中建立索引
PUT /pay_parent_0 { "mappings":{ "properties":{ "id": { "type": "long" }, "user_id": { "type": "long" }, "status": { "type": "text" }, "creator": { "type": "text" }, "create_time": { "type": "date", "formats" : ["yyyy-MM-dd HH:mm:ss"], "timezone" : "Asia/Shanghai" }, "updater": { "type": "text" }, "update_time": { "type": "date", "formats" : ["yyyy-MM-dd HH:mm:ss"], "timezone" : "Asia/Shanghai" }, "deleted": { "type": "long" }, "tenant_id": { "type": "text" } } } }
client-adapter 配置 表和es索引的映射
dataSourceKey: defaultDS #数据源
destination: pay_parent_0 #也可以从监听的数据源取数据 mq或者canal
outerAdapterKey: es #对应的适配器的key
groupId: g1 #对应适配器的分组
esMapping:
index: pay_parent_0 #es索引名称
id: id #数据库主键对应es文档id 插入数据时一定要填写
# upsert: true #是否更新 以主键id作为更新条件
sql: "select id, user_id, status, creator, create_time, updater, update_time, deleted, tenant_id
from pay_parent_0 as a"
etlCondition: "where a.id={}" # etl 的条件参数 接口请求
commitBatch: 3000 # 提交批大小
下载canal源码,启动lanucher 模块,client-adapter 启动配置
srcDataSources: defaultDS: url: jdbc:mysql://xxx:3306/demo0?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=CTT username: root password: xxx canalAdapters: - instance: canal.test.queue # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: es7 key: es hosts: http://xxx:39200 # 127.0.0.1:9200 for rest mode properties: mode: rest # or rest cluster.name: elasticsearch
启动后执行,先测试第一条数据插入
http://127.0.0.1:8081/etl/es7/es/payParent0.yml?params=1571052632021184514
插入后查看kibana,以创建时间搜索,如图所示,值得注意的是elasti存储时默认将时间转化为UTC时间存储,即0时区,内部默认是个长整型。所以这里的date格式的时间都时零时区的,但是搜索的时候kibana会进行时区转换。搜出来的结果是准确的。
另外,重复请求时,指定了id插入的数据会覆盖原来的数据,这个是es内部api的功能。
简单测试完后,测试下10w的数据插入,这里只需要把请求中的参数去掉就行,执行http://127.0.0.1:8081/etl/es7/es/payParent0.yml
可以看到性能还是不错,10w的数据准确无误的插入,花了41ms,外网和硬件条件一般的情况,内网的花更快。
利用canal adpter的es插件可以实现mysql 同步的数据的功能,存量数据批量更新或者批量插入,非常方便。里面的源码插件的实现,配置文件分离,插入实例的实现以及mysql数据的批量插入都可以借鉴。若后续业务中有设计数据迁移到es中,参考实现是非常有帮助的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。