当前位置:   article > 正文

RabbitMQ之顺序消费_rabbitmq怎么保证顺序消费

rabbitmq怎么保证顺序消费

什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。

队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。

不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。

消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,

虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。

如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。
  • 1

以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现


private Queue creatQueue(String name){
        // 创建一个 单活模式 队列
        HashMap<String, Object> args=new HashMap<>();
        args.put("x-single-active-consumer",true);
        return new Queue(name,true,false,false,args);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

创建之后,我们可以在控制台看到 消费者的激活状态
在这里插入图片描述

=======================>配置类
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfiguration {
    @Bean
    public Queue queue15_0() {
        return creatQueue(Message15.QUEUE_0);
    }


    @Bean
    public Queue queue15_1() {
        return creatQueue(Message15.QUEUE_1);
    }

    @Bean
    public Queue queue15_2() {
        return creatQueue(Message15.QUEUE_2);
    }

    @Bean
    public Queue queue15_3() {
        return creatQueue(Message15.QUEUE_3);
    }

    @Bean
    public DirectExchange exchange15() {
        // name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它
        return new DirectExchange(Message15.EXCHANGE, true, false);
    }


    @Bean
    public Binding binding15_0() {
        return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");
    }

    @Bean
    public Binding binding15_1() {
        return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");
    }

    @Bean
    public Binding binding15_2() {
        return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");
    }

    @Bean
    public Binding binding15_3() {
        return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");
    }

    /**
     * 创建一个 单活 模式的队列
     * 注意 :
     * <p>
     * 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除
     *
     * @param name
     * @return queue
     */
    private Queue creatQueue(String name) {
        // 创建一个 单活模式 队列
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-single-active-consumer", true);
        return new Queue(name, true, false, false, args);
    }
=================================》生产者
@Component
@Slf4j
public class Producer15 {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 这里的发送是 拟投递到多个队列中
     *
     * @param id  业务id
     * @param msg 业务信息
     */
    public void syncSend(int id, String msg) {
        Message15 message = new Message15(id, msg);
        rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);
    }

    /**
     * 根据 id 取余来决定丢到那个队列中去
     *
     * @param id id
     * @return routingKey
     */
    private String getRoutingKey(int id) {
        return String.valueOf(id % Message15.QUEUE_COUNT);
    }
}
============================》消费者
/**
 * 要想保证消息的顺序,每个队列只能有一个消费者
 *
 * @author 深漂码农@明哥
 * @date 2024-03-18
 */
@Component
@RabbitListener(queues = Message15.QUEUE_0)
@RabbitListener(queues = Message15.QUEUE_1)
@RabbitListener(queues = Message15.QUEUE_2)
@RabbitListener(queues = Message15.QUEUE_3)
@Slf4j
public class Consumer15 {

    @RabbitHandler
    public void onMessage(Message15 message) throws InterruptedException {
        log.info("[{}][Consumer15 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);
        // 这里随机睡一会,模拟业务处理时候的耗时
        long l = new Random(1000).nextLong();
        TimeUnit.MILLISECONDS.sleep(l);
    }
}
==============================》测试类
@Test
    void mock() throws InterruptedException {
        // 先启动这个测试类,模拟多个副本情况下,看如何消费
        new CountDownLatch(1).await();
    }

    @Test
    void syncSend() throws InterruptedException {
        // 模拟每个队列中扔 10 个数据,看看效果
        for (int i = 0; i < 10; i++) {
            for (int j = 0; j < 4; j++) {
                producer15.syncSend(j, " 编号:" + j + " 第:" + i + " 条消息");
            }
        }

        TimeUnit.SECONDS.sleep(20);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138

ps:测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法,模拟多个副本同时消费,观察是否可以
以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/621004
推荐阅读
相关标签
  

闽ICP备14008679号