赞
踩
目录
保证消息的可靠性主要依靠三种机制:一个是消息的持久化,一个是事务机制,一个就是消息的确认机制。
1)消息持久化
消息持久化是将消息写入本地文件,如果rabbitmq故障退出,在重启时会从本地文件系统读取队列数据。
2)事务机制
rabbitmq的事务机制提供了消息生产者和消息服务器(broker)之间的事务的开启,提交,回滚操作(如下图所示)。这套机制可以保证消息可靠性,但也有缺点:由于使用事务机制会导致消息生产者和broker(服务器)交互次数增加,造成性能的浪费,且事务机制是阻塞的,在发送一条消息后需要等待RabbitMQ回应,获取回应后才能发送下一条消息,因此事务机制并不提倡使用(RabbitMQ事务模式与非事务模式在性能上相差可达高达上百倍,具体数值因机器性能和网络环境而不同,但差异都会非常明显)
事务提交流程:
3)消息确认
消息确认分为:发送者确认,接收方确认。 发送者确认分为:消息到达交换机确认,消息到达与交换机绑定的队列确认。
用于示例开发基础代码:
git clone -b rabbitmqDemo git@gitee.com:heizifeng/rabbit-mqdemo.git
因为:每个RabbitTemplate实例只能注册一个ConfirmCallback,所以如果启动web容器并多次调用该方法进行消息发送,则会报异常。(测试用例可以通过,是因为每次测试执行完毕后容器就终止,下次运行时是新的容器)
增加RabbitTemplate的配置类,在配置类中指定消息确认回调方法:
- package com.zking.rabbitmqdemo.provied.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- *
- * @site www.xiaomage.com
- * @company xxx公司
- * @create 2021-11-14 10:04
- */
- @Configuration
- @Slf4j
- public class RabbitTemplateConfig {
-
- @Bean
- public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate template = new RabbitTemplate(connectionFactory);
- template.setMandatory(true);
- template.setMessageConverter(new Jackson2JsonMessageConverter());
- template.setEncoding("utf-8");
-
- //实现消息发送到exchange后接收ack回调,publisher-confirms:true
- //如果队列是可持久化的,则在消息成功持久化之后生产者收到确认消息
- template.setConfirmCallback(((correlationData, ack, cause) -> {
- if(ack) {
- log.info("消息成功发送到exchange,id:{}", correlationData.getId());
- } else {
- /*
- * 消息未被投放到对应的消费者队列,可能的原因:
- * 1)发送时在未找到exchange,例如exchange参数书写错误
- * 2)消息队列已达最大长度限制(声明队列时可配置队列的最大限制),此时
- * 返回的cause为null。
- */
- log.info("******************************************************");
- log.info("11消息发送失败: {}", cause);
- }
- }));
-
- //消息发送失败返回队列,publisher-returns:true
- template.setMandatory(true);
-
- //实现消息发送的exchange,但没有相应的队列于交换机绑定时的回调
- template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
- String id = message.getMessageProperties().getCorrelationId();
- log.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", id, replyCode, replyText, exchange, routingKey);
- });
-
- return template;
- }
- }
编写用于发送消息的延迟队列(死信)
- package com.zking.rabbitmqdemo.provied.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- *
- * @site www.xiaomage.com
- * @company xxx公司
- * @create 2021-11-12 10:04
- */
- @Configuration
- public class RabbitMQConfig {
-
- @Bean(name="directExchange")
- public Exchange directExchange() {
- return ExchangeBuilder.directExchange("direct_exchange").durable(true).build();
- }
-
- @Bean(name="directQueue")
- public Queue directQueue() {
- Map<String,Object> args = new HashMap<>();
- args.put("x-message-ttl", 1000*60*20);
- args.put("x-max-length", 3);
- args.put("x-overflow","reject-publish");
- return QueueBuilder.durable("direct_queue").withArguments(args).build();
- }
-
- @Bean
- public Binding directBinding(
- @Qualifier("directQueue") Queue queue,
- @Qualifier("directExchange") Exchange exchange) {
-
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("direct_exchange_routing_key")
- .noargs();
- }
-
-
- @Bean(name="topicExchange")
- public Exchange topicExchange() {
-
- return ExchangeBuilder
- .topicExchange("topic_exchange")
- .durable(true)
- .build();
- }
-
- @Bean(name="topicQueue1")
- public Queue topicQueue1() {
- return QueueBuilder.durable("topic_queue_q1").build();
- }
-
- @Bean(name="topicQueue2")
- public Queue topicQueue2() {
- return QueueBuilder.durable("topic_queue_q2").build();
- }
-
- @Bean
- public Binding topicBindingQ1(
- @Qualifier("topicQueue1") Queue queue,
- @Qualifier("topicExchange") Exchange exchange) {
-
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("topic.queue.#")
- .noargs();
- }
-
- @Bean
- public Binding topicBindingQ2(
- @Qualifier("topicQueue2") Queue queue,
- @Qualifier("topicExchange") Exchange exchange) {
-
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("topic.queue.#")
- .noargs();
- }
-
- //死信队列
- @Bean(name="dxlExchange")
- public Exchange dxlExchange() {
- return ExchangeBuilder.topicExchange("dxl_exchange").durable(true).build();
- }
-
- @Bean(name="dxlQueue")
- public Queue dxlQueue() {
- return QueueBuilder.durable("dxl_queue").build();
- }
-
- @Bean
- public Binding bindingDxl(
- @Qualifier("dxlQueue") Queue queue,
- @Qualifier("dxlExchange") Exchange exchange) {
-
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("routing_dxl_key")
- .noargs();
- }
-
- @Bean(name="usualExchange")
- public Exchange usualExchange() {
- return ExchangeBuilder.directExchange("usual_direct_exchange").durable(true).build();
- }
-
- @Bean(name="usualQueue")
- public Queue usualQueue() {
- return QueueBuilder.durable("usual_queue")
- .withArgument("x-message-ttl", 1000*60*2)
- .withArgument("x-max-length", 5)
- .withArgument("x-dead-letter-exchange", "dxl_exchange")
- .withArgument("x-dead-letter-routing-key", "routing_dxl_key")
- .build();
- }
-
- @Bean
- public Binding bindingUsualQueue(
- @Qualifier("usualQueue") Queue queue,
- @Qualifier("usualExchange") Exchange exchange) {
-
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("routing_usual_key")
- .noargs();
- }
-
-
- }
编写测试Service 发送消息
- package com.zking.rabbitmqdemo.provied.service;
-
- /**
- * @author aq
- * @site www.xiaomage.com
- * @company xxx公司
- * @create 2021-11-12 10:11
- */
- public interface ISendMsgService {
-
- void sendDirectMsg();
-
- void topicExchangeSend();
-
- void dxlExchangeSend();
-
- void confirmMessage();
- }
- package com.zking.rabbitmqdemo.provied.service;
-
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.Resource;
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.UUID;
-
- /**
- * @author aq
- * @site www.xiaomage.com
- * @company xxx公司
- * @create 2021-11-12 10:08
- */
- @Service
- public class SendMsgService implements ISendMsgService {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Override
- public void sendDirectMsg() {
- String msg = "rabbitmq direct exchange send msg "
- + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
-
- rabbitTemplate.convertAndSend("direct_exchange", "direct_exchange_routing_key",msg);
- }
-
- @Override
- public void topicExchangeSend() {
- String msg = "rabbitmq topic exchange send msg "
- + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
-
- rabbitTemplate.convertAndSend("topic_exchange", "topic.queue.msg", msg);
- }
-
- @Override
- public void dxlExchangeSend() {
- String msg = "rabbitmq usual exchange send msg "
- + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
-
- rabbitTemplate.convertAndSend("usual_direct_exchange", "routing_usual_key", msg);
- }
-
- @Override
- public void confirmMessage() {
- String msg = "rabbitmq direct exchange send msg and confirm "
- + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
-
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
-
- rabbitTemplate.convertAndSend(
- "direct.exchange",
- "direct.exchange.routing.key",
- msg,
- correlationData);
- }
-
- }
编写测试接口
- package com.zking.rabbitmqdemo.provied.service;
-
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
-
- import static org.junit.Assert.*;
-
- /**
- *
- * @site www.xiaomage.com
- * @company xxx公司
- * @create 2021-11-12 10:12
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class SendMsgServiceTest {
-
- @Autowired
- private ISendMsgService sendMsgService;
-
- @Test
- public void sendDirectMsg() {
-
- sendMsgService.sendDirectMsg();
- }
-
- @Test
- public void topicExchangeSend() {
- sendMsgService.topicExchangeSend();
- }
-
- @Test
- public void testDxlExchange() {
- sendMsgService.dxlExchangeSend();
- }
- }
1)直接使用@RabbitListener,@RabbitHandler注解,通过配置文件配置监听容器:
application.properties
-
- server.port=8084
-
- #rabbitMQç¸å³éç½®
- spring.rabbitmq.host=192.168.164.128
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=admin
- spring.rabbitmq.password=admin
- spring.rabbitmq.virtual-host=my_vhost
-
- #启用消息确认
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
- spring.rabbitmq.listener.direct.acknowledge-mode=manual
-
- #连接模式为channel
- spring.rabbitmq.cache.connection.mode=channel
- spring.rabbitmq.cache.channel.size=50
-
- #每个队列的消费者数量
- spring.rabbitmq.listener.direct.consumers-per-queue=2
-
- #侦听器调用者线程的最小数量
- spring.rabbitmq.listener.simple.concurrency=2
- spring.rabbitmq.listener.simple.max-concurrency=100
-
- #每次用队列中取出1个消息,在有多个消息消费者,且消息者处理能力不均时,可以
- #起到均衡各消息消费者的处理功能的功能
- spring.rabbitmq.listener.direct.prefetch=1
-
- /**
- * @author Administrator
- * @create 2020-02-2422:30
- */
- @Component
- @Slf4j
- public class ReceiverConfirmDemo implements ChannelAwareMessageListener {
-
- /**
- * 指定监听的队列.
- * 该注解可以放在方法上,也可以放在类上,如果放在类上则需要
- * 在一个方法上设置@RabbitHandler(isDefault=true),否则会报如下异常:
- * “Listener method ‘no match’ threw exception”。
- * 因此建议注解始终使用在方法上。
- */
- @RabbitListener(queues = "direct.queue")
- //指定该方法为消息处理器
- @RabbitHandler
- @Override
- public void onMessage(Message message, Channel channel) throws IOException {
-
- log.info("消息内容: {}",new String(message.getBody()));
-
- /**
- * 模拟业务处理方法.
- * 对应业务方法中出现的异常需要区分对待,例如以下情况:
- * 1)网络异常等可能恢复的异常,可以设置消息重新返回到队列,以便于重新处理
- * 2)对应业务数据等不可恢复的异常,则可以进行补偿操作,或放入死信队列进行人工干预
- */
- try {
- log.info("正在处理 ....");
- //延迟5秒
- TimeUnit.SECONDS.sleep(5);
-
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
-
- //模拟在业务处理是发生了网络异常,如:在连接数据库保存数据时网络发生了抖动
- //此类异常是可以恢复的,需要要消息重新返回队列,以便于下次处理
- if(deliveryTag % 2 == 0) {
- throw new ConnectException("模拟消息消费者发生网络异常");
- }
-
- //模拟发生不可恢复异常,此种情况消息重新入队没有意义
- if(deliveryTag % 3 == 0) {
- throw new ClassCastException("模拟消息消费者发生不可恢复异常");
- }
-
- } catch (SocketException se) {
-
- log.info("SocketException: {}", se.getMessage());
-
- //拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,false则不会重新入队
- //如果配置了死信队列则消息会被投递到死信队列
- channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
-
- //不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,
- //与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。
- // nack后的消息也会被自己消费到
- //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
-
- //是否恢复消息到队列,参数是是否requeue,true则重新入队列,
- // 并且尽可能的将之前recover的消息投递给其他消费者消费,
- //而不是自己再次消费。false则消息会重新被投递给自己
- //channel.basicRecover(true);
- return;
- } catch (Exception e) {
- //此处处理无法恢复的异常,可记录日志或将消息发送到指定的队列以便于后续的处理
- log.info("Exception: {}", e.getMessage());
- channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
- return;
- }
-
- log.info("处理完毕, 发送ack确认 .... ");
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
-
- }
编写完成消息消费者后,将通过消息生产者发送消息,查看消息消费的消费。
2)使用SimpleMessageListenerContainer配置监听容器
- /**
- * @author Administrator
- * @create 2020-03-0119:27
- */
- @Configuration
- @Slf4j
- public class MessageListenerContainer {
-
- @Autowired
- private ReceiverConfirmDemo receiverConfirmDemo;
-
- @Bean
- public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- container.setConcurrentConsumers(1);
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- container.setQueueNames("direct.queue");
-
- //后置处理器,接收到的消息都添加了Header请求头
- /*container.setAfterReceivePostProcessors(message -> {
- message.getMessageProperties().getHeaders().put("desc",10);
- return message;
- });*/
-
- container.setMessageListener(receiverConfirmDemo);
-
- return container;
- }
-
- }
在该类中注入的receiverConfirmDemo即为上面已经编写完成的ReceiverConfirmDemo消息处理器。
运行结果
如果又问题那么 如果是网络异常 客恢复数据那么会回到原有队列 ,如果是不可处理异常那么可以用数据库保存,人工介入处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。