当前位置:   article > 正文

RabbitMQ的高级特性:如何解决RabbitMQ使用的问题_channel.addconfirmlistener不生效

channel.addconfirmlistener不生效

一.RabbitMQ的问题分析

1.消息发出后,发送端不知道RabbitMQ是否真的接受到了消息?
  • 消息发送后,发送端不知道RabbitMQ是否真的收到了消息
  • 若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常
  • 需要使用RabbitMQ发送端确认机制,确认消息是否发送
2. 消息发送后,消息真的被路由了吗?
  • 消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃
  • 消息丢弃后,订单处理流程停止,业务异常
  • 需要使用RabbitMQ消息返回机制,确认消息被正确路由
3.消息发送后,消息量过大,消费端处理的过来吗?
  • 业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务服务崩溃
  • 需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定
4.消费端处理异常怎么办?
  • 默认情况下,消费端接收消息时,消息会被自动确认(ACK)
  • 消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况
  • 需要使用RabbitMQ消费端确认机制,确认消息被正确处理
5.队列爆满怎么办?
  • 默认情况下,消息进入队列,会永远存在,直到被消费
  • 大量堆积的消息会给RabbitMQ产生很大的压力
  • 需要使用RabbitMQ消息过期时间,防止消息大量积压
6.如何转移过期消息
  • 消息被设置了过期时间,过期后会直接被丢弃
  • 直接被丢弃的消息,无法对系统运行异常发出警报
  • 需要使用RabbitMQ死信队列,收集过期消息吗,以供分析
7.总结
  • 发送端确认机制
  • 消息返回机制
  • 消费端限流机制
  • 消费端确认机制
  • 消息过期机制
  • 死信队列

二.RabbitMQ的高级特性

1.发送端确认机制原理

1.1 问题分析:消息真的发出去了吗?

  • 消息发送后,发送端不知道RabbitMQ是否真的收到了消息

  • 若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常

  • 需要使用RabbitMQ发送端确认机制,确认消息发送

1.2 什么是发送端确认机制

  • 消息发送后,若中间件收到消息,会给发送端一个应答
  • 生产者(发送端)接收应答,用来确认这条消息是否正常发送到中间件

1.3 三种确认机制

  • 单条同步确认
  • 多条同步确认
  • 异步确认

1.4 单条同步确认机制的实现方法

  • 配置channel,开启确认模式:channel.confirmSelect()

  • 每发送一条消息,调用channel.waitForConfirms()方法,返回布尔值,等待确认

 ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
            //开启确认模式
            channel.confirmSelect();
            channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
            if (channel.waitForConfirms()) {
                log.info("RabbitMQ confirm OK");
            } else {
                log.info("RabbitMQ confirm Failed");
            }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

1.5 多条同步确认机制的实现方法:不太推荐

  • 配置channel,开启确认模式:channel.confirmSelect()
  • 发送多条消息后,调用channel.waitForConfirms()方法,返回布尔值,等待确认
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
             channel.confirmSelect();
            for (int i = 0; i < 10; i++) {
                channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
                log.info("message sent");
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

注意:调用channel.waitForConfirms()方法,返回布尔值,如果为true时,则前面发送的多条消息发送成功,如果为false时,则前面发送的多条消息部分成功,部分不成功,而不是所有的消息都不成功。

1.6 异步确认机制的实现方法:不太推荐,并发时存在线程安全问题

  • 配置channel,开启确认模式:channel.confirmSelect()
  • 在channel上添加监听:addConfirmListener,发送消息后,会回调此方法,通知是否发送成功
  • 异步确认有可能是单条,也可能是多条,取决于MQ
  ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            
          
            //3.异步同步实现机制
            channel.confirmSelect();
            channel.addConfirmListener(new ConfirmListener() {
                //成功后调用handleAck
                //deliveryTag 发送端的消息序号  就是发送了消息的条数
                //multiple是否多条
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    log.info("Ack, deliveryTag: {}, multiple: {}",  deliveryTag, multiple);
                }
                //失败后调用handleNack
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    log.info("Nack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
                }
            });
          String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
          channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

