赞
踩
发送重试机制:
RocketMQ支持发送失败后的内部重试:同步发送的重试次数由 retryTimesWhenSendFailed 控制,默认是2次;该参数对异步发送无效(异步发送由 retryTimesWhenSendAsyncFailed 单独控制),SendOneWay模式下不支持重试。
消费重试机制:
RocketMQ在消费失败时,支持消费端重试继续消费该消息,默认最多重试16次,且重试的时间间隔逐次增加;只有集群(Cluster)模式支持消费重试,广播模式不支持。
在重试期间,消息的key和ID不会发生改变,应用程序可以以此做好幂等性控制。
在重试期间,还可以继续消费其它新消息。
- package com.tech.rocketmq.jms;
-
/**
 * Shared RocketMQ connection settings for this example: the NameServer address list
 * and the topic name.
 *
 * <p>This is a pure constants holder, so it is final and cannot be instantiated.
 *
 * @author lw
 * @since 2021/11/15
 */
public final class JmsConfig {
    /** NameServer addresses; multiple addresses are separated by {@code ;}. */
    public static final String NAME_SERVER = "192.168.50.135:9876;192.168.50.136:9876";

    /** Name of the topic used by this example. */
    public static final String TOPIC = "tech_pay_test_topic_aaa";

    private JmsConfig() {
        // Constants holder: never instantiated.
    }
}
- package com.tech.rocketmq.jms;
-
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.springframework.stereotype.Component;
-
- /**
- * @author lw
- * @since 2021/11/15
- */
- @Component
- public class PayProducer {
- private String producerGroup="pay_group";
- private DefaultMQProducer producer;
- public PayProducer(){
- producer=new DefaultMQProducer(producerGroup);
- //指定NameServer地址,多个地址以;隔开
- //如producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877")
- producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
- //生产者投递消息重试次数(内部重试),默认是2次,异步和SendOneWay下配置无效
- producer.setRetryTimesWhenSendFailed(3);
- start();
- }
-
- public DefaultMQProducer getProducer() {
- return producer;
- }
-
- /**
- * 对象使用之前必须调用一次,只能初始化一次
- */
- private void start() {
- try {
- this.producer.start();
- } catch (MQClientException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 一般在应用上下文,使用上下文监听器,进行关闭
- */
- private void shutDown(){
- this.producer.shutdown();
- }
- }
- package com.tech.rocketmq.controller;
-
- import com.tech.rocketmq.jms.JmsConfig;
- import com.tech.rocketmq.jms.PayProducer;
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.HashMap;
-
- /**
- * @author lw
- * @since 2021/11/15
- */
- @RestController
- public class PayController {
-
- @Autowired
- private PayProducer payProducer;
-
-
- @GetMapping("send")
- Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
- // Message message = new Message(JmsConfig.TOPIC, "taga", ("hello word = " + text).getBytes());
- //发送消息时,指定消息的key,比如订单编号
- Message message = new Message(JmsConfig.TOPIC, "taga", "666", ("hello word = " + text).getBytes());
- SendResult sendResult = payProducer.getProducer().send(message);
- System.out.println(sendResult);
- return new HashMap<>();
- }
- }
- package com.tech.rocketmq.jms;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
- import org.springframework.stereotype.Component;
-
- import java.util.List;
-
- /**
- * @author lw
- * @since 2021/11/15
- */
- @Slf4j
- @Component
- public class PayConsumer {
- private DefaultMQPushConsumer consumer;
- private String consumerGroup = "pay_consumer_group";
-
- public PayConsumer() throws MQClientException {
- consumer = new DefaultMQPushConsumer(consumerGroup);
- consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- //默认是集群模式,如果改为广播模式不支持消费端重试
- // consumer.setMessageModel(MessageModel.BROADCASTING);
- consumer.subscribe(JmsConfig.TOPIC, "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- MessageExt message = list.get(0);
- int reconsumeTimes = message.getReconsumeTimes();
- log.info("重试次数:{}", reconsumeTimes);
- try {
- log.info("Receive New Message: {}", new String(message.getBody()));
- String topic = message.getTopic();
- String tags = message.getTags();
- String keys = message.getKeys();
- if(keys.equals("666")){
- throw new Exception("模拟异常");
- }
- log.info("topic={} tags={} keys={}", topic, tags, keys);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- } catch (Exception e) {
- log.error("消费异常",e);
- if(reconsumeTimes>=2){
- log.info("重试次数大于等于2,记录数据库,发短信通知开发人员或者运营人员");
- //告诉broker,本次消费成功
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
- });
- consumer.start();
- System.out.println("consumer start ...");
- }
- }
当重试次数达到2次后,向broker回复 CONSUME_SUCCESS,该消息不再重试。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。