赞
踩
1.1新建maven工程
1.2引入对应的rabbitMq依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
1.3配置类型代码示例
- @Configuration
- public class MyRabbitMqConfig {
- /**
- * 声明交换机的名称
- */
- public static final String EXCHANGE_NAME="order-exchange";
- /**
- * 声明队列名称
- */
- public static final String QUEUE_NAME="order-queue";
- /**
- * 声明路由键
- */
- public static final String ROUTE_KEY="route-order-queue-key";
-
- /**
- * 创建交换机
- * @return
- */
- @Bean
- public DirectExchange directExchange(){
- return new DirectExchange(EXCHANGE_NAME);
- }
-
- /**
- * 创建队列
- * @return
- */
- @Bean
- public Queue queue(){
- return new Queue(QUEUE_NAME);
- }
- /**
- * 绑定交换机与队列
- */
- @Bean
- public Binding binding(){
- return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTE_KEY);
- }
-
- }
1.4生产者代码示例
- @Component
- public class MessageProducer {
- @Autowired
- RabbitTemplate rabbitTemplate;
- public void send(String message){
- System.out.println("生产者发送消息:"+message);
- CorrelationData correlationData = new CorrelationData();
- correlationData.setId(message);
- rabbitTemplate.convertAndSend(MyRabbitMqConfig.EXCHANGE_NAME,MyRabbitMqConfig.ROUTE_KEY,message,correlationData);
- }
- }
1.5消费者代码示例
- @Component
- public class MessageConsumer {
- @RabbitListener(queues = MyRabbitMqConfig.QUEUE_NAME)
- public void handlerMessage(String msg){
- System.out.println("消费者受到消息:"+msg);
- System.out.println("开始处理业务逻辑.....");
- }
- }
1.6程序代码验证
- @RestController
- public class RabbitMqController {
- @Autowired
- MessageProducer messageProducer;
- @RequestMapping("/sendMessage")
- public String sendMessage(){
- messageProducer.send("订单消息");
- return "success";
- }
- }
(1)通过实现消费的重试机制,通过 @Retryable 来实现重试,可以设置重试次数和重试频率。(2)生产端实现消息可靠性投递。以上两种方法消费端都可能收到重复消息,所以要求消费端必须实现幂等性消费。
2.2消息投递以及消费的流程图
2.3生产端保证消息的可靠性
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。 RabbitMQ 为我 们提供了两种方式用来控制消息的投递可靠性模式。rabbitMq的消息投递过程是producer---borker---exchange-queue-consumer
- #开启消息确认模式
- spring.rabbitmq.publisher-confirm-type=correlated
- @Component
- public class MyConfirmMessageCallBack implements RabbitTemplate.ConfirmCallback {
- /**
- * @param correlationData 相关配置信息
- * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
- * @param cause 失败原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if(ack){
- //接收成功
- System.out.println("成功发送消息到exchange");
- System.out.println("correlationData:"+ JSONUtil.toJsonStr(correlationData));
- }else{
- System.out.println("发送消息到exchange失败");
- //在这可以做一些业务处理,比如消息的重新投递等
- }
- }
- }
4.修改生产者代码
- @Component
- public class MessageProducer {
- @Autowired
- RabbitTemplate rabbitTemplate;
- @Autowired
- MyConfirmMessageCallBack myConfirmMessageCallBack;
- public void send(String message){
- System.out.println("生产者发送消息:"+message);
- CorrelationData correlationData = new CorrelationData();
- correlationData.setId(message);
- //设置消息确认回调接口
- rabbitTemplate.setConfirmCallback(myConfirmMessageCallBack);
- //发送消息
- rabbitTemplate.convertAndSend(MyRabbitMqConfig.EXCHANGE_NAME,MyRabbitMqConfig.ROUTE_KEY,message,correlationData);
- }
- }
- #开启消息退回模式
- spring.rabbitmq.publisher-returns=true
- @Component
- public class MessageProducer {
- @Autowired
- RabbitTemplate rabbitTemplate;
- @Autowired
- MyConfirmMessageCallBack myConfirmMessageCallBack;
- @Autowired
- MyReturnMessageCallBack myReturnMessageCallBack;
- public void send(String message){
- System.out.println("生产者发送消息:"+message);
- CorrelationData correlationData = new CorrelationData();
- correlationData.setId(message);
- //设置消息确认回调接口
- rabbitTemplate.setConfirmCallback(myConfirmMessageCallBack);
- //确保消息发送失败后可以重新返回到队列中
- rabbitTemplate.setMandatory(true);
- //设置消息投递queue失败回调接口
- rabbitTemplate.setReturnsCallback(myReturnMessageCallBack);
- //发送消息
- rabbitTemplate.convertAndSend(MyRabbitMqConfig.EXCHANGE_NAME,MyRabbitMqConfig.ROUTE_KEY,message,correlationData);
- }
- }
- @Component
- public class MyReturnMessageCallBack implements RabbitTemplate.ReturnsCallback {
- /**
- * 交换机路由消息不到queue的时候会触发该接口的回调
- * @param returnedMessage
- */
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- String exchange = returnedMessage.getExchange();
- Message message = returnedMessage.getMessage();
- String routingKey = returnedMessage.getRoutingKey();
- int replyCode = returnedMessage.getReplyCode();
- String replyText = returnedMessage.getReplyText();
- System.out.println("交换机:"+exchange);
- System.out.println("消息内容:"+message);
- System.out.println("路由键:"+routingKey);
- System.out.println("应答码:"+replyCode);
- System.out.println("应答内容:"+replyText);
-
- }
- }
消费端确认ack机制
AcknowledgeMode.NONE :不确认AcknowledgeMode.AUTO :自动确认AcknowledgeMode.MANUAL :手动确认
- #开启消费者手动ack
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
- @Component
- public class MessageConsumer {
- @RabbitListener(queues = MyRabbitMqConfig.QUEUE_NAME)
- public void handlerMessage(String msg, Channel channel, Message message) throws IOException {
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- System.out.println("消费者受到消息:"+msg);
- System.out.println("开始处理业务逻辑.....");
- //手动Ack[参数1:消息投递序号,参数2:批量签收]
- channel.basicAck(deliveryTag,true);
- } catch (Exception e) {
- //拒绝[参数1:消息投递序号,参数2:批量拒绝,参数3:是否重新加入队列]
- channel.basicNack(deliveryTag,true,true);
- }
-
- }
- }
3.验证
产生重复消费的原因:消费端ack的机制默认是自动ack的,如果消费者在发送ack的请求时,MQ突然挂掉就自然接收不到该消息的ack也就不会在队列中删除该消息,一旦有其他消费者过来消费消息的时候,MQ还是会投递已消费的消息给其他消费者,就产生了消息重复消费的问题。
主要有以下方式保证消息不被重复消费:
1.首先消费端开启手动Ack模式
2.设计消息事务表,在每次消费消息之前根据meessageId记录一条信息,每次消费之前根据messageId判断该消息是否被消费过,如果已经消费过就直接ack,ack之后broker会删掉该消息的。
1.使用分布式锁的机制,消费同一个队列的时候确保只有一个消费者在消费。
2.同类型的消息存放在同一个队列里,利用队列有序性特点且只有一个消费者在消费该队列,这样也是能够保证有序的。在集群的模式下我们可以设置队列的“单活模式”。x-single-active-consumer:单活模式,表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。
英文缩写: DLX 。 Dead Letter Exchange (死信交换机),当消息成为 Dead message 后, 可以 被重新发送到另一个交换机,这个交换机就是DLX。
1. 队列消息长度到达限制;2. 消费者拒接消费消息, basicNack/basicReject, 并且不把消息重新放入原目标队列 ,requeue=false。3. 原队列存在消息过期设置,消息到达超时时间未被消费。
5.3队列绑定死信交换机:
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key也就是说此时 Queue 作为 " 生产者 "。
延迟队列顾名思义,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。我们可以以一个业务场景看看延迟队列的应用常用,比如我们日常生活中使用美团点外卖,但是你下单了没有支付十五分钟或者是30分钟之后系统会默认给你去掉订单,这个是怎么实现的呢?没错就是使用了延迟队列。
需求:用户下单之后,没有支付,系统取消订单并修改状态。
6.1实现步骤
但是rabbitMQ中并没有提供延迟队列的实现,可以使用TTL+私信队列的方式来实现延迟队列的效果。设置队列过期时间30分钟,当30分钟过后,消息未被消费,进入死信队列,路由到指定队列,调用库存 系统,判断订单状态。
代码实现:
1.新增DeLayQueueConfig配置类
- @Configuration
- public class DeLayQueueConfig {
- /**
- * 死信队列名称
- */
- public static final String DEAD_QUEUE_NAME="dead-queue-demo";
- /**
- * 死信队列路由键
- */
- public static final String DEAD_ROUTE_KEY="dead-route-demo";
- /**
- * 死信队列路由交换机名称
- */
- public static final String DEAD_EXCHANGE_NAME="dead-exchange-demo";
- /**
- * 延迟订单队列名称
- */
- public static final String DELAY_ORDER_QUEUE_NAME="delay-order-queue";
- /**
- * 延迟交换机名称
- */
- public static final String DELAY_ORDER_EXCHANGE_NAME="delay-order-exchange";
- /**
- * 路由键
- */
- public static final String DELAY_ORDER_ROUTE_NAME="delay-order-route";
-
-
- /**
- * 创建死信交换机
- * @return
- */
- @Bean
- public DirectExchange deadExchange(){
- return new DirectExchange(DEAD_EXCHANGE_NAME);
- }
-
- /**
- * 死信队列
- * @return
- */
- @Bean
- public Queue deadQueue(){
- return new Queue(DEAD_QUEUE_NAME);
- }
-
- /**
- * 邦定延迟队列与交换机
- * @return
- */
- @Bean
- public Binding deadBinding(){
- return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTE_KEY);
- }
-
- /**
- * 创建订单交换机
- * @return
- */
- @Bean
- public DirectExchange delayExchange(){
- return new DirectExchange(DELAY_ORDER_EXCHANGE_NAME);
- }
-
- /**
- * 订单队列
- * @return
- */
- @Bean
- public Queue delayQueue(){
- Map<String, Object> arguments = new HashMap<>(2);
- // 绑定我们的死信交换机
- arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
- // 绑定我们的路由key
- arguments.put("x-dead-letter-routing-key", DEAD_ROUTE_KEY);
- return new Queue(DELAY_ORDER_QUEUE_NAME,true,false,false,arguments);
- }
-
- /**
- * 邦定订单队列与交换机
- * @return
- */
- @Bean
- public Binding delayBinding(){
- return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ORDER_ROUTE_NAME);
- }
-
- }
2.新增DelayProducer类
- @Component
- @Slf4j
- public class DelayProducer {
- @Autowired
- RabbitTemplate rabbitTemplate;
- public void send(String message){
- System.out.println("生产者发送订单消息:"+message+",发送时间:"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_FORMAT));
- MessagePostProcessor messagePostProcessor = messagePostProcessor();
- //发送消息
- rabbitTemplate.convertAndSend(DeLayQueueConfig.DELAY_ORDER_EXCHANGE_NAME,DeLayQueueConfig.DELAY_ORDER_ROUTE_NAME,message,messagePostProcessor);
- }
- //给待发送的消息添加TTL配置
- private MessagePostProcessor messagePostProcessor() {
- return new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- MessageProperties messageProperties = message.getMessageProperties();
- // 设置编码
- messageProperties.setContentEncoding("utf-8");
- //设置有效期30分钟
- //messageProperties.setExpiration("1800000");
- //测试修改为10秒
- messageProperties.setExpiration("10000");
- return message;
- }
- };
- }
- }
3.新增DelayConsumer类
- @Component
- @Slf4j
- public class DelayConsumer {
-
- @RabbitListener(queues = DeLayQueueConfig.DEAD_QUEUE_NAME)
- public void delayConsumer(String msg, Channel channel, Message message) throws IOException {
- System.out.println("延迟队列接收到消息:"+msg+",消费时间:"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_FORMAT));
- System.out.println("查询订单状态,修改状态值");
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
- }
- }
3.controller新增测试请求
- @RestController
- public class RabbitMqController {
- @Autowired
- MessageProducer messageProducer;
- @Autowired
- DelayProducer delayProducer;
- @RequestMapping("/sendMessage")
- public String sendMessage(){
- messageProducer.send("订单消息");
- return "success";
- }
- @RequestMapping("/sendDelayMessage")
- public String send(){
- delayProducer.send("订单消息");
- return "success";
- }
- }
4.测试验证
1.浏览器访问http://localhost:8081/sendDelayMessage
本文重点介绍了springboot如何整个RabbitMq以及常用的api,对确保消息不丢失,重复消费,顺序消费问题进行了分析以及解决方法,最后介绍了RabbitMQ如何使用死信队列+TTL机制实现延迟队列的案例。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。