赞
踩
在很多场景下,我们的系统平台都会与其它业务平台进行一些数据的交互同步,比如传统制造业的设备数据,常常需要同步数据给下层业务平台进行使用,那么一般是在什么时间进行同步?以什么样的方式进行同步呢?会存在什么样的问题?
一般是增量的时候实时推送,比如设备的新增、修改、删除。
采用kafka消息中间件进行数据的交互同步,在设备进行增量操作的时候,给设备数据添加一个操作标识位operateType,比如新增为ADD,修改为Modify,删除为DELETE。每当执行增量操作的时候就把数据放入kafka消息队列中,让下层业务平台进行实时消费。
一、中途可能会出现消息丢失的问题
做一个补偿机制
1.kafka可以设置设置重试次数,可以设置3次,3次都失败的情况下可以把失败消息放入本地消息表,用定时任务轮询消息表, 重新推送到kafka。
2.如果数据变动量比较大有必要的话定时做一个全量的推送,定时时间一般放在凌晨。
3.数据量变化较小的情况下,可以在前端加个选择设备进行推送按钮,其实跟全量推送一样,只不过是加个过滤。如果选择的推送数据较多的话,后端同步处理的情况下,会导致前端页面等待时间较长,所以后端在推送数据最好用多线程异步处理,前端点击推送按钮直接显示推送成功。
另一个问题出现,从用户角度前端直接显示成功,但是后端其实还在执行推送数据,用户是无法感知的,用户可能再次或多次点击手动推送,会给系统造成压力,甚至崩溃。处理方法是redis缓存加入判断key,进入推送方法前加key,推送方法完成后释放key。最好不要用全局变量做为判断条件,如果分布式的话,全局变量无法共享传递。
二、推送数据没进入kafka
1.检查topic名称是否正确
2.kafka默认单条消息最大1M
(1)如何扩大单条信息长度参考博客:kafka单条消息过大解决方法
(2)可以进行分批发送,例如每100条进行发送
public void inBatchesSend(List<Device> devList) { // 每批次数据记录数量 int limitSize = 100; //判断是否有必要分批 if (limitSize < devList.size()) { //当前数据按限制条数可分为多少批次 int batchNum= devList.size() / limitSize ; List<Device> batchList ; for (int i = 0; i < batchNum; i++) { // 截取批次长度的list batchList = devList.subList(0, limitSize ); // 异步发送数据 sendKafka(batchList ); // 去除已经处理的部分 (Arrays.asList()方式生成的数据不能进行此修改操作,会报错) batchList .clear(); } // 获取最后一次截取后的剩余列表数据 if (!devList.isEmpty()) { // 异步发送数据 sendKafka(devList); } } else { // 异步发送数据 sendKafka(devList); } }
扩展:
1.kafka可视化工具——kafka Tool
推荐博客:kafka可视化客户端工具(Kafka Tool)的基本使用
2.定时任务——xxl-job
推荐博客:定时任务调度解析之xxl-job
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。