赞
踩
什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。
队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。
不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。
消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,
虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。
如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。
以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现
private Queue creatQueue(String name){
// 创建一个 单活模式 队列
HashMap<String, Object> args=new HashMap<>();
args.put("x-single-active-consumer",true);
return new Queue(name,true,false,false,args);
}
创建之后,我们可以在控制台看到 消费者的激活状态
=======================>配置类 @Configuration @SuppressWarnings("all") public class DirectExchangeConfiguration { @Bean public Queue queue15_0() { return creatQueue(Message15.QUEUE_0); } @Bean public Queue queue15_1() { return creatQueue(Message15.QUEUE_1); } @Bean public Queue queue15_2() { return creatQueue(Message15.QUEUE_2); } @Bean public Queue queue15_3() { return creatQueue(Message15.QUEUE_3); } @Bean public DirectExchange exchange15() { // name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它 return new DirectExchange(Message15.EXCHANGE, true, false); } @Bean public Binding binding15_0() { return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0"); } @Bean public Binding binding15_1() { return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1"); } @Bean public Binding binding15_2() { return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2"); } @Bean public Binding binding15_3() { return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3"); } /** * 创建一个 单活 模式的队列 * 注意 : * <p> * 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除 * * @param name * @return queue */ private Queue creatQueue(String name) { // 创建一个 单活模式 队列 HashMap<String, Object> args = new HashMap<>(); args.put("x-single-active-consumer", true); return new Queue(name, true, false, false, args); } =================================》生产者 @Component @Slf4j public class Producer15 { @Resource private RabbitTemplate rabbitTemplate; /** * 这里的发送是 拟投递到多个队列中 * * @param id 业务id * @param msg 业务信息 */ public void syncSend(int id, String msg) { Message15 message = new Message15(id, msg); rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message); } /** * 根据 id 取余来决定丢到那个队列中去 * * @param id id * @return routingKey */ private String getRoutingKey(int id) { return String.valueOf(id % Message15.QUEUE_COUNT); } } ============================》消费者 /** * 要想保证消息的顺序,每个队列只能有一个消费者 * * @author 深漂码农@明哥 * @date 2024-03-18 */ @Component @RabbitListener(queues = Message15.QUEUE_0) @RabbitListener(queues = Message15.QUEUE_1) @RabbitListener(queues = Message15.QUEUE_2) @RabbitListener(queues = Message15.QUEUE_3) @Slf4j public class Consumer15 { @RabbitHandler public void onMessage(Message15 message) throws InterruptedException { log.info("[{}][Consumer15 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message); // 这里随机睡一会,模拟业务处理时候的耗时 long l = new Random(1000).nextLong(); TimeUnit.MILLISECONDS.sleep(l); } } ==============================》测试类 @Test void mock() throws InterruptedException { // 先启动这个测试类,模拟多个副本情况下,看如何消费 new CountDownLatch(1).await(); } @Test void syncSend() throws InterruptedException { // 模拟每个队列中扔 10 个数据,看看效果 for (int i = 0; i < 10; i++) { for (int j = 0; j < 4; j++) { producer15.syncSend(j, " 编号:" + j + " 第:" + i + " 条消息"); } } TimeUnit.SECONDS.sleep(20); } }
ps:测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法,模拟多个副本同时消费,观察是否可以
以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。