确认机制的流程图:

rabbitmq02
2.消息返回机制

2.1 问题分析:消息发送后,消息真的被路由了吗?

  • 消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃
  • 消息丢弃后,订单处理流程停止,业务异常
  • 需要使用RabbitMQ消息返回机制,确认消息被正确路由
rabbitmq03

2.2 消息返回机制的原理是什么? 

  • 消息发送后,中间件会对消息进行路由
  • 若没有发现目标队列,中间件会通知发送方
  • Return Listener会被调用

2.3 消息返回机制的开启方法 

  • 在RabbitMQ基础配置中有一个关键配置项:Mandatory

  • Mandatory为false,RabbitMQ讲直接丢弃无法路由的消息

  • Mandatory为true,RabbitMQ才会处理无法路由的消息,handleReturn方法会被调用

    void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            throws IOException;

  • 1
  • 2
  • 3
//开启消息返回机制  
channel.basicPublish("exchange.order.restaurant", "key.order", true,null, messageToSend.getBytes());

  • 1
  • 2
  • 3
 ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.addReturnListener(new ReturnListener() {
                    //replyCode 状态信息
                    //replyText 回复的信息
                    //exchange 交换机
                    //routingKey  路由键
                    //AMQP.BasicProperties properties 消息的原数据
                    //body 消息的内容
                    @Override
                    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        log.info("Message Return: " +
                                "replyCode:{}, replyText:{}, exchange:{}, routingKey:{}, properties:{}, body:{}",
                                replyCode, replyText, exchange, routingKey, properties, new String(body));
                        //除了打印log,可以加别的业务操作
                    }
                });
                String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                channel.basicPublish("exchange.order.restaurant", "key.order", true,null, messageToSend.getBytes());

            }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
 ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.addReturnListener(new ReturnCallback() {
                    @Override
                    public void handle(Return returnMessage) {
                        log.info("Message Return: returnMessage{}", returnMessage);

                        //除了打印log,可以加别的业务操作
                    }
                });
                String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                channel.basicPublish("exchange.order.restaurant", "key.order", true,null, messageToSend.getBytes());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

注意:channel.addReturnListener(new ReturnListener() )channel.addReturnListener(new ReturnCallback())的方法其实本质一致,但是前一种的入参较多,而后一种可以通过returnMessage.getXXX获取前一种方法的参数

3.消费端确认机制

3.1 问题分析:消息发送后,消费端处理异常怎么办?

  • 默认情况下,消费端接收消息时,消息会被自动确认(ACK)
  • 消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况
  • 需要使用RabbitMQ消费端确认机制,确认消息被正确处理

3.2 消费端ACK类型

  • 自动ACK:消费端收到消息后,会自动签收消息
  • 手动ACK:消费端收到消息后,不回自动签收消息,需要我们在业务代码中显式签收消息

3.3 手动ACK类型

  • 单条手动ACKmultiple=false

  • 多条手动ACKmultiple=true

  • 推荐使用单条ACK

3.4 重回队列

  • 若设置了重回队列,消息被NACK之后,会返回队列末尾,等待进一步被处理
  • 一般不建议开启重回队列,因为第一次处理的异常的消息,再次处理,基本上也是异常

1.修改channel.basicConsume中的autoAckfalse

channel.basicConsume("queue.restaurant", false, deliverCallback, consumerTag -> {
            });
  • 1
  • 2

2.签收消息

//1.签收条数 是否多条
void basicAck(long deliveryTag, boolean multiple) throws IOException;

  • 1
  • 2
  • 3
