赞
踩
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务但是只完成了部分突然它挂掉了,会发生什么情况?RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息,因为它无法接收到。为了保证消息在发送过程中不丢失,引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
- server.port=8081
- #rabbitmq服务器ip
- spring.rabbitmq.host=localhost
- #rabbitmq的端口
- spring.rabbitmq.port=5672
- #用户名
- spring.rabbitmq.username=guest
- #密码
- spring.rabbitmq.password=guest
- #配置虚拟机
- spring.rabbitmq.virtual-host=demo
- #开启发送确认机制,消息到达交换机后会有回调
- spring.rabbitmq.publisher-confirm-type=correlated
- #可以确保消息在未被队列接收时返回
- spring.rabbitmq.publisher-returns=true
-
- ##发送重试配置
- #启用发送重试
- #spring.rabbitmq.template.retry.enabled=true
- #最大重试次数
- #spring.rabbitmq.template.retry.max-attempts=5
- #第一次和第二次尝试发布或传递消息之间的间隔
- #spring.rabbitmq.template.retry.initial-interval=1000ms
- #应用于上一重试间隔的乘数 步长
- #spring.rabbitmq.template.retry.multiplier=2
- #最大重试时间间隔
- #spring.rabbitmq.template.retry.max-interval=10000ms
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- <version>2.6.3</version>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.78</version>
- </dependency>
- @Configuration
- @Slf4j
- public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
-
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void initRabbitTemplate(){
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnsCallback(this);
- }
-
- /**
- * durable:是否持久化
- * exclusive:是否独享、排外的
- * autoDelete:是否自动删除
- * @return
- */
- @Bean
- Queue addUserQueue(){
- return new Queue(RabbitConstant.QUEUE_ADD_USER,true,false,false);
- }
-
- /**
- * 消息成功到达交换机触发该方法
- * @param correlationData
- * @param ack
- * @param cause
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if (ack){
- //消息成功到达交换机
- log.info("{}消息成功到达交换机",correlationData.getId());
- }else{
- log.error("{}消息未到达交换机,原因:{}",correlationData.getId(),cause);
- }
-
- }
-
- /**
- * 配置publisher-returns为true 消息未成功到达队列,会触发该方法
- * @param returned
- */
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- log.error("{}消息未到达队列",returned.toString());
- }
- }
这里我们直接用直连交换机,【DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key
相同的 Queue 上】
再写个常量类专门放队列名,交换机名啥的,写到配置文件也可以
- public class RabbitConstant {
- /**
- * 简单消息队列
- */
- public static final String QUEUE_HELLO_MSG = "hello_world_mq";
-
- /**
- * 队列
- */
- public static final String QUEUE_ADD_USER = "queue.add.user";
-
- }
- @Service
- public class ProducerServiceImpl implements ProducerService {
-
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @Override
- public Boolean addUser(User user) {
- //这里进行一些操作,然后把用户信息发送到消息队列
- String userStr = JSON.toJSONString(user);
- rabbitTemplate.convertAndSend(RabbitConstant.QUEUE_ADD_USER, (Object) userStr,new CorrelationData(UUID.randomUUID().toString()));
- return true;
- }
- }
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- public class User implements Serializable {
- private static final long serialVersionUID = 1809655848237434192L;
- private Integer id;
- private String userName;
- private String describe;
- }
- @RestController
- public class ProducerController {
-
- @Autowired
- ProducerService producerService;
-
- @PostMapping("/addUser")
- public Boolean addUser(@RequestBody User user){
- return producerService.addUser(user);
- }
-
- }
- server.port=8082
- #rabbitmq服务器ip
- spring.rabbitmq.host=localhost
- #rabbitmq的端口
- spring.rabbitmq.port=5672
- #用户名
- spring.rabbitmq.username=guest
- #密码
- spring.rabbitmq.password=guest
- #配置虚拟机
- spring.rabbitmq.virtual-host=demo
- #设置消费端手动 ack none不确认 auto自动确认 manual手动确认
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
pom就不贴了都一样的
- @Service
- @Slf4j
- public class ConsumerService {
-
- public static final String QUEUE_ADD_USER = "queue.add.user";
-
- @RabbitListener(queues =QUEUE_ADD_USER)
- @RabbitHandler
- public void addUser(String userStr,Message message, Channel channel){
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- //在这里做一些操作
- User user = JSONObject.parseObject(userStr,User.class);
- log.info(user.toString());
- //手动ack 第一个参数是消息的标记,第二个参数代表是false 代表仅仅确认当前消息,为true表示确认之前的所有消息
- channel.basicAck(deliveryTag,false);
- } catch (Exception e) {
- //告诉mq本条消息消费失败
- try {
- channel.basicNack(deliveryTag,false,true);
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- }
- }
发送后我们看消费者这边已经拿到了
来不及截图我又发送了一次,看一下RabbitMQ的控制台
再修改一下消费端代码,直接除零异常,看是否会出现Nack
好了我们再发送一次
而且该消息一直在再投递
下一篇具体讲讲如何处理这种情况
基本概念可以参考此篇博文rabbitmq入门
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。