赞
踩
最近在使用RabbitMQ作消息转发,供消费者使用,遇到了多消费者消费同一批数据的问题。
1.多个消费者消费同一批数据;
2.每条数据每个消费者都要消费
生产端不创建队列,只负责把消息发送到交换机并指定routingKey,这里为了避免消息投递到RabbitMQ失败,采用了手动确认方式,如
# spring配置 # 手动确认消息投递情况 spring.rabbitmq.publisher-confirms: true spring.rabbitmq.publisher-returns: true spring.rabbitmq.template.mandatory: true # java代码 @PostConstruct public void init() { rabbitTemplate.setConfirmCallback((correlationData, ack, error) -> { System.out.println("消息唯一标识:" + correlationData); System.out.println("确认结果:" + ack); System.out.println("失败原因:" + error); }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { System.out.println("消息主体 message : " + message); System.out.println("响应 code : " + replyCode); System.out.println("描述:" + replyText); System.out.println("消息使用的交换器 exchange : " + exchange); System.out.println("消息使用的路由键 routing : " + routingKey); }); }
消费端创建不自动删除的持久化队列,指定exchange、routingKey、queueName,如
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(value = "dynamicQueue", autoDelete = "false", durable = "true"),
exchange = @Exchange(value = "exchange", durable = "true", type = ExchangeTypes.DIRECT),
key = "routingKey"
)
}
)
public void dynamicQueue(Message message, Channel channel) {
System.out.println("接收消息:" + new String(message.getBody()));
}
上述代码会自动创建队列并使用routingKey绑定交换机
要想保证正确消费,消费端最好使用手动确认,手动确认的代码如
// 第二个参数的含义是是否批量确认,会把小于当前消息id(自增)的全部确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
尤其需要注意的是,下述配置含义是强制消费端手动确认,不确认RabbitMQ会认为没有消费
spring.rabbitmq.listener.simple.acknowledge-mode: manual
spring.rabbitmq.listener.simple
spring.rabbitmq.listener.direct
这两项配置似乎不同,经实践得到,生产者发送消息到队列需配置simple项,配置direct项无效,
发送到交换机可配置direct或simple项,下面是一个例子,配置实现“发送完毕就删除消息,不论消费者是否确认”
spring.rabbitmq.listener.simple.acknowledge-mode: none # 发送到队列不需要确认,配置下一项是无效的
spring.rabbitmq.listener.direct.acknowledge-mode: none # 发送到交换机不需要确认,配置上一项同样有效
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。