当前位置:   article > 正文

分布式高级篇3 —— RabbitMQ_rabbitmq rabbitlistener

rabbitmq rabbitlistener

一、RabbitMQ

视频来源: 【Java项目《谷粒商城》Java架构师 | 微服务 | 大型电商项目】
本笔记对应视频集数: P248 ~ P260

1、RabbitMQ 介绍

RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包

裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是

一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,

存储和转发消息数据。

RabbitMQ 的三大特点

流量消峰:

该场景一般在秒杀或者团购活动中使用广泛。

使用 MQ 后,用户的大量请求不在直接访问数据库,而是将大量请求积压在 MQ 消息队列中,数据库从 MQ 中拉取能处理的请求,避免了数据库因为大量请求出现崩溃、宕机等情况

在这里插入图片描述

应用解耦:

传统做法订单系统直接调用其他接口,如果有一个接口出现问题,整个订单系统无法正常运转的。

使用 MQ 后,将 MQ 作为中间件与其他接口相连,即使有一个接口出现问题,其他还是正常运转的。

image-20230131110145608

异步处理:

场景说明:用户注册后,需要发送注册邮件和注册短信,传统的做法:1、串行方式 2、并行方式 3、MQ 消息队列

1、一套流程全部完成后,返回客户端

image-20230131110207897

2、发送邮件的同时发送短信,节省了一定的时间

image-20230131110218038

3、使用 MQ

image-20230131110232682

2、RabbitMQ 的相关概念

image-20230131111138505

Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。

Producer/publisher :消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。

Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

Broker:消息队列服务进程,此进程包括两个部分:ExchangeQueue

Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,根据message中的routing-key决定转发到哪个 Queue 中

Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

Binding: 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。

Connection: 网络连接,比如一个TCP连接。

Channel: 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

-----发送消息-----

1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange根据routing-key将消息转发到指定的Queue(队列)

----接收消息-----

1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。

3、安装 RabbitMQ

1、拉取镜像并运行实例

docker run -d --name rabbitmq --restart=always -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
  • 1

4369, 25672 (Erlang发现&集群端口)

5672, 5671 (AMQP端口)

15672 (web管理后台端口)

61613, 61614 (STOMP协议端口)

1883, 8883 (MQTT协议端口)

4、交换机类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、 fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接 看另外三种类型

(1)direct - 直连交换机

直连交换机通过指定的 routing-key 连接指定的队列,是一种完全匹配的方式。

image-20230131123907129

(2)fanout - 扇出交换机

fanout 交换机会将消息转发到所有与它绑定的队列上,无论是否指定了routing-key。是一种 广播的模式,

并且 fanout 交换机时散发消息最快的,因为无需判断 routing-key

image-20230131124218722

(3)topic - 主题交换机

topic 交换器通过模式匹配分配消息的 路由键属性,将路由键和某个模式进行 匹配,此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单 词,这些单词之间用点隔开。

它同样也 会识别两个通配符:符号“#”和符号 “*”。

# 匹配0个或多个单词, * 匹配一 个单词。

image-20230131124358574

5、RabbitMQ 管理界面

IP:15762

进入到管理界面

默认用户: guest

默认密码:guest

image-20230131121149234

1、创建交换机

image-20230131121534060

2、创建队列

image-20230131122010350

3、交换机绑定队列

image-20230131122127912

image-20230131122231931

如果使用 Topic 交换机,可以在绑定队列时,指明routing-key 使用通配符的方式:

image-20230131123443103

4、发送消息

image-20230131122451330

查看队列接受的消息

image-20230131122630272

image-20230131122740495

6、SpringBoot 整合RabbitMQ

1、引入依赖

RabbitAutoConfiguration 生效

引入了 CachingConnectionFactory、RabbitTemplate、AmqpAdmin、RabbitMessagingTemplate

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4

2、RabbitMQ 的所有配置都在 RabbitProperties 。 进行配置

spring:
    rabbitmq:
        host: 192.168.56.111
        port: 5672
        virtual-host: /
  • 1
  • 2
  • 3
  • 4
  • 5

3、开启RabbitMQ

@EnableRabbit
  • 1

7、AmqpAdmin 的使用

使用Java代码创建 Exchange、Queueu、Binding:

  • AmqpAdmin
@SpringBootTest
@Slf4j
@RunWith(SpringRunner.class)
public class GulimallOrderApplicationTest {

