赞
踩
afeiqiang 2020-08-10
你一定遇到过这种情况,接收到消息时并不符合马上处理的条件(例如频率限制),但是又不能丢掉,于是先存起来,过一阵子再来处理。系统应该怎么设计呢?可能你会想到数据库,用一个字段来标记执行的状态,或者设置一个等待的时间戳,不管是哪种都需要反复地从数据库存取,还要考虑出异常情况状态的维护。
作为一款优秀的消息处理服务,kafka 具有完善的事务管理,状态管理和灾难恢复功能。只要我们稍加变通一下,kafka 也能作为延迟消息处理的解决方案,而且实现上比用数据库简单得多。
以下代码均在 spring-boot 2.0.5 和 spring-kafka 2.1.10 中测试通过。建议事先阅读文档 Spring for Apache Kafka 以便能很好地理解以下内容。
设计 2 个队列(topic),一个收到消息马上执行,另一个用来接收需延迟处理的消息。话句话说,接收延迟消息的队列直到消息可执行之前一直在 block 状态,所以有局限性,定时不能非常精确,并且任务执行次序与加进来的次序是一致的。
- application.yml
-
- ————————————————————
-
-
- spring:
-
- ## kafka
-
- kafka:
-
- bootstrap-servers: 127.0.0.1:9092
-
- consumer:
-
- group-id: myGroup
-
- auto-offset-reset: earliest
-
- enable-auto-commit: false
-
- properties:
-
- max:
-
- poll:
-
- interval:
-
- # 设置时间必须比延迟处理的时间大,不然会报错
-
- ms: 1200000
-
- listener:
-
- # 把提交模式改为手动
-
- ack-mode: MANUAL
kafka 默认的消费模式是自动提交,意思是,当 MessageListener 收到消息,执行处理方法后自动提交已完成状态,该消息就从队列里移除了。配置 ack-mode: MANUAL 改为手动提交后,我们就可以根据需要保留数据在消息队列,以便以后再处理。
max.poll.interval.ms 设小了可能会收到下面的错误:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
请务必设置一个比等待执行时间更长的时间。
- @Autowired
-
- private KafkaTemplate kafkaTemplate;
-
-
- public void myAction(){
- // 定义 data
- // 任务推送到 Kafka
- kafkaTemplate.send(“myJob", data.toString());
- }
该部分没有特别的地方,跟普通的消息消息发送一样。
定义两个 topic:myJob 和 myJob-delay
- @SpringBootApplication
-
- @ServletComponentScan
-
- public class Application {
-
-
- @KafkaListener(topics = “myJob”)
-
- @SendTo(“myJob-delay")
-
- public String onMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack) {
-
- String json = (String) cr.value();
-
- JSONObject data = JSON.parseObject(json);
-
-
- if (/* 需要延迟处理 */){
-
- // 提交
-
- ack.acknowledge();
-
- // 发送到 @SendTo
-
- data.put("until", System.currentTimeMillis() + msToDelay);
-
- return data.toString();
-
- }
-
-
- // 正常处理
-
- // do real work
-
-
- // 提交
-
- ack.acknowledge();
-
- return null;
-
- }
-
-
- @KafkaListener(topics = “myJob-delay")
-
- @SendTo(“myJob")
-
- public String delayMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack){
-
-
- String json = (String) cr.value();
-
- JSONObject data = JSON.parseObject(json);
-
- Long until = data.getLong("until");
-
- // 阻塞直到 until
-
- while (System.currentTimeMillis() < until){
-
- Thread.sleep( Math.max(0, until - System.currentTimeMillis()) );
-
- }
-
-
- // 提交
-
- ack.acknowledge();
-
- // 转移到 @SendTo
-
- return json;
-
-
- }
-
- }
代码很简单,不用解释也能看明白。稍微提一下几个重要的地方。
@KafkaListener 的方法参数里有 Acknowledgment ack,这是AckMode.MANUAL 模式下必须要添加的参数。
ack.acknowledge() 用来标记一条消息已经消费完成,即将从消息队列里移除。执行之前消息会一直保留在队列中,即时宕机重启后也能恢复。
@SendTo 用来在队列(topic)间转移消息,只要 return 非 null 的数据。以上代码中,当需要延迟处理时,消息从 myJob 转移到 myJob-delay;而当条件满足时,消息又从 myJob-delay 转移到了 myJob。
自从 spring-kafka 2.2.4 版本之后,可以在方法上定义 max.poll.interval.ms ,更加灵活了。例如
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000”,
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100”}
)
以上是延迟消息处理的简单实现,适合延时要求不那么高的场合。朋友们想一下,假如延时比较复杂,执行的次序也不一定跟消息到达的次序一致,系统又该怎样设计呢?
假如这篇文章对你有所帮助, 请关注我公众号, 发现更多有用的文章
参考:安全验证 - 知乎
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。