赞
踩
hd-sync
请点击:架构图链接
如架构图所示,hd-sync具备如下模块:
下面将一一介绍。
概述:该模块并非hd-sync所有,而是说明一下,如果业务方需要接入hd-sync,需要做哪些约定。
业务方提供的数据源(sql语句),必须有全局唯一业务标识(字段),如订单号,运单号,发票号。
业务方提供的数据源(sql语句),必须有自增id作为自然主键。
业务方提供的数据源(sql语句)中,针对update_time字段,有如下要求:
为实现在update_time范围跨度较大时仍能做到索引覆盖,业务方的数据表应该提供联合索引(id,update_time)。对应sql为:
select id from t where id > x AND id <= periodMaxId AND update_time between m and n order by id limit pagesize;
概述:该模块的职责在于数据抽取,衔接业务方数据源与数据通道(kafka),并不直接与ES交互,而是将写入ES的工作交给了后续的ES-Adapter来完成。
该模块包括:手动增量,手动全量,定时增量,三个子模块。下面将一一介绍。
方案 | 优点 | 缺点 |
---|---|---|
zookeeper | 临时节点的特性,可天然支持对节点变化的感知; 可利用临时节点作为集群节点,实现对节点宕机和上线的及时处理 | 相比DB方案,多引入了一层zk组件,架构复杂; 由于完全是三方组件,技术问题不可控; |
db表+守护线程 | 用最基本的线程和数据库表来实现集群节点的健康监控,方法论和问题解决方案更丰富; 完全自研,架构简单,风险更可控; | 需要开发人员手动编码来实现对集群节点健康状态的感知 |
结论: 我们选择“db表+守护线程”的方案,没有必要只为了一个集群节点的检测,而引入一个zk架构。
方案 | 优点 | 缺点 |
---|---|---|
ali canal | 读mysql binlog 实时性较高 无代码侵入性 | 需要canal-server和canal-client两个project canal-server需要部署HA,强依赖zk,且只有一个节点工作 对表结构变更、数据量猛增、网络超时等风险,不可控 若出现不可预知异常,排查成本较高 不能定制化开发 |
自研定时增量 | 风险可控,完全自研 无代码侵入性 多节点部署时,多节点都可以工作 可根据需要定制化开发 | 实时性不严谨,只能做到批量定时 |
结论: 我们选择“自研定时增量”,可以应对更多个性化的业务问题;如果追求极致实时性,可采用业务方双写的形式(ES-Adapter会提供用于业务方双写的开放API)。
表名:offset_info
id | schema | table | primary_key | sql | last_offset | node_id |
---|---|---|---|---|---|---|
1 | hdd_tms | d_dispatch | order_no | select a,b,c,d from d_dispatch where e=1 | 2020-06-22 05:18:22 | node-01 |
表名:node_info
id | node_id | node_ip | heartbeat_timestamp |
---|---|---|---|
1 | node-01 | 172.168.5.18 | 1234567887 |
2 | node-02 | 172.168.5.16 | 1234566799 |
表名:batch_info
id | batch_num | schema | table | period_from | period_to | node_id | status | start_time | complete_time | total_count | success_count | fail_count | create_time | udpate_time |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | 1000001 | hdd_tms | d_dispatch | 2020-06-22 05:18:22 | 2020-06-22 05:20:22 | node-01 | COMPLETED | 2020-06-22 05:18:50 | 2020-06-22 05:25:50 | 900 | 850 | 50 | 2020-06-22 05:18:50 | 2020-06-22 05:18:50 |
2 | 1000002 | hdd_tms | d_dispatch | 2020-06-22 05:18:22 | 2020-06-22 05:20:22 | node-01 | RUNNING | 2020-06-22 05:18:50 | 900 | 2020-06-22 05:18:50 | 2020-06-22 05:18:50 |
寄生在线程池中,根据任务参数(表名,开始时间,截止时间)来执行具体的数据抽取。开始执行任务时,需要更新batch_info表的status为RUNNING;在任务执行过程中,需要每处理X条数据,就更新一次batch_info表(的success_count,fail_count,update_time字段)【说明:X是可配置的,要控制在合理的心跳超时阈值范围内,如果处理X条数据一般需要10秒,则心跳超时阈值不能低于10秒】;全部执行完成之后,需要更新batch_info的status字段为COMPLETED。
手动全量往往是在新表初始化上线时,需要触发一次;在手动全量彻底结束之前,不可以提前开始定时增量任务;所以,应该在手动全量开始时,记录偏移量,在手动全量完成之后,将该偏移量入库,此时,定时增量才可以根据该偏移量,开始执行;
在任务分配线程把增量任务提交到线程池之后,就应该记录批次入库,状态可以为WAITING。不能等待任务真正执行时再记录。
1. 增量抽取为“分段”抽取,防止因为大量数据被刷导致的增量数据过多。首先统计本次抽取的时间段内,总行数是否大于阈值L(如10万)。
1.1. 如果大于L,则将该时间段内的所有数据的id全量查出放于list(1000万个id大约占用内存70M,可进一步确认),然后每L条数据一组(首先要确认一下,10万条完整数据要占用多大内存),做in语句查询(in语句不能大于4M)。把每组的查询结果,放于kafka中。
1.2. 如果小于L,则直接全量取出,然后一次性放于kafka;
2. 数据抽取,应做好日志打印,能通过日志辨识出数据抽取的进度,防止假死、死锁等问题导致跑批记录长期状态为RUNNING,影响下次跑批。
select max(update_time) as max_update_time from 主表;
select min(id) as min_id from 主表 where update_time between last_offset and max_update_time;
select id from t where id >= min_id AND update_time between last_offset and max_update_time order by id limit page_size;
select * from join_sql where orign_where = 'abc' and id in (id集合)
。数据量大而已。
业务数据源可能是变化的,如新接入的业务表,可能与当前已接入的表,不是同一个数据库。
我们考虑采用配置的形式,当新接入业务表时,可通过读取新增的数据源配置,自动创建数据源(连接池)。
在新增业务表时,需做配置。hd-sync可通过读取配置,动态根据字段映射,创建出ES的index(mapping),传给kafka,有ES-Adapter创建ES索引。
增加业务表,删除业务表,均会触发重平衡。
如果有新节点加入,则会在心跳线程的第一次心跳时,执行重平衡。
如果有节点宕机,则会在健康检查线程中,待所有宕机的节点都被物理删除之后,执行重平衡。
场景描述: 表t目前是有节点node1负责的,并且此时此刻,节点node1正在抽取表t的数据;然而在此时,节点node2加入集群,表t恰好被分配给了node2,此时node2会立即进行数据抽取,导致数据抽取被重复处理。
解决方案: nodeX在做数据抽取之前,会先检查batch_info表,根据schema+table+last_offset+status做重复校验,如果status为RUNNING,则说明其他节点正在抽取该批数据,需等待其为COMPLETED之后再行执行。同时,也要检查batch_info表中node_id对应的节点是否健康(node_info表中是否存在),如果不健康,则立即执行跑批。
概述:查补逻辑转为查缺补漏而设。虽然我们已经(在ES-Adapter中)做了失败处理,但我们担心其并不能做到天衣无缝,我们担心依然会有漏网之鱼导致的db与es不一致。这种漏网之鱼,可能是由于数据抽取层漏抓数据,也可能是由于kafka丢失数据,也可能是由于ES-Adapter的未知异常导致。所以,我们增设了这么一个查补模块,上接业务数据源,下接ES。常年全表扫描业务表,并与ES中的数据做校验。
定时执行,频率暂定每天一次(可配置),预留手动触发按钮。
执行逻辑:
在ES-Adapter中,如果写入ES失败,可能会将失败记录与失败原因,发送至kafka相关topic。此时,查补模块将作为该topic的消费者,订阅将采用正则表达的方式es-error-*,订阅所有index的写入失败数据。
目前考虑的处理形式为,将失败数据取出,并写入db中,等待人工处理。
概述:该模块解耦“数据抽取”与“数据写入”,将两部分容易产生瓶颈的模块,分隔开来,可以做到单点优化,任何一个模块产生瓶颈,不会影响另一个正常运行。同时也让职责更加明确,如:数据写入对硬件资源的占用,不会干扰数据抽取。
统一使用json格式发送和接收消息。
概述:ES-Adapter,是衔接kafka与ES的中间层。该模块不参与DB操作,只与kafka和ES有交互:消费kafka中的数据,并将数据写入ES;收集写入ES失败的数据,发回kafka错误主题。
提供接口,入参为(app_id,index_name,json_data),为业务系统提供双写能力。该接口会直接把数据送至ES,不经过kafka等中间件的流转过程,实时性较高。
意义在于:如果业务系统对数据实时性追求极致,则应该主动采用双写策略,即在写完db之后,应该手动调用该开放API实现双写到ES(不用等待定时增量同步)。这也是追求极致实时性所必须产生的代码侵入性。
提供接口,入参为(索引,查询条件列表,排序信息,分页信息),实现多条件经过组合查询,可以按照任意字段排序。
在将消费的数据写入到ES之前,会根据ID+update_time先行校验,看ES中的数据与当前消费的数据是否一致;如果ES中无次数据,则新增;如果ES中的数据交旧,则更新;如果ES中数据较新或与当前消费数据一致,则无需任何操作;
针对kafka中消费的数据,字段如果与ES中index的字段不一致:针对新增情况,则要触发ES字段新增,针对其他情况,需要重建index;
注意:ES新增字段后,历史数据的查询,应该不能按新增的字段查询出来。
针对写入ES失败的情况,方案有二:
梯度重试:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。