    @Autowired
    private AmqpAdmin amqpAdmin;

    /*
    * 创建一个交换机
    *   public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
    *   name 交换机名字
    *   durable 是否持久化
    *   autoDelete 是否自动删除
    *   arguments其他的一些参数
    * */
    @Test
    public void createExchange() {
        Exchange exchange = new DirectExchange("hello-java-exchange",true,false);
        // 声明一个交换机
        amqpAdmin.declareExchange(exchange);
        log.info("交换机创建成功:{}","hello-java-exchange");
    }

    /*
    * 创建一个队列
    *   public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    *   name:队列名
    *   durable:是否持久化
    *   exclusive:是否是排他的
    *   autoDelete:是否自动删除
    *   arguments:其他的一些参数
    * */
    @Test
    public void createQueue() {
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("队列创建成功:{}","hello-java-queue");

    }

    /*
    * 创建绑定关系
    *   	public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
    *       Map<String, Object> arguments)
    *
    *       destination:绑定目标,绑定的队列名
    *       destinationType:绑定类型,QUEEN or EXCHANGE
    *       exchange: 绑定的交换机名
    *       routingKey:路由键
    *       arguments 其他参数
    * */
    @Test
    public void createBinding() {
        Binding binding = new Binding(
                "hello-java-queue",
                Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello.java",
                null);
        amqpAdmin.declareBinding(binding);
        log.info("绑定成功{},{}","hello-java-exchange","hello-java-queue");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

8、使用 RabbitTemplate 发送消息

    /*
    * 发送消息
    * */
    @Test
    public void senMessage() {
        String msg = "hello,world";
        // public void convertAndSend(String exchange, String routingKey, final Object object)
        // 交换机名称、路由键、消息
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
        log.info("发送消息成功:{}",msg);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

发送实体类:

    /*
    * 发送消息
    * */
    @Test
    public void senMessage() {
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setId(2L);
        orderEntity.setCreateTime(new Date());

        // String msg = "hello,world";
        // public void convertAndSend(String exchange, String routingKey, final Object object)
        // 交换机名称、路由键、消息
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
        log.info("发送消息成功:{}",orderEntity);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在发送实体类时,默认是使用 jdk 的序列化机制,并且要求实体类实现 Serializable 接口。

image-20230131171006876

可以自定义消息转换器,使用不同的序列化方式,在 RabbitTemplate 中默认使用的是 SimpleMessageConverter 消息转换器。

image-20230131171305563

在 SimpleMessageConverter 中可以看见,如果是 string类型的消息,直接转化为 byte 流发送

如果实现了 Serializable 接口,就按照 jdk 的方式序列化

image-20230131171348684

可自定义的消息转换器,我们使用 Jackson2JsonMessageConverter

image-20230131171517553

创建配置类,自定义消息转换器:

@Configuration
public class MyRabbitConfig {

    /*
    * 自定义消息转换器
    * */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

重新发送消息:已经序列化为Json

image-20230131171858940

9、使用 @RabbitListener&@RabbitHandler 接收消息

使用 @RabbitListener 接受消息:


    /*
    * RabbitMQ 接收消息
    *   1、使用 @RabbitListener 接收消息,必须使用 @EnableRabbit 开启接受消息
    *       queueu 是一个 String[], 可指定接受多个队列的消息
    *   参数可接收的类型:
    *       rg.springframework.amqp.core.Message;: 封装的内容比较全,消息头+消息体
    *       T : 可接收发送消息的类型
    *       Channel channel : 信道信息
    *   (1) 在多服务下,一条消息只能有一个客户端接收
    *   (2) 处理完一条消息后,才能接收下一条消息
    *
    *   2、使用 @RabbitHandler +  @RabbitListener 接受不同类型的消息
    *   @RabbitHandler:标注在方法上
    *   @RabbitListener: 标注在类、方法上
    * */
    @RabbitListener(queues = {"hello-java-queue"})
    public void receiveOrderEntityMessage(Message message, OrderEntity entity, Channel channel) {

        byte[] body = message.getBody();
        MessageProperties header = message.getMessageProperties();
        // System.out.println("接收到的消息: " + message);
        System.out.println("接收到的消息体:" + entity);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

使用 @RabbitHandler + @RabbitListener 接收不同类型的消息:

@RabbitHandler 标注在方法上

@RabbitListener 标注在方法、类上

如果我们发送消息的类型不是一种类型,单独使用 @RabbitListener 还需要获取 body 的数据然后判断类型,非常麻烦,这时就可以组合使用 @RabbitHandler + @RabbitListener 接收不同的消息:

例子

消息发送者:

@RestController
@Slf4j
public class RabbitController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMQ/{num}")
    public String sendMQ(@PathVariable("num") Integer num) {
        for (Integer i = 0; i < num; i++) {
            if (i % 2 == 0) {
                OrderEntity orderEntity = new OrderEntity();

                orderEntity.setId(i.longValue());
                orderEntity.setCreateTime(new Date());

                // String msg = "hello,world";
                // public void convertAndSend(String exchange, String routingKey, final Object object)
                // 交换机名称、路由键、消息
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
                log.info("发送消息成功:{} 第", + i + "条orderEntity消息..");
            }else {
                OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
                orderReturnApplyEntity.setId(i.longValue());
                orderReturnApplyEntity.setCreateTime(new Date());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderReturnApplyEntity);
                log.info("发送消息成功:{} 第", + i + "条orderReturnApplyEntity消息..");
            }
        }

        return  "sendMQ OK";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

消息接收者:

@RabbitListener 标注在类上,指明接受哪个队列的消息,使用 @RabbitHandler 标注在不同的方法上,一个方法接收一种类型的数据

@Service("orderItemService")
@RabbitListener(queues = {"hello-java-queue"})
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {

    /*
    * RabbitMQ 接收消息
    *   1、使用 @RabbitListener 接收消息,必须使用 @EnableRabbit 开启接受消息
    *       queueu 是一个 String[], 可指定接受多个队列的消息
    *   参数可接收的类型:
    *       rg.springframework.amqp.core.Message;: 封装的内容比较全,消息头+消息体
    *       T : 可接收发送消息的类型
    *       Channel channel : 信道信息
    *   (1) 在多服务下,一条消息只能有一个客户端接收
    *   (2) 处理完一条消息后,才能接收下一条消息
    *
    *   2、使用 @RabbitHandler +  @RabbitListener 接受不同类型的消息
    *   @RabbitHandler:标注在方法上
    *   @RabbitListener: 标注在类、方法上
    * */
    // @RabbitListener(queues = {"hello-java-queue"})
    @RabbitHandler
    public void receiveOrderEntityMessage(Message message, OrderEntity entity, Channel channel) {

        byte[] body = message.getBody();
        MessageProperties header = message.getMessageProperties();
        // System.out.println("接收到的消息: " + message);
        System.out.println("接收到的消息体:" + entity);
    }

    @RabbitHandler
    public void receiveOrderReturnApplyEntityMessage(Message message, OrderReturnApplyEntity entity,Channel channel) {
        // System.out.println("接收到的消息: " + message);
        System.out.println("接收到的消息体:" + entity);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

结果:

image-20230131181606902

10、发布确认

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。因此保证 RabbitMQ 消息的可靠投递,需要采取一些措施。

可以使用事务消息,但是性能下降250倍,为此引入确认 机制

image-20230131183805211

根据RabbitMQ消息的投递流程,可将确认机制分为俩部分:

第一部分:消息生产者的确认回调

  • publisher:confirmCallback 确认模式
  • publisher:returnCallback 未投递到 queue 退回模式

第二部分:消息消费者的确认

  • consumer:ack机制

image-20230131183840456

(1)生产端确认

消息生产者:

@RestController
@Slf4j
public class RabbitController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMQ/{num}")
    public String sendMQ(@PathVariable("num") Integer num) {
        for (Integer i = 0; i < num; i++) {
            if (i % 2 == 0) {
                OrderEntity orderEntity = new OrderEntity();

                orderEntity.setId(i.longValue());
                orderEntity.setCreateTime(new Date());
                /*
                * 	public void convertAndSend(String exchange, String routingKey, final Object object,@Nullable CorrelationData correlationData)
                *   exchange: 交换机名称
                *   routingKey 路由键
                *   object 消息体
                *   CorrelationData  消息 id
                * */
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity,new CorrelationData(UUID.randomUUID().toString()));
                // log.info("发送消息成功:{}",i);
            }else {
                OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
                orderReturnApplyEntity.setId(i.longValue());
                orderReturnApplyEntity.setCreateTime(new Date());
                
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java11",orderReturnApplyEntity,new CorrelationData(UUID.randomUUID().toString()));
                // log.info("发送消息成功:{}",i);

            }
        }

        return  "sendMQ OK";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

配置 ConfirmCallback、ReturnCallback 回调

@Configuration
public class MyRabbitConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /*
     * 自定义消息转换器
     * */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /*
     * 设置发布确认机制
     *   1、ConfirmCallback,只要生产者发送消息就会执行此回调。
     *      spring.rabbitmq.publisher-confirms=true
     *   2、ReturnCallback 只有交换机将消息转发到Queue失败时,才会调用此回调
     *      # 开启发送端确认机制。 Exchange --> Queue
     *      spring.rabbitmq.publisher-returns=true
     *      # 只要消息成功发送到Queue,就优先异步调用 ReturnCallback
     *      spring.rabbitmq.template.mandatory=true
     * */
    @PostConstruct // MyRabbitConfig初始化之后执行
    public void InitRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @description
             * @date 2023/1/31 18:55
             * @param correlationData 保存消息的id以及相关信息,可在发送消息时指定 new CorrelationData()
             * @param ack 消息是否发送成功。true:Broke接收到消息, false:Broker没有接收到消息
             * @param cause 消息发送失败的原因
             * @return void
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("Broker接收消息成功, correlationData: " + correlationData + " ack:" + ack + " cause:" + cause);
                } else {
                    System.out.println("Broker接收消息失败, correlationData: " + correlationData + " ack:" + ack + " cause:" + cause);
                }
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * @description
             * @date 2023/1/31 22:25
             * @param message 投递失败的消息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本
             * @param exchange  投递失败的交换机
             * @param routingKey    投递失败消息的 routing-key
             * @return void
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("message: " + message + " replyCode: " + replyCode + " replyText: " + replyText + " exchange: " + exchange + " routingKey: " + routingKey);
            }
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

生产端确认的配置:

# 开启发送端确认机制。 生产者 --> Broker
spring.rabbitmq.publisher-confirms=true
# 开启发送端确认机制。 Exchange --> Queue
spring.rabbitmq.publisher-returns=true
# 只要消息成功发送到Queue,就优先异步调用 ReturnCallback
spring.rabbitmq.template.mandatory=true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(2)消费端确认

消费端消费一个消息默认是自动确认的,当消费者启动时,队列中数据会全部转发给消费者处理,并自动进行消息确认,在队列中删除消息。但是当消费者处理完一条消息后,突然宕机,就会造成其他消息的丢失。

因此在消费者接收消息时应该使用手动确认模式,只要消息没有手动进行 Ack,消息就一直是 unChecked,即使宕机也不会丢失,会重新进入到 Ready 状态。

开启手动确认

# 设置手动Ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2

消费端手动确认方法: channel.basicAck()

消费端手动拒绝方法: channel.basicReject()/ channel.basicNack()

    /*
    * RabbitMQ 接收消息
    *   1、使用 @RabbitListener 接收消息,必须使用 @EnableRabbit 开启接受消息
    *       queueu 是一个 String[], 可指定接受多个队列的消息
    *   参数可接收的类型:
    *       rg.springframework.amqp.core.Message;: 封装的内容比较全,消息头+消息体
    *       T : 可接收发送消息的类型
    *       Channel channel : 信道信息
    *       (1) 在多服务下,一条消息只能有一个客户端接收
    *       (2) 处理完一条消息后,才能接收下一条消息
    *
    *   2、使用 @RabbitHandler +  @RabbitListener 接受不同类型的消息
    *        @RabbitHandler:标注在方法上
    *        @RabbitListener: 标注在类、方法上
    *      
    * */
    // @RabbitListener(queues = {"hello-java-queue"})
    @RabbitHandler
    public void receiveOrderEntityMessage(Message message, OrderEntity entity, Channel channel) {

        // byte[] body = message.getBody();
        // MessageProperties header = message.getMessageProperties();
        // System.out.println("接收到的消息: " + message);
        // System.out.println("接收到的消息体:" + entity);

        /*
        *  消息确认
        *       void basicAck(long deliveryTag, boolean multiple) throws IOException;
        *           deliveryTag: 消息标签,channel内顺序自增
        *           multiple 是否批量确认
        *   拒绝消息
        *       void basicNack(long deliveryTag, boolean multiple, boolean requeue)
        *            multiple 是否批量拒绝消息
        *            requeue 拒绝的消息是否重新入队。如果重新入队还重新发送给消费者
        *       void basicReject(long deliveryTag, boolean requeue) throws IOException;
        *           与 basicNack 区别就是没有批量拒绝消息
        *
        * */
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            if (deliveryTag % 2 == 0) {
                // 手动确认消息
                channel.basicAck(deliveryTag,false);
                System.out.println("签收了货物..." + deliveryTag);
            }else {
                // 拒绝消息
                channel.basicNack(deliveryTag,false,true);
                // channel.basicReject();
                System.out.println("没有签收货物..." + deliveryTag);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

11、延时队列

延时队列使用场景

订单超过指定时间未支付,解锁库存。

image-20230204160807623

一些概念说明

消息TTL【Time To Live】:消息存活时间。

RabbitMQ 可以为队列消息 分别设置 TTL。 对队列设置 TTL,就是对队列中的所有消息设置 TTL

死信【Dead Letter】 : 顾名思义就是死掉的消息,没有消费者接收的消息。

死信的来源

  • 消息超过指定 TTL 没有被消费者接收
  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
  • 消息被拒绝(basic.reject 或 basic.nack)并且 不允许重新放到队列中【requeue=false】

死信交换机【Dead Letter Exchange】 : 死信交换机和普通交换机的创建一样,唯一的区别就是死信交换机专门处理 死信 , 与死信交换机绑定的队列中如果有死信,就会被转发到死信交换机中做下一步处理。

因此,我们可以使用 TTL + 死信交换机就可以实现一个延迟队列。

延迟队列的实现方式一:为队列设置 TTL

生产者发送的消息到达设置TTL的队列后,如果在这个指定时间内没有消费者接收,那么该消息就变为 死信,同时转发给死信交换机,由死信交换机转发到特定的队列中再次进行消费。

image-20230204163230766

延迟队列实现方式二:为消息设置TTL

为生产者发送的每一条消息都设置TTL,不推荐这种方法

因为 RabbitMQ 使用的惰性机制对消息进行检查,如果第一条消息的 TTL = 5min,第二条消息的 TTL = 30s。

第三条消息的 TTL = 1s ,RabbitMQ检查第一条消息一看 5 分钟过期,就会5分钟后来检查,那么第二、第三条消息都会在 5min 后 转发给死信交换机。

image-20230204163509409

实战

模拟 订单超时关闭的场景

由生产者P 向 order-event- exchange 交换机发送订单消息,路由键为 order.create.order。

交换机与俩个队列绑定:

第一个:order.delay.queue 为延迟队列,通过order.create.order 路由键绑定,设置三个参数,死信交换机、死信路由键、TTL

第二个:order.release.order.queue 普通队列,通过 order.release.order 路由键绑定。

在页面下单之后,随之生产者会向交换机发送一条订单创建消息,路由键为 order.create.order , 交换机会将此消息发送到 延迟队列,等到达指定的 TTL 之后,说明订单超时未支付,将消息转发到绑定的 死信交换机 中,交换机在通过 order.release.order queue 队列转发给消费者 C

image-20230204170303854

消费者:创建订单完成后,向RabbitMQ 发送消息

@RestController
@Slf4j
public class RabbitController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 模拟生成订单,向MQ发送消息
    @RequestMapping("/sendMQ/createOrder")
    public String createOrder() {
        // 创建订单
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setOrderSn(UUID.randomUUID().toString());

        // 向MQ发送消息,监听订单是否支付成功
        // String exchange, String routingKey, Object message,CorrelationData correlationData
        // 交换机、消息的路由键,发送的消息,消息的唯一标识
        rabbitTemplate.convertAndSend("order-event- exchange","order.create.order",orderEntity);

        return "Order created !!";
    }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

创建交换机、队列,绑定关系,消费者:

@Configuration
public class MyMQConfig {

    // 消费者
    @RabbitListener(queues = "order.release.order.queue")
    public void consumer(OrderEntity order, Message message, Channel channel) throws IOException {
        System.out.println("订单超时未支付,即将关闭订单: " + order.getOrderSn());
        // 手动确认
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }

    /*
    * 使用 @Bean 的方式创建 Exchange、Queue、Binding...服务启动会自动向RabbitMQ创建。
    * 前提是RabbitMQ中没有这些  Exchange、Queue、Binding... 如果存在,即使配置不一样也不会重新创建。
    * */


    // 延迟队列
    @Bean
    public Queue orderDelayQueue() {
        // String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        HashMap<String, Object> arguments = new HashMap<>();
        // 设置与队列相连的死信交换机
        arguments.put("x-dead-letter-exchange","order-event-exchange");
        // 转发死信的 路由键
        arguments.put("x-dead-letter-routing-key","order.release.order");
        // 设置队列的 TTL。超过30s就表示未支付订单,准备关闭
        arguments.put("x-message-ttl",3000);

        return new Queue("order.delay.queue",true,false,false,arguments);
    }

    // 普通队列
    @Bean
    public Queue orderReleaseOrderQueue() {
        return new Queue("order.release.order.queue",true,false,false,null);
    }

    // 交换机
    @Bean
    public TopicExchange orderEventExchange() {
        //String name, boolean durable, boolean autoDelete)
        return new TopicExchange("order-event-exchange",true,false);
    }

    // 设置绑定关系: order-event- exchange ——》order.delay.queue
    @Bean
    public Binding orderCreateOrder() {
        //String destination, DestinationType destinationType, String exchange, String routingKey,Map<String, Object> arguments
        // 绑定目的地-绑定的队列,绑定类型【交换机 OR 队列】,交换机,路由键,其他参数信息
        return new Binding(
                "order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event- exchange",
                "order.create.order",
                null);
    }

    // 设置绑定关系: order-event- exchange ——》order.release.order.queue
    @Bean
    public Binding orderReleaseOrder() {
        return new Binding(
                "order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

启动服务,已成功创建出交换机、队列

image-20230204185117203

image-20230204185130939

创建四条订单,已成功监听到…

image-20230204185837554

12、消息丢失、积压、重复等方案

1、消息丢失

消息发送出去,由于网络问题没有抵达服务器

  • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机 制,可记录到数据库,采用定期扫描重发的方式
  • 做好日志记录,每个消息状态是否都被服务器收到都应该记录
  • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进 行重发

消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚 未持久化完成,宕机。

  • publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机

  • 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重 新入队

总体来说:

1、一定要在 publisher【回调机制】、consumer【手动确认】 俩端做消息确认机制

2、将消息的状态信息保存到数据库中,比如可以创建如下这张表

CREATE TABLE `mq_message`(
`message_id` CHAR(32) NOT NULL,
 `content` TEXT,
`to_exchane` VARCHAR(255) DEFAULT NULL,
 `routing_key` VARCHAR(255) DEFAULT NULL,
`class_type` VARCHAR(255) DEFAULT NULL,
`message_status` INT(1) DEFAULT '0' COMMENT '0-新建1-己发送2-错误抵达3-己抵达',
`create_time` DATETIME DEFAULT NULL,
`update_time` DATETIME DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2、消息重复

(1)消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息 重新由unack变为ready,并发送给其他消费者

(2)消息消费失败,由于重试机制,自动又将消息发送出

(3)成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送

解决方案

消费者的业务消费接口应该设计为幂等性的。比如扣库存有 工作单的状态标志

使用防重表(redis/mysql),发送消息每一个都有业务的唯 一标识,处理过就不用处理

rabbitMQ的每一个消息都有redelivered字段,可以获取是否 是被重新投递过来的,而不是第一次投递过来的

3、消息积压

(1)消费者宕机积压

(2)消费者消费能力不足积压

(3)发送者发送流量太大

解决方案

  • 上线更多的消费者,进行正常消费
  • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理
    及处理就noAck并重 新入队

总体来说:

1、一定要在 publisher【回调机制】、consumer【手动确认】 俩端做消息确认机制

2、将消息的状态信息保存到数据库中,比如可以创建如下这张表

CREATE TABLE `mq_message`(
`message_id` CHAR(32) NOT NULL,
 `content` TEXT,
`to_exchane` VARCHAR(255) DEFAULT NULL,
 `routing_key` VARCHAR(255) DEFAULT NULL,
`class_type` VARCHAR(255) DEFAULT NULL,
`message_status` INT(1) DEFAULT '0' COMMENT '0-新建1-己发送2-错误抵达3-己抵达',
`create_time` DATETIME DEFAULT NULL,
`update_time` DATETIME DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/513227
推荐阅读
相关标签
  

闽ICP备14008679号