赞
踩
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
常规服务调用
异步服务调用
异步: 访问速度更快,用户体验更好,提升系统吞吐量
第一种操作,总耗时920ms,第二种操作,总耗时25ms,大大提升了用户体验
第一种方式一个系统崩溃,导致关联系统同时崩溃,第二种方式,则不会,耦合性降低
常规开发使用rabbitmq原因: Erlang开发语言,不丢消息,消息延迟最低,虽然阿里的RocketMQ吞吐量大,但是不开源,如后期不维护了,存在风险,而rabbitMQ社区活跃也是开源的,是最好的选择,kafka主要是用于大数据领域准备的,功能没有这个完善
一个发送者,一个消费者,一个队列,一对一的关系进行发送
创建rabbit-common工程(公共模块),用于定义公共的Bean
在公共模块引入依赖
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @Configuration public class RabbitConfig { public static final String HELLO_QUEUE = "hello_queue"; @Bean public Queue helloQueue() { // 两种创建方式,第二参数true为持久化 //return new Queue(HELLO_QUEUE, true); // durable 即持久化 return QueueBuilder.durable(HELLO_QUEUE).build(); } }
META-INF\spring
org.springframework.boot.autoconfigure.AutoConfiguration.imports
上面两步解决公共引入公共模块加载不到bean的问题,方便消息发送方和消息消费方引入该公共模块,避免重复定义一样的Bean
<dependency>
<groupId>com.ruoyi</groupId>
<artifactId>rabbit-common</artifactId>
</dependency>
server:
port: 8001
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
public interface MqPublisherService {
void sendMsg(String queue,Object msg);
}
import com.ruoyi.rabbit.publisher.service.MqPublisherService; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @Service public class MqPublisherServiceImpl implements MqPublisherService { @Autowired private RabbitTemplate rabbitTemplate; @Override public void sendMsg(String queue, Object msg) { rabbitTemplate.convertAndSend(queue,msg); } }
import com.ruoyi.rabbit.common.conf.RabbitConfig; import com.ruoyi.rabbit.publisher.service.MqPublisherService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @RestController @RequestMapping("/mq/publisher") public class MqPublisherController { @Autowired private MqPublisherService mqPublisherService; @GetMapping("/sendHelle") public String sendHelle() { mqPublisherService.sendMsg(RabbitConfig.HELLO_QUEUE,"hello world"); return "ok"; } }
import com.rabbitmq.client.Channel; import com.ruoyi.rabbit.common.conf.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @Component public class HelloListener { @RabbitListener(queues = {RabbitConfig.HELLO_QUEUE}) // 指定监听的队列 public void helloReceiver(Message message, Channel channel, String msg) { // Message对象中包含消息的各种信息也包括了这里的msg // Channel对象用于调用手动确认等方法 // 会自动将消息转换为指定的类型 System.out.println("hello 监听消息收到: " + msg); } }
同hello模式类似,只是多了一个或多个消费者对同一队列进行监听, 一条消息只能由一个消费者进行消费,默认使用轮询的方式
import com.rabbitmq.client.Channel; import com.ruoyi.rabbit.common.conf.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @Component public class HelloListener { @RabbitListener(queues = {RabbitConfig.HELLO_QUEUE}) public void helloReceiver(Message message, Channel channel, String msg) { System.out.println("hello 监听消息收到: " + msg); } // 新增一个对hello队列的监听 @RabbitListener(queues = {RabbitConfig.HELLO_QUEUE}) public void helloReceiver2(Message message, Channel channel, String msg) { System.out.println("hello2 监听消息收到: " + msg); } }
广播模式,也称发布订阅模式,交换机会将消息发送给所有绑定的队列
package com.ruoyi.rabbit.common.conf; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @Configuration public class RabbitConfig { // fanout ==================================== public static final String FANOUT_EXCHANGE = "fanout_exchange"; public static final String FANOUT_QUEUE_1 = "fanout_queue_1"; public static final String FANOUT_QUEUE_2 = "fanout_queue_2"; @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE, true, false); } @Bean public Queue fanoutQueue1() { return new Queue(FANOUT_QUEUE_1, true); } @Bean public Queue fanoutQueue2() { return new Queue(FANOUT_QUEUE_2, true); } @Bean public Binding fanoutBinding1() { return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); } @Bean public Binding fanoutBinding2() { return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); } // fanout ==================================== }
@GetMapping("/sendFanout")
public String sendFanout() {
rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, null, "hello fanout");
return "fanout ok";
}
import com.rabbitmq.client.Channel; import com.ruoyi.rabbit.common.conf.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @Component public class FanoutListener { @RabbitListener(queues = {RabbitConfig.FANOUT_QUEUE_1}) public void directReceiver(Message message, Channel channel, String msg) { System.out.println("fanout1 监听消息收到: " + msg); } @RabbitListener(queues = {RabbitConfig.FANOUT_QUEUE_2}) public void directReceiver2(Message message, Channel channel, String msg) { System.out.println("fanout2 监听消息收到: " + msg); } }
发送者将消息发送到交换机,并指定对应的routing_key,由交换机根据routing_key去匹配对应的队列
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @Configuration public class RabbitConfig { // direct ==================================== public static final String DIRECT_EXCHANGE = "direct_exchange"; public static final String DIRECT_QUEUE_1 = "direct_queue_1"; public static final String DIRECT_QUEUE_2 = "direct_queue_2"; public static final String DIRECT_ROUTING_KEY_1 = "direct_routing_key_1"; public static final String DIRECT_ROUTING_KEY_1_2 = "direct_routing_key_1_2"; public static final String DIRECT_ROUTING_KEY_2 = "direct_routing_key_2"; @Bean public DirectExchange directExchange() { // 参数1,name:exchange的名称; 参数2,durable:持久化; 参数3:autoDelete:自动删除 //return new DirectExchange(DIRECT_EXCHANGE, true, false); // 方式二 return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build(); } @Bean public Queue directQueue1() { return new Queue(DIRECT_QUEUE_1, true); } @Bean public Queue directQueue2() { return new Queue(DIRECT_QUEUE_2, true); } @Bean public Binding bindingDirect1() { return BindingBuilder.bind(directQueue1()).to(directExchange()).with(DIRECT_ROUTING_KEY_1); } // directQueue1队列和directExchange交换机间绑定了两个routingKey @Bean public Binding bindingDirect1_2() { return BindingBuilder.bind(directQueue1()).to(directExchange()).with(DIRECT_ROUTING_KEY_1_2); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(directQueue2()).to(directExchange()).with(DIRECT_ROUTING_KEY_2); } // direct ==================================== }
说明: directQueue1 队列和 directExchange 交换机间绑定了两个routingKey,分别:direct_routing_key_1 和 direct_routing_key_1_2,当发送者往directExchange 交换机发送消息指定上面两个routingkey时,都会发送到 directQueue1 这个消息队列
// controller
@GetMapping("/sendDirect")
public String sendDirect() {
mqPublisherService.sendMsg(RabbitConfig.DIRECT_EXCHANGE,RabbitConfig.DIRECT_ROUTING_KEY_1,"hello direct");
return "direct ok";
}
// sevice
@Override
public void sendMsg(String exchange, String routingKey, Object msg) {
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
}
import com.rabbitmq.client.Channel; import com.ruoyi.rabbit.common.conf.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @Component public class DirectListener { @RabbitListener(queues = {RabbitConfig.DIRECT_QUEUE_1}) public void directReceiver(Message message, Channel channel, String msg) { System.out.println("direct1 监听消息收到: " + msg); } @RabbitListener(queues = {RabbitConfig.DIRECT_QUEUE_2}) public void directReceiver2(Message message, Channel channel, String msg) { System.out.println("direct2 监听消息收到: " + msg); } }
同direct模式,只是routing_key可以使用通配符
*星号:表示有且仅有一个单词
#井号:表示任意个数单词
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @Configuration public class RabbitConfig { // topic ==================================== public static final String TOPIC_EXCHANGE = "topic_exchange"; public static final String TOPIC_QUEUE_MAN = "topic_queue_man"; public static final String TOPIC_QUEUE_ALL = "topic_queue_all"; public static final String TOPIC_ROUTING_KEY_MAN = "topic.man"; public static final String TOPIC_ROUTING_KEY_WOMAN = "topic.woman"; public static final String TOPIC_ROUTING_KEY_ALL = "topic.*"; @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE, true, false); } @Bean public Queue topicQueueMan() { return new Queue(TOPIC_QUEUE_MAN, true); } @Bean public Queue topicQueueAll() { return new Queue(TOPIC_QUEUE_ALL, true); } @Bean public Binding topicBindingMan() { return BindingBuilder.bind(topicQueueMan()).to(topicExchange()).with(TOPIC_ROUTING_KEY_MAN); } @Bean public Binding topicBindingALL() { return BindingBuilder.bind(topicQueueAll()).to(topicExchange()).with(TOPIC_ROUTING_KEY_ALL); } // topic ==================================== }
@GetMapping("/sendTopic")
public String sendTopic() {
//rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, RabbitConfig.TOPIC_ROUTING_KEY_MAN, "hello topic");
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, RabbitConfig.TOPIC_ROUTING_KEY_WOMAN, "hello topic");
return "topic ok";
}
import com.rabbitmq.client.Channel; import com.ruoyi.rabbit.common.conf.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @Component public class TopicListener { @RabbitListener(queues = {RabbitConfig.TOPIC_QUEUE_MAN}) public void directReceiver(Message message, Channel channel, String msg) { System.out.println("fanout man 监听消息收到: " + msg); } @RabbitListener(queues = {RabbitConfig.TOPIC_QUEUE_ALL}) public void directReceiver2(Message message, Channel channel, String msg) { System.out.println("topic all 监听消息收到: " + msg); } }
作用: 发送方 保证消息发送成功
producer—>rabbitmq broker—>exchange—>queue—>consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback
消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated # 开启确认模式
publisher-returns: true # 开启退回模式
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; /** * desc: * * @author qts * @date 2023/4/3 0003 */ @Configuration public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnsCallback { private static final Logger log = LoggerFactory.getLogger(RabbitTemplateConfig.class); @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } /** * 实现confirm回调,发送和没发送到exchange,都触发 (前提: 确认模式开启:yml中publisher-confirm-type: correlated) * CorrelationData数据可以在rabbitTemplate.convertAndSend时传入 并这种CorrelationData的setId参数,回调时能取到 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // 参数说明: // correlationData: 相关数据,可以在发送消息时,进行设置该参数 // ack: 结果 // cause: 原因 if (ack) { log.info("【ConfirmCallback】消息已经送达Exchange,ack已发"); } else { log.warn("【ConfirmCallback】消息没有送达Exchange"); // todo 做一些处理,让消息再次发送。 消息缓存或入库,邮件提醒运维 } } // 实现return回调:当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack (前提:退回模式开启:yml中publisher-returns: true) @Override public void returnedMessage(ReturnedMessage returned) { // 返回参数说明 //String exchange = returned.getExchange(); // 该消息指定的 exchange //String routingKey = returned.getRoutingKey(); // 该消息指定的 routingKey //Message message = returned.getMessage(); // 消息对象 //int replyCode = returned.getReplyCode(); // 回应 code //String replyText = returned.getReplyText(); // 回应 内容 log.warn("【ReturnsCallback】消息没有送到队列中"); // todo 处理 邮件发送,缓存或存到数据库 } }
一般需要对mq做集群保证消息中的exchange和queue不会挂掉从而导致无法发送消息到exchange 和 queue 的情况,避免发生此异常请求,我们需要启动对消息的发送进行监控,使用confirm模式和return模式监控消息到达exchange和queue的情况,出现异常则进行邮件通知运维,并记录消息内容到缓存或数据库
spring:
rabbitmq:
publisher-confirm-type: correlated
spring:
rabbitmq:
publisher-returns: true
作用: 确保消费方消费成功,确保消息不会因为网络问题,而导致消息丢失。
定义exchange,和queue时,指定 durable 参数为 true
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 手动确认(默认自动确认)
concurrency: 1 # 监听器调用程序线程的最小数量(即每个@RabbitListener开启几个线程去处理。如有两个@RabbitListener指向同一队列,并且concurrency=2,则有4个线程同时处理4条消息)
max-concurrency: 10 # 监听器调用程序线程的最大数量
prefetch: 250 # 消费方限流:每个消费者可以处理的未确认消息的最大数量,提前拉取消息,不代表消费消息
import com.rabbitmq.client.Channel; import com.ruoyi.rabbit.common.conf.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * desc: * * @author qts * @date 2023/3/31 0031 */ @Component public class HelloListener { @RabbitListener(queues = {RabbitConfig.HELLO_QUEUE}) public void helloReceiver(Message message, Channel channel, String msg) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("hello 监听消息收到: " + msg); // 正常处理后,手动进行确认 ,第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 channel.basicAck(deliveryTag,false); } catch (Exception e) { //消费者处理出了问题,需要告诉队列信息消费失败, /** * 拒绝确认消息:<br> * channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br> * deliveryTag:该消息的index<br> * multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。<br> * requeue:被拒绝的是否重新入队列 <br> */ channel.basicNack(deliveryTag,false,true); /** * 拒绝一条消息:<br> * channel.basicReject(long deliveryTag, boolean requeue);<br> * deliveryTag:该消息的index<br> * requeue:被拒绝的是否重新入队列 */ //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); e.printStackTrace(); } } }
作用: 防止系统一次处理过多请求,导致系统崩溃
操作: 使用两个参数进行控制 prefetch 和 concurrency
// yml中是全局设置, 注解中是单个消费者设置,更推荐注解方式设置
// 线程个数可以动态伸缩 最小1,最多4 , 也可设置为 concurrency = "4" , 指定4个线程并发处理消息
// 买个线程等于一个监听, 每个监听都会拉取指定的prefetch个消息到自己的阻塞队列中等待消费
@RabbitListener(queues = {RabbitConfig.BATTLE_PAPER_QUEUE},concurrency = "1-4")
参考博客:RabbitMQ并发消费者关键参数prefetch,concurrency
前提: 消费端是手动确认的 acknowledge= “manual”
配置: 在消费方yml配置中设置prefetch参数,默认250, 如prefetch=“1” 会在手动确后再拉取下一次,代表每个@RabbitListener监听每次预加载多少消息到内存中等待消费的最大值,不是每次都必须抓取prefetch所设置的值
时间到了,所有消息被删除
操作: 创建队列queue时,指定参数 x-message-ttl 并设置值
第5个参数通过map的方式进行设置,或者使用建造者模式创建
代码
// 过期时间 设置 ==============================
@Bean
public Queue ttlQueue() {
// 方式一: new 方式
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl",5000);// 设置过期时间
//return new Queue("ttlQueueName",true,false,false,args);
// 方式二: 建造模式
return QueueBuilder.durable("ttlQueueName").ttl(5000).build();
}
// 过期时间 设置 ==============================
等价与rabbitMQ中的如下操作
时间到了,当消息在队列顶端时(即将消费时)再判断,并删除
操作: 发送消息时,通过参数 messagePostProcessor 指定处理器,并对Message对象中的属性进行修改
@GetMapping("sendTtlFanout") public String sendTtl() { // 该方法在消息转换成 Message对象后, 发送到交换机之前,进行回调 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置消息的到期时间 message.getMessageProperties().setExpiration("5000"); return message; } }; // 使用MessagePostProcessor方式,就不用自己去传Message对象,covertAndSend内可以自动进行转换 rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,null,"ttl fanout",messagePostProcessor); return "ttl ok"; }
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机)
死信交换机: 就是一个普通的交换机,只是用来处理死信而已
死信: 无法被消费的消息
// 死信队列 ================================== public static final String DLX_EXCHANGE = "dlx_exchange"; public static final String DLX_ROUTING_KEY = "dlx_routing_key"; public static final String DLX_Queue = "dlx_queue"; public static final String TTL_Queue = "ttl_queue"; // 5秒过期后,会将消息发送到指定的 DLX_EXCHANGE 交换机上,并指定routingkey,消息会有 DLX_EXCHANGE 按照指定的 routingkey路由到对应的Queue上 @Bean public Queue ttlQueue2() { return QueueBuilder.durable(TTL_Queue) //持久化 .ttl(5000) // 5秒过期 .deadLetterExchange(DLX_EXCHANGE) // 指定死信交换机 .deadLetterRoutingKey(DLX_ROUTING_KEY) // 指定发送到死信交换机的 routing_key,用于dlx交换机定位 queue .build(); } @Bean public DirectExchange dlxExchange() { return new DirectExchange(DLX_EXCHANGE, true, false); } // 消费者上监听这个队列,则可以实现延迟队列功能,ttl时间到了之后,就会进入这个队列, @Bean public Queue dlxQueue() { return new Queue(DLX_Queue, true); } @Bean public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY); } // 死信队列 ==================================
死信交换机就是一个普通交换机, 就是在创建一个Queue的时候,指定过期时间 (ttl) 和 过期后需要将消息转发到哪个交换机和用什么routingkey(即:指定deadLetterExchange() 和 deadLetterRoutingKey() )
保证消息一定发送成功
生产者与消费者之间应该约定一个超时时间,比如 5 分钟,对于超出这个时间没有得到响应的消息,可以设置一个定时重发的补偿机制:通过消息落库 + 定时任务来实现。
CREATE TABLE `t_cap_published_message` (
`id` varchar(40) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '标识。',
`version` varchar(20) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '版本',
`exchange` varchar(200) COLLATE utf8mb4_bin DEFAULT '' COMMENT '交换机。',
`topic` varchar(200) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '话题。',
`content` longtext COLLATE utf8mb4_bin NOT NULL COMMENT '消息内容。',
`retries` int(11) NOT NULL COMMENT '重试次数,一般为 3 次。',
`expiry` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '过期时间。',
`status` varchar(40) COLLATE utf8mb4_bin NOT NULL COMMENT '状态,成功则消息ack成功,其他状态都要重试。',
`created_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间。',
`last_modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间,可以用作数据版本。',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='发布的消息。';
可参考https://blog.csdn.net/weixin_44399827/article/details/124317144
防止同一消息被多次消费
消息的消费者成功消费了消息,并进行了手动应答 ack后,由于网络等原因,Rabbitmq没有收到消费者的ack ,导致将此消息发送给了其他消费者进行消费,则出现了重复消费同一消息的情况
就是在消费者消费消息时,进行判断当前消息是否被消费了,消费了就不做业务处理直接决绝, 即需要一个唯一标识进行判断
怎么判断消息是否被消费了呢,通过一个全局的ID
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。