当前位置:   article > 正文

RabbitMQ详解(六):RabbitMQ延迟队列插件、消息可靠性保障、消费端幂等性保障、批量消息、顺序消息_rabbitmq_delayed_message_exchange-20171215-3.6.x.e

rabbitmq_delayed_message_exchange-20171215-3.6.x.ez

九、RabbitMQ扩展

1、RabbitMQ延迟队列插件

1)、下载插件

下载地址:https://www.rabbitmq.com/community-plugins.html

在这里插入图片描述

选择相应的版本点击下载

下载的是.zip的安装包,下载完之后需要手动解压并上传到Linux服务器中

2)、安装插件

拷贝插件到Docker

[root@localhost plugins]# ls
rabbitmq_delayed_message_exchange-20171215-3.6.x.ez
[root@localhost plugins]# docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                    
                                                                    NAMES
bc514a2c15c1        rabbitmq:3.6.15-management   "docker-entrypoint.s…"   29 hours ago        Up 29 hours         4369/tcp, 5671/tcp, 0.0.0
.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   rabbitmq
[root@localhost plugins]# docker cp rabbitmq_delayed_message_exchange-20171215-3.6.x.ez rabbitmq:/plugins
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3)、启动插件

进入docker内部:

docker exec -it rabbitmq bash
  • 1

开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 1

查询安装的所有插件:

rabbitmq-plugins list
  • 1

重启RabbitMQ,使插件生效:

docker restart rabbitmq
  • 1

在这里插入图片描述

4)、代码实现

配置类:

@Configuration
public class DelayedConfig {
    public static final String QUEUE_NAME = "delayed.queue";
    public static final String EXCHANGE_NAME = "delayedExchange";

    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    //配置默认的交换机
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //参数二为类型:必须是x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    //绑定队列到交换器
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

生产端:

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("发送时间:" + sf.format(new Date()));
        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay", 3000);
                return message;
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

消费端:

@Component
@RabbitListener(queues = DelayedConfig.QUEUE_NAME)
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("接收时间:" + sdf.format(new Date()));
        System.out.println("消息内容:" + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

测试类:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqDelayedMessageApplicationTests {

    @Autowired
    private DelayedSender sender;

    @Test
    public void send() throws InterruptedException {
        sender.send("first delayed-message");
        Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2、消息如何保障100%的投递成功?

1)、什么是生产端的可靠性投递?

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点确认应答
  • 完善的消息进行补偿机制

2)、生产端——可靠性投递

方案一:消息持久化到数据库,对消息状态进行打标

在这里插入图片描述
1)、进行业务数据和消息记录的入库,消息的状态为未发送

2)、发送消息

3)、broker confirm,生产端的Confirm Listener负责监听

4)、收到broker confirm后,更新数据库中消息记录的状态为已发送

5)、采用分布式定时任务抓取消息记录的数据

6)、如果消息的状态是未发送,重新发送消息

7)、如果重试发送的次数超过临界值,将消息状态更新为投递失败,通过补偿系统查询最终失败的消息

缺陷:在高并发场景下,对消息进行记录会有频繁的数据库持久化操作,数据库的压力过大

方案二:消息的延迟投递,做二次确认,回调检查

在这里插入图片描述

1)、进行业务数据入库,之后发送第一条消息

2)、发送第二条消息,延迟消息投递检查

3)、消费端监听指定队列收到第一条消息进行处理

4)、消费端处理完,发送第一条消息处理完的confirm消息

5)、Callback服务监听指定队列收到confirm的消息,将消息的记录入库

6)、Callback服务监听指定队列收到延迟消息投递检查,检查数据库中第一条消息是否已经处理完成,如果第一条消息处理完的confirm消息没有收到,Callback服务向上游服务发送RPC请求,让上游服务重新发送消息

3、如何保证消费端幂等性,避免消息的重复消费问题?

方案一:唯一ID+指纹码机制,利用数据库主键去重

SELECT COUNT(1) FROM T_ORDER WHERE ID=唯一ID+指纹码
  • 1

优点:实现简单

缺陷:高并发下有数据库写入的性能瓶颈

解决方案:根据ID进行分库分表进行算法路由

在这里插入图片描述

本地ID生成服务为统一ID生成服务的兜底策略

方案二:利用Redis的原子性实现

4、批量消息发送

批量消息是指我们把消息放到一个集合里统一进行提交,这种方案设计思路是期望消息在一个会话里,比如投掷到threadlocal里的集合,然后拥有相同会话ID,并且带有这次提交消息的SIZE等相关属性,最重要的一点是要把这一批消息进行合并。对于Channel而言,就是发送一次消息。这种方式也是希望消费端在消费的时候,可以进行批量化的消费,针对于某一个原子业务的操作去处理,但是不保障可靠性,需要进行补偿机制

在这里插入图片描述

1)、进行业务数据入库

2)、相同SessionId的消息进行批量处理,存储在ThreadLocal中,在ThreadLocal中MessageHoder负责装消息,消息记录的入库,只记录SessionId

其他操作同确认模式

5、顺序消息

类似于批量消息的实现机制

需要保障以下几点:

  • 发送的顺序消息,必须保障消息投递到同一个队列,且这个消费者只能有一个(独占模式)
  • 需要统一提交(可能是合并成一个大消息,也可能是拆分为多个消息),并且所有消息的会话ID一致
  • 添加消息属性:顺序标记的序号、和本次顺序消息的SIZE属性,进行落库操作
  • 并行进行发送给自身的延迟消息(带上关键属性:会话ID、SIZE)进行后续处理消费(使用延迟消息保证这一批消息都接收完毕)
  • 当收到延迟消息后,根据会话ID、SIZE抽取数据库数据进行处理
  • 定时轮询补偿机制,对于异常情况

在这里插入图片描述

生产端:

1)、进行业务数据入库

2)、相同SessionId的消息组成一批带有顺序的消息(不做批量处理,消息是多条的),可以做消息入库保证消息的可靠性投递

其他操作同确认模式

消费端:

1)、进行消息记录的入库

2)、发送给自身的延迟消息只包含会话ID、SIZE

3)、收到延迟投递的消息,从数据库中根据会话ID、SIZE查找到对应的消息

4)、执行实际的业务逻辑处理

5)、定时任务进行补偿

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/126978
推荐阅读
相关标签
  

闽ICP备14008679号