赞
踩
1)、下载插件
下载地址:https://www.rabbitmq.com/community-plugins.html
选择相应的版本点击下载
下载的是.zip的安装包,下载完之后需要手动解压并上传到Linux服务器中
2)、安装插件
拷贝插件到Docker:
[root@localhost plugins]# ls
rabbitmq_delayed_message_exchange-20171215-3.6.x.ez
[root@localhost plugins]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS
NAMES
bc514a2c15c1 rabbitmq:3.6.15-management "docker-entrypoint.s…" 29 hours ago Up 29 hours 4369/tcp, 5671/tcp, 0.0.0
.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbitmq
[root@localhost plugins]# docker cp rabbitmq_delayed_message_exchange-20171215-3.6.x.ez rabbitmq:/plugins
3)、启动插件
进入docker内部:
docker exec -it rabbitmq bash
开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查询安装的所有插件:
rabbitmq-plugins list
重启RabbitMQ,使插件生效:
docker restart rabbitmq
4)、代码实现
配置类:
@Configuration public class DelayedConfig { public static final String QUEUE_NAME = "delayed.queue"; public static final String EXCHANGE_NAME = "delayedExchange"; @Bean public Queue queue() { return new Queue(DelayedConfig.QUEUE_NAME); } //配置默认的交换机 @Bean CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); //参数二为类型:必须是x-delayed-message return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args); } //绑定队列到交换器 @Bean Binding binding(Queue queue, CustomExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs(); } }
生产端:
@Component public class DelayedSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String msg) { SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("发送时间:" + sf.format(new Date())); rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setHeader("x-delay", 3000); return message; } }); } }
消费端:
@Component
@RabbitListener(queues = DelayedConfig.QUEUE_NAME)
public class DelayedReceiver {
@RabbitHandler
public void process(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("接收时间:" + sdf.format(new Date()));
System.out.println("消息内容:" + msg);
}
}
测试类:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqDelayedMessageApplicationTests {
@Autowired
private DelayedSender sender;
@Test
public void send() throws InterruptedException {
sender.send("first delayed-message");
Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
}
}
1)、什么是生产端的可靠性投递?
2)、生产端——可靠性投递
方案一:消息持久化到数据库,对消息状态进行打标
1)、进行业务数据和消息记录的入库,消息的状态为未发送
2)、发送消息
3)、broker confirm,生产端的Confirm Listener负责监听
4)、收到broker confirm后,更新数据库中消息记录的状态为已发送
5)、采用分布式定时任务抓取消息记录的数据
6)、如果消息的状态是未发送,重新发送消息
7)、如果重试发送的次数超过临界值,将消息状态更新为投递失败,通过补偿系统查询最终失败的消息
缺陷:在高并发场景下,对消息进行记录会有频繁的数据库持久化操作,数据库的压力过大
方案二:消息的延迟投递,做二次确认,回调检查
1)、进行业务数据入库,之后发送第一条消息
2)、发送第二条消息,延迟消息投递检查
3)、消费端监听指定队列收到第一条消息进行处理
4)、消费端处理完,发送第一条消息处理完的confirm消息
5)、Callback服务监听指定队列收到confirm的消息,将消息的记录入库
6)、Callback服务监听指定队列收到延迟消息投递检查,检查数据库中第一条消息是否已经处理完成,如果第一条消息处理完的confirm消息没有收到,Callback服务向上游服务发送RPC请求,让上游服务重新发送消息
方案一:唯一ID+指纹码机制,利用数据库主键去重
SELECT COUNT(1) FROM T_ORDER WHERE ID=唯一ID+指纹码
优点:实现简单
缺陷:高并发下有数据库写入的性能瓶颈
解决方案:根据ID进行分库分表进行算法路由
本地ID生成服务为统一ID生成服务的兜底策略
方案二:利用Redis的原子性实现
批量消息是指我们把消息放到一个集合里统一进行提交,这种方案设计思路是期望消息在一个会话里,比如投掷到threadlocal里的集合,然后拥有相同会话ID,并且带有这次提交消息的SIZE等相关属性,最重要的一点是要把这一批消息进行合并。对于Channel而言,就是发送一次消息。这种方式也是希望消费端在消费的时候,可以进行批量化的消费,针对于某一个原子业务的操作去处理,但是不保障可靠性,需要进行补偿机制
1)、进行业务数据入库
2)、相同SessionId的消息进行批量处理,存储在ThreadLocal中,在ThreadLocal中MessageHoder负责装消息,消息记录的入库,只记录SessionId
其他操作同确认模式
类似于批量消息的实现机制
需要保障以下几点:
生产端:
1)、进行业务数据入库
2)、相同SessionId的消息组成一批带有顺序的消息(不做批量处理,消息是多条的),可以做消息入库保证消息的可靠性投递
其他操作同确认模式
消费端:
1)、进行消息记录的入库
2)、发送给自身的延迟消息只包含会话ID、SIZE
3)、收到延迟投递的消息,从数据库中根据会话ID、SIZE查找到对应的消息
4)、执行实际的业务逻辑处理
5)、定时任务进行补偿
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。