赞
踩
BAJI_GENERIC_DELAY_5M_QUEUE_DEAD("jihuanji_risk","baji_generic_delay_5m_queue_dead","300000","通用延时5分钟死信队列")
交换机:jihuanji_risk
routing_key:baji_generic_delay_5m_queue_dead
延时时间:300000(单位:毫秒)
描述:通用延时5分钟死信队列
BAJI_GENERIC_DELAY_QUEUE("jihuanji_risk","baji_generic_delay_queue","0","通用延时队列"),
交换机:jihuanji_risk
routing_key:baji_generic_delay_queue
延时时间:0(单位:毫秒)
描述:通用延时队列
//交换机 @Value("${exchange.baji.risk}") private String exchangeBajiRisk; 创建普通队列: /*创建普通队列,并绑定交换机*/ String BAJI_GENERIC_DELAY_QUEUE = QueueSendEnum.BAJI_GENERIC_DELAY_QUEUE.getRoutingKey(); @Bean(name = "queueGenericDelayQueue") public Queue queueGenericDelayQueue() { return new Queue(BAJI_GENERIC_DELAY_QUEUE); } @Bean Binding bindingQueueGenericDelayQueue(@Qualifier("queueGenericDelayQueue") Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(BAJI_GENERIC_DELAY_QUEUE); } 将普通交换机绑定到死信队列: /*创建延时5分钟的死信队列,并绑定消息过期后的交换机*/ String BAJI_GENERIC_DELAY_5M_QUEUE_DEAD = QueueSendEnum.BAJI_GENERIC_DELAY_5M_QUEUE_DEAD.getRoutingKey(); @Bean(name = "queueGenericDelay5MQueueDead") public Queue queueGenericDelay5MQueueDead() { Map<String, Object> map = new HashMap<>(2); map.put("x-dead-letter-exchange", exchangeBajiRisk); map.put("x-dead-letter-routing-key", BAJI_GENERIC_DELAY_QUEUE); return new Queue(BAJI_GENERIC_DELAY_5M_QUEUE_DEAD,true,false,false, map); } @Bean Binding bindingQueueGenericDelay5MQueueDead(@Qualifier("queueGenericDelay5MQueueDead") Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(BAJI_GENERIC_DELAY_5M_QUEUE_DEAD); }
/**
 * Sends a message to the 5-minute delay queue, with the per-message expiration enabled.
 *
 * @param message message payload
 */
public void sendMQDelay5M(String message) {
    // true => sendMQ applies the expiration configured on the enum entry
    final boolean applyExpiration = true;
    sendMQ(QueueSendEnum.BAJI_GENERIC_DELAY_5M_QUEUE_DEAD, message, applyExpiration);
}
/** * 推送RabbitMQ消息队列 * * @param queueSendEnum 推送消息类型枚举 * @param message 推送消息 * @param delayFlag 若是延迟队列,添加消息过期时间(也可在死信队列里设置,专属死信队列需自己创建) */ public synchronized void sendMQ(QueueSendEnum queueSendEnum, String message, boolean delayFlag) { String traceId = UUIDUtil.getUUID(); String exchange = queueSendEnum.getExchange(); String routingKey = queueSendEnum.getRoutingKey(); logger.info("RabbitMqProducer.sendMQ-begin-traceId:{}, exchange:{}, routingKey:{}, message:{}", traceId, exchange, routingKey, message); if (StringUtils.isBlank(exchange) || StringUtils.isBlank(routingKey) || StringUtils.isBlank(message)) { logger.error("RabbitMqProducer.sendMQ-end-parameter is required-traceId:{}, exchange:{}, routingKey:{}, message:{}", traceId, exchange, routingKey, message); return; } try { if (delayFlag) { logger.info("RabbitMqProducer.sendMQ-setExpiration:{} millis, traceId:{}", queueSendEnum.getExpiration(), traceId); rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置消息过期时间 message.getMessageProperties().setExpiration(queueSendEnum.getExpiration()); return message; } }); } else { rabbitTemplate.convertAndSend(exchange, routingKey, message); } logger.info("RabbitMqProducer.sendMQ-end-success-traceId:{}, exchange:{}, routingKey:{}", traceId, exchange, routingKey); } catch (AmqpException e) { logger.error("RabbitMqProducer.sendMQ-end-error-traceId:{}, exchange:{}, routingKey:{}, message:{}", traceId, exchange, routingKey, message, e); } }
/**
 * Listener for the {@code baji_generic_delay_queue} queue. The body is a placeholder:
 * message handling is to be filled in here.
 *
 * NOTE(review): {@code @RabbitHandler} is normally paired with a class-level
 * {@code @RabbitListener}; next to a method-level {@code @RabbitListener} it looks
 * redundant - confirm before removing.
 *
 * @param message     the received message payload
 * @param channel     the channel the message was delivered on (unused in this body)
 * @param deliveryTag the delivery tag taken from the {@code AmqpHeaders.DELIVERY_TAG} header
 *                    (unused in this body)
 */
@RabbitHandler
@RabbitListener(queues = "baji_generic_delay_queue")
public void leakageRecallListener(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
// `message` is the received payload; process it here.
// NOTE(review): nothing here acknowledges the delivery - if the listener container is
// configured for manual acknowledgement, an explicit ack via `channel` is needed; confirm.
}
先将带过期时间的消息发送到死信队列(该队列没有消费者);消息过期后,RabbitMQ 会按照队列上配置的死信交换机和 routing key,把消息转发到通用延时队列,监听并消费通用延时队列中的消息即可。该方案可用于定时发送 push 等场景。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。