赞
踩
RocketMQ系列文章
RocketMQ(四):重复消费、消息重试、死信消息的解决方案
服务端对客户端应答失败
客户端给服务端反馈应答的时候网络闪断
Broker 或客户端重启、扩容或缩容
时生产者
@Test
void test1() {
// 业务唯一编号
String key = "1300";
Message<String> message = MessageBuilder
.withPayload("我是一个带key的消息")
.setHeader(RocketMQHeaders.KEYS, key)
.build();
// 相同的key发送两次
rocketMQTemplate.syncSend("repeatedTopic", message);
rocketMQTemplate.syncSend("repeatedTopic", message);
System.out.println("发送完成");
}
消费者
@Component
@RocketMQMessageListener(topic = "repeatedTopic", consumerGroup = "repeated-consumer-group")
public class RepeatMysqlListener implements RocketMQListener<MessageExt> {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public void onMessage(MessageExt message) {
// 唯一的业务id(如果是相同的两次请求,则keys值一定相同)
String messageKey = message.getKeys();
try {
jdbcTemplate.execute("INSERT INTO `user` (`num_no`,`name`) VALUES('" + messageKey + "','名称')");
} catch (DataAccessException e) {
// 该message可能是重复的
if (e instanceof DuplicateKeyException) {
System.out.println(messageKey+"的业务编号数据重复了,直接return,就算消费了此重复数据");
return;
}
}
// 获取消息执行业务
System.out.println("获取消息内容:【" + new String(message.getBody()) + "】执行业务");
}
}
执行结果:
发送完成
获取消息内容:【我是一个带key的消息】执行业务
1300的业务编号数据重复了,直接return,就算消费了此重复数据
Redisson分布式锁配置
@Configuration
public class RedissonConfig {
@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://localhost:6390")
.setPassword("xc@1234")
.setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
生产者
@Test
void test1() {
// 业务唯一编号
String key = "1400";
Message<String> message = MessageBuilder
.withPayload("我是一个带key的消息")
.setHeader(RocketMQHeaders.KEYS, key)
.build();
// 相同的key发送两次
rocketMQTemplate.syncSend("repeatedTopic", message);
rocketMQTemplate.syncSend("repeatedTopic", message);
System.out.println("发送完成");
}
消费者
@Component
@RocketMQMessageListener(topic = "repeatedTopic", consumerGroup = "repeated-consumer-group")
public class RepeatRedisListener implements RocketMQListener<MessageExt> {
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(MessageExt message) {
// 唯一的业务id(如果是相同的两次请求,则keys值一定相同)
String messageKey = message.getKeys();
RLock redissonLock = redisson.getLock(messageKey);
try {
// 添加redisson锁并实现锁续命功能
// 默认过期时间是30s,每10s触发一次锁续命功能
redissonLock.lock();
List<String> topicBusinessKeyList = stringRedisTemplate.opsForList().range("topicBusinessKey",0,-1);
if ( ObjectUtils.isNotEmpty(topicBusinessKeyList) && topicBusinessKeyList.contains(messageKey)) {
System.out.println(messageKey + "的业务编号数据重复了,直接return,就算消费了此重复数据");
return;
}
// 获取消息执行业务
System.out.println("获取消息内容:【" + new String(message.getBody()) + "】执行业务");
// 讲businessKey存入redis
stringRedisTemplate.opsForList().rightPush("topicBusinessKey", messageKey);
} finally {
redissonLock.unlock();
}
}
}
执行结果:
发送完成
获取消息内容:【我是一个带key的消息】执行业务
1400的业务编号数据重复了,直接return,就算消费了此重复数据
application.yml配置文件设置
10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
多线程
模式下,重试16
次,设置超过 16 次的重试时间间隔均为每次 2 小时顺序
模式下,重试Integer.MAX_VALUE次,间隔1秒消费者配置
RocketMQPushConsumerLifecycleListener
接口,从prepareStart方法中获取消费者
并设置它相同GroupID
下的所有Consumer
实例有效@Component
@RocketMQMessageListener(topic = "retryTopic",
consumerGroup = "retry-consumer-group"
)
public class RetryListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(MessageExt message) {
System.out.println("当前时间: " + new Date());
//获取消息的重试次数
System.out.println("重试次数: " + message.getReconsumeTimes());
System.out.println("接收内容: " + new String(message.getBody()));
System.out.println("----------------------------------------------");
throw new RuntimeException("测试重试次数");
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
// 设置消费者重试次数
defaultMQPushConsumer.setMaxReconsumeTimes(2);
// 实例名称-控制面板可以看到
defaultMQPushConsumer.setInstanceName("消费者1号");
}
}
设置重试二次的执行结果:
单独队列
%DLQ%原消费者组名
,死信队列的消息将不会再被消费上一节的消费者重试两次后,就会将消息放入死信队列
处理死信消息方式一:
@Component
@RocketMQMessageListener(
topic = "%DLQ%retry-consumer-group",
consumerGroup = "retry-dead-consumer-group"
)
public class RetryDeadConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 处理消息 签收了
System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
}
}
处理死信消息方式二:
@Component
@RocketMQMessageListener(
topic = "%DLQ%retry-consumer-group",
consumerGroup = "retry-dead-consumer-group"
)
public class RetryDeadConsumer2 implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
// 业务处理
try {
int i = 1 / 0;
} catch (Exception e) {
// 重试
int reconsumeTimes = messageExt.getReconsumeTimes();
if (reconsumeTimes >= 3) {
// 不要重试了
System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
}else {
throw new RuntimeException("异常");
}
}
}
}
一般认为单条队列消息差值>=10w时 算堆积问题
什么情况下会出现堆积
根据IO(2n)/CPU(n+1)
)Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。