赞
踩
实现消息延迟发送的两种方案:
1、定义两个常规的队列 A,B 。A 队列在初始化时添加参数:x-dead-letter-exchange ,值为B队列 的交换机名称如下:
// A业务队列 @Bean public Queue AQueue(){ Map<String,Object> map = new HashMap<>(1); map.put("x-dead-letter-exchange",MqConstants.B_EXCHANGE_NAME); //该参数x-dead-letter-routing-key可以修改该死信的路由key,不设置则使用原消息的路由key map.put("x-dead-letter-routing-key",MqConstants.B_QUEUE); return new Queue(MqConstants.A_QUEUE,true,false,false,map); } // A业务队列绑定 @Bean Binding bindingExchange() { return BindingBuilder.bind(AQueue()).to(ADirectExchange()).with(MqConstants.A_QUEUE); } //A业务交换机 @Bean DirectExchange ADirectExchange() { return new DirectExchange(MqConstants.A_EXCHANGE_NAME); } // B队列(死信队列) @Bean() public Queue BQueue(){ return new Queue(MqConstants.B_QUEUE); } // B队列绑定 @Bean Binding bindingBExchangeDead() { return BindingBuilder.bind(BQueue()).to(BDirectExchange()).withQueueName(); } //B交换机 @Bean DirectExchange BDirectExchange() { return new DirectExchange(MqConstants.B_EXCHANGE_NAME); }
这种方法的思路是,给消息绑定一个过期时间,推送到常规队列A 中,当到了这条消息的过期时间,则队列A把这消息推送到绑定的队列B 交换机中,此时队列B 可称为 死信队列。而消息成为死信的条件如下:
1、消息被否定接收,消费者使用basic.reject 或者 basic.nack并且requeue 重回队列属性设为false。
2、消息在队列里得时间超过了该消息设置的过期时间(TTL)。
3、消息队列到达了它的最大长度,之后再收到的消息。
当一个消息再队列里变为死信时,它会被重新publish到另一个exchange交换机上,这个exchange就为DLX。因此我们只需要在声明正常的业务队列时添加一个可选的"x-dead-letter-exchange"参数,值为死信交换机,死信就会被rabbitmq重新publish到配置的这个交换机上,我们接着监听这个交换机就可以了。
所以上文的例子中,我们需要确保 过期时间的消息在A队列中不能被消费,也不手动拒绝。如果被消费了 那就起不到延迟的效果,如果手动拒绝了,过期时间的消息立即就会被投递到 死信队列中,如果一旦投递到死信队列(B)中,B队列的消费者立马就会消费了这条消息,也起不到消息延迟消费的效果。所以以上的例子中我们不去消息A队列里面的消息 任由它呆在队列里,直到过期了。自动推送到另外的队列B,我们只需要监听B队列的消息即可 ,消息生产者把带有过期时间的消息推向A 队列代码如下:
A 队列的消息生产者如下:
public ResponseData sendDel() { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); MessageDTO messageDTO = new MessageDTO(); messageDTO.setTitle("hello rabbitmq 我是帅哥!!"); messageDTO.setContext(format.format(new Date())); CorrelationData correlationData = new CorrelationData(); String messageStr = JSONObject.toJSONString(messageDTO); byte[] bytes = messageStr.getBytes(); ReturnedMessage returnedMessage = new ReturnedMessage(new Message(bytes), 1, "1", MqConstants.A_EXCHANGE_NAME, ""); correlationData.setReturned(returnedMessage); long time = new Date().getTime(); // long exp = time + (2 * 60 * 1000); // 延迟2分钟 long exp = 120000; System.out.println("当前运行时间:"); System.out.println(format.format(new Date())); String ex = String.valueOf(exp); rabbitTemplate.convertAndSend(MqConstants.A_EXCHANGE_NAME, MqConstants.A_QUEUE, messageDTO, message -> { //设置消息的过期时间,是以毫秒为单位的 message.getMessageProperties().setExpiration(ex); return message; },correlationData); return ResponseData.success(); }
B队列的消费者如下:
@RabbitHandler @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = MqConstants.B_EXCHANGE_NAME,durable = "true",type = "direct"), value = @Queue(value = MqConstants.B_QUEUE,durable = "true"), key = MqConstants.B_QUEUE )) public void listenerDead(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG)Long tagId) throws IOException { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); byte[] body = message.getBody(); String s = new String(body); MessageDTO messageDTO = JSONObject.parseObject(s, MessageDTO.class); log.info("死信队列执行中,消费消息id:{}",messageDTO.getTitle()); log.info("当前时间:{}",format.format(new Date())); // 消息确认,当前消息 //channel.basicAck(tagId,false); // 消息拒绝 当前消息,并会重新添加到队列中 //channel.basicNack(tagId,false,true); }
启动项目测试结果如下:
从上图中我们可以看到,确实是实现了消息的延迟发送。
以上方案存在一个问题:根据消息投递的顺序 会出现消息延迟消息消费的顺序不一致演示如下:
修改 发送时间为动态的,第一个消息延迟时间30秒,第二条消息 延迟时间 20秒 结果如下:
可以看得出,结果 并不是 延迟20m 的消息优先被消费,而是等到 第一条延迟了 30秒的消息过期了,才发现也有一条延迟了20m 的消息没有被消费,此时立刻把这两条消息,推送到私信队列B, 所以消费者消费的时间是一致的。
以上是第一种实现消息延迟发送的方案,下面使用插件时间消息的延迟发送案例,首先需要安装插件,安装步骤如下:
https://blog.csdn.net/XZB119211/article/details/126944959
第一步 定义交换机、队列、等如下:
// 插件延迟队列 @Bean public Queue deadQueueForEx(){ return new Queue(MqConstants.E_QUEUE,true,false,false); } // 插件延迟队列绑定 @Bean Binding bindingExchangeDeadForEx(Queue deadQueueForEx) { return BindingBuilder.bind(deadQueueForEx()).to(deadExchangeForEx()).with(MqConstants.E_QUEUE).noargs(); } //插件延迟队列交换机 @Bean public CustomExchange deadExchangeForEx(){ Map<String, Object> arguments = new HashMap<>(1); arguments.put("x-delayed-type", "direct"); return new CustomExchange(MqConstants.E_EXCHANGE_NAME,"x-delayed-message",true,false,arguments); }
消息发送者如下:
// 插件时间延迟队列 public ResponseData sendDelForEx(long expTime) { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); MessageDTO messageDTO = new MessageDTO(); messageDTO.setTitle("hello rabbitmq 我是帅哥!!"); messageDTO.setContext(format.format(new Date())); CorrelationData correlationData = new CorrelationData(); String messageStr = JSONObject.toJSONString(messageDTO); byte[] bytes = messageStr.getBytes(); ReturnedMessage returnedMessage = new ReturnedMessage(new Message(bytes), 1, "1", MqConstants.E_EXCHANGE_NAME, ""); correlationData.setReturned(returnedMessage); System.out.println("当前运行时间:"); System.out.println(format.format(new Date())); String ex = String.valueOf(expTime); rabbitTemplate.convertAndSend(MqConstants.E_EXCHANGE_NAME, MqConstants.E_QUEUE, messageDTO, message -> { //设置消息的过期时间,是以毫秒为单位的 message.getMessageProperties().setHeader("x-delay",ex); return message; },correlationData); return ResponseData.success(); }
消息消费者如下:
@RabbitListener(queues = MqConstants.E_QUEUE)
public void listenerDeadForEx(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG)Long tagId) throws IOException {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
byte[] body = message.getBody();
String s = new String(body);
MessageDTO messageDTO = JSONObject.parseObject(s, MessageDTO.class);
log.info("延迟插件队列执行中,消费消息id:{}",messageDTO.getTitle());
log.info("当前时间:{}",format.format(new Date()));
}
测试第一条消息延迟 30ms ,第二条消息延迟 20ms
从上图消息 日志中可以看得出来 消息消费时确实是正常了,但是多打印处理 其他日志:
这部分日志是:监听消息发送时消息未到达队列的回调方法打印的,其代码如下:
@Bean(name = "rabbitTemplate") public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 设置消息从生产者发送至 rabbitmq broker 成功的回调 (保证信息到达 broker) rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { // ack=true:消息成功发送到Exchange @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { ReturnedMessage returned = correlationData.getReturned(); Message message = returned.getMessage(); byte[] body = message.getBody(); String s = new String(body); MessageDTO messageDTO=JSONObject.parseObject(s, MessageDTO.class);; log.info("ConfirmCallback: " + "相关数据:" + messageDTO ); log.info("ConfirmCallback: " + "确认是否到达交换机:" + ack); log.info("ConfirmCallback: " + "原因:" + cause); } }); // 设置信息从交换机发送至 queue 失败的回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { byte[] body = message.getBody(); String s = new String(body); MessageDTO messageDTO=JSONObject.parseObject(s, MessageDTO.class);; log.info("ReturnCallback: " + "消息:" + messageDTO); log.info("ReturnCallback: " + "回应码:" + replyCode); log.info("ReturnCallback: " + "回应信息:" + replyText); log.info("ReturnCallback: " + "交换机:" + exchange); log.info("ReturnCallback: " + "路由键:" + routingKey); } }); // 为 true 时,消息通过交换器无法匹配到队列时会返回给生产者,为 false 时,匹配不到会直接丢弃 rabbitTemplate.setMandatory(true); // 设置发送时的转换 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; }
结合上面的代码我们可知,使用插件实现延迟消息时候,并不是把消息立马就推送到队列中,而只是将消息推送到交换机中,待过期时间到了才推送掉消息队列中,所以 发送消息 setReturnCallback 才会爆出 消息未到达队列的 日志,但也不影响最终的延迟消息执行。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。