DeliverCallback deliverCallback = (consumerTag, message) -> { 
    //对消息进行手动签收
    if (message.getEnvelope().getDeliveryTag()%10 == 0){
      channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3.手动拒收,重回队列:不太推荐

    @Bean
    Channel rabbitChannel() throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
//单条
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
//多条条
if (message.getEnvelope().getDeliveryTag()%10 == 0){
  channel.basicNack(message.getEnvelope().getDeliveryTag(),true,true);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
4.消费端限流机制

4.1 问题分析:消息发送后,消息量过大,消费端处理的过来吗?

  • 业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务服务崩溃
  • 需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定

4.2 RabbitMQ-Qos服务质量保证

  • 针对以上问题,RabbitMQ开发Qos(服务质量保证)功能
  • Qos功能保证了在一定数目的消息未被确认前,不消费新的消息
  • Qos的前提是不使用自动确认AutoACK=false

4.3 Qos原理

  • Qos原理是当消费端有一定的数量的消息未被ACK确认时,RabbitMQ不给消费端推送新的消息
  • RabbitMQ使用Qos机制实现了消费端限流

4.4 消费端限流机制参数设置

  • prefetchCount:针对一个消费端最多推送多少未确认消息

  • gloal

    • true:针对整个消费端限流
    • false:针对当前的channel限流
  • prefetchSize:0:单个消息大小限制,一般为0

  • prefetchSize与gloal两项,RabbitMQ暂时未实现

 channel.basicQos(5);
  • 1
 @Async
    public void handleMessage() throws IOException, TimeoutException, InterruptedException {
        log.info("start linstening message");
        channel.exchangeDeclare(
                "exchange.order.restaurant",
                BuiltinExchangeType.DIRECT,
                true,
                false,
                null);

        channel.queueDeclare(
                "queue.restaurant",
                true,
                false,
                false,
                null);

        channel.queueBind(
                "queue.restaurant",
                "exchange.order.restaurant",
                "key.restaurant");

        channel.basicQos(5);
        channel.basicConsume("queue.restaurant", true, deliverCallback, consumerTag -> {
        });
        while (true) {
            Thread.sleep(100000);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
5.消费端过期机制:不单独使用,与死信机制联合使用

5.1问题分析:队列爆满怎么办?

  • 默认情况下,消息进入队列,会永远存在,直到被消费

  • 大量堆积的消息会给RabbitMQ产生很大的压力

  • 需要使用RabbitMQ消息过期时间,防止消息大量积压

    5.2RabbitMQ的过期时间(TTL)

  • RabbitMQ的过期时间称为TTL(Time to Live),生存时间

  • RabbitMQ的过期时间分为消息TTL队列TTL

  • 消息TTL设置了单条消息的过期时间

  • 队列TTL设置了队列中所有的消息过期时间

    5.3如何找到适合自己的TTL

  • TTL设置主要考虑技术架构与业务

  • TTL应该明显长于服务的平均重启时间

  • 建议TTL长于业务高峰期时间

注意:TTL这里不推荐单独使用,会导致误删消息,后面会与死信一起使用

1.设置单条消息的过期时间

    //1.单条同步确认机制 实现
        channel.confirmSelect();
        //设置过期时间
        AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("15000").build();
        channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes());
        log.info("message sent");
        if (channel.waitForConfirms()) {
            log.info("RabbitMQ confirm OK");
        } else {
            log.info("RabbitMQ confirm Failed");
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

2.设置队列的过期时间:如果队列已经存在,添加了队列的过期时间,需要删除之前的队列,重启服务

@Async
public void handleMessage() throws IOException, TimeoutException, InterruptedException {
    log.info("start linstening message");
    channel.exchangeDeclare(
            "exchange.order.restaurant",
            BuiltinExchangeType.DIRECT,
            true,
            false,
            null);
    //设置队列的过期时间
    Map<String, Object> args=new HashMap<>(16);
    args.put("x-message-ttl",15000);
    channel.queueDeclare(
            "queue.restaurant",
            true,
            false,
            false,
            args);

    channel.queueBind(
            "queue.restaurant",
            "exchange.order.restaurant",
            "key.restaurant");

    channel.basicQos(5);
    channel.basicConsume("queue.restaurant", true, deliverCallback, consumerTag -> {
    });
    while (true) {
        Thread.sleep(100000);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
        //设置队列的过期时间
        Map<String, Object> args=new HashMap<>(16);
        //设置队列中消息的过期时间
        args.put("x-message-ttl",15000);
        //设置队列的过期时间 不太推荐使用
        args.put("x-expire",15000);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
6.死信队列:收集过期消息

6.1 问题分析:如何转移过期消息

  • 消息被设置了过期时间,过期后会直接被丢弃
  • 直接被丢弃的消息,无法对系统运行异常发出警报
  • 需要使用RabbitMQ死信队列,收集过期消息吗,以供分析

6.2 什么是死信队列 

  • 死信队列:队列被配置了DLX属性(Dead-Letter- Exchange)

  • 当一个消息变成死信(dead message)后,能重新北发不到另一个Exchange,这个Exchange也是一个普通的交换机

  • 死信被死信交换机路由后,一般进入一个固定的队列

rabbitmq04

描述:当生产者发送信息,被Exchange交换机路由到Queue队列中,但是这个消息在队列中放置的时间过长,超过了设置的TTL之后,就会变成死信,死信会被重新转发到一个DL Exchange中,DL Exchange也是一个普通的Exchange,只是我们给他起了一个DL标识而已,DL Exchange会把消息路由到DL Queue,这个Queue就会被人工的监听或者专门处理异常的程序所监听。这个过程就叫做死信的转移。

6.3 怎么变成死信呢? 

  • 消息被拒绝(reject/nack)并且requeue=false

    //手动拒收
    channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
    
    • 1
    • 2
  • 消息过期(TTL到期)

    //设置队列中消息的过期时间
    args.put("x-message-ttl",15000);
    
    • 1
    • 2
  • 队列达到最大长度

    //设置队列的最大长度
    args.put("x-max-length",5); 
    
    • 1
    • 2

6.4 死信队列设置方法 

  • 设置转发、接收死信的交换机和队列

    • Exchange:dlx.exchange
    • Queue:dlx.queue
    • RoutingKey:#
  • 在需要设置死信的队列加入参数:

    • x-dead-letter-exchange=dlx.exchange
 @Async
    public void handleMessage() throws IOException, TimeoutException, InterruptedException {
        log.info("start linstening message");
        //声明死信交换机
        channel.exchangeDeclare("exchange.dlx",
                BuiltinExchangeType.TOPIC,
                true,
                false,
                null);
        //声明死信队列
        channel.queueDeclare("queue.dlx",
                true,
                false,
                false,
                null);
        //绑定exchange和queue
        channel.queueBind("queue.dlx","exchange.dlx","#");
        channel.exchangeDeclare(
                "exchange.order.restaurant",
                BuiltinExchangeType.DIRECT,
                true,
                false,
                null);
        //设置队列的过期时间
        Map<String, Object> args=new HashMap<>(16);
        //设置队列中消息的过期时间
        args.put("x-message-ttl",15000);
        //设置队列的过期时间
//        args.put("x-expire",15000);
        //设置死信队列
        args.put("x-dead-letter-exchange","exchange.dlx");
        channel.queueDeclare(
                "queue.restaurant",
                true,
                false,
                false,
                args);

        channel.queueBind(
                "queue.restaurant",
                "exchange.order.restaurant",
                "key.restaurant");
				//设置消费端限流机制
        channel.basicQos(5);
        channel.basicConsume("queue.restaurant", true, deliverCallback, consumerTag -> {
        });
        while (true) {
            Thread.sleep(100000);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

声明死信交换机、队列、绑定exchange和queue

 //声明死信队列交换机
channel.exchangeDeclare("exchange.dlx",
                        BuiltinExchangeType.TOPIC,
                        true,
                        false,
                        null);
        //声明死信队列
        channel.queueDeclare("queue.dlx",
                true,
                false,
                false,
                null);
        //绑定exchange和queue
        channel.queueBind("queue.dlx","exchange.dlx","#");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号