赞
踩
pring:
rabbitmq:
...
# 配置消息重试
listener:
simple:
retry:
# 开启重试
enabled: true
# 重试三次
max-attempts: 3
# 间隔时间1s
max-interval: 1000
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; import java.util.HashMap; import java.util.Hashtable; import java.util.Map; @Configuration public class SubscribeExchangeConfig { /** * 死信交换器 */ @Bean public DirectExchange emailDlxDirectExchange() { return ExchangeBuilder.directExchange("exchange.direct.dlx.springboot.email").build(); } /** * 死信队列 */ @Bean public Queue emailDlxQueue() { return QueueBuilder.durable("queue.direct.dlx.springboot.email").build(); } /** * 绑定死信交换器和死信队列 */ @Bean @Resource public Binding emailDlxBiding(Queue emailDlxQueue, DirectExchange emailDlxDirectExchange) { // 将路由使用路由键绑定到交换器上 return BindingBuilder.bind(emailDlxQueue).to(emailDlxDirectExchange).with("springboot.email.dlk.routing.key"); } /** * 直连交换器 */ @Bean public DirectExchange emailDirectExchange() { return ExchangeBuilder.directExchange("exchange.direct.springboot.email").build(); } /** * 声明正常队列并指定死信交换器和队列 */ @Bean public Queue emailQueue() { Map<String, Object> params = new Hashtable<>(4); // 指定死信交换器 params.put("x-dead-letter-exchange", "exchange.direct.dlx.springboot.email"); // 指定死信队列 params.put("x-dead-letter-routing-key", "springboot.email.dlk.routing.key"); return QueueBuilder.durable("queue.direct.springboot.email").withArguments(params).build(); } /** * 交换器和队列绑定 */ @Bean @Resource public Binding emailBiding(Queue emailQueue, DirectExchange emailDirectExchange) { // 将路由使用路由键绑定到交换器上 return BindingBuilder.bind(emailQueue).to(emailDirectExchange).with("springboot.email.routing.key"); } }
@RabbitListener(queues = "queue.direct.dlx.springboot.email")
public void dlxReceiver(String msg) {
System.out.println("dlxReceiver = " + msg);
}
@RabbitListener(queues = "queue.direct.springboot.email")
public void receiver01(String msg) {
// 模拟异常情况
Integer.parseInt("a");
System.out.println("receiver01 message = " + msg);
}
@RabbitListener(queues = "queue.direct.dlx.springboot.email")
public void dlxReceiver(String msg) {
System.out.println("dlxReceiver = " + msg);
}
@RabbitListener(queues = "queue.direct.dlx.springboot.email") public void dlxReceiver(String msg, Message message, Channel channel) { System.out.println("dlxReceiver = " + msg); // 消息消息确认 try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("消息确认成功"); } catch (IOException e) { // 变更消息状态 System.out.println(e.getMessage()); } // 消息消息拒绝 /*try { channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } catch (IOException e) { // 变更消息状态 System.out.println(e.getMessage()); System.out.println("消息拒绝出现异常"); }*/ }
@RabbitListener(queues = "queue.direct.springboot.email")
public void receiver(Message message) throws Exception {
String messageId = message.getMessageProperties().getMessageId();
if(messageId != null && !cacheUtil.exists(messageId)) {
String msg = new String(message.getBody());
// 消息处理
cacheUtil.set(messageId, true);
} else {
System.out.println("已经消费过了");
}
// 模拟抛出异常
// throw new Exception("消费消息出现异常");
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。