赞
踩
延迟队列可以通过设置生产者发送设置有ttl时间的信息给队列来实现,但这种方式中的消息可能不会按时成为死信,而且rabbitmq每次只能检测第一个消息是否过期,如果过期就丢入死信队列,第一个消息延迟时间很长,第二个消息延迟时间很短,第二个消息并不会先执行。
如:若先发一个ttl为40s的消息,然后发一个ttl为10s的消息,ttl为10s的消息并不会先被执行,而是要等ttl为40s的消息消费完后,再消费ttl为10s的消息。
延迟交换机:类型为:x-delayed-message,消息发送给延迟交换机,交换机不会立马发送给队列,而是存储在一个分布式数据系统表中,等到达到投递时间,便会发送给队列。
案例:生产者先发送ttl为40s的消息,然后发送ttl为10s的消息给延迟交换机,如下图:
1、安装延迟队列插件,找到对应rabbitmq安装目录plugins下,执行如下命令,并重启rabbitmq服务。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
出现x-delayed-message即安装成功
2、创建获取rabbitmq服务连接工具类,启动服务器,关闭linux上防火墙
- //连接工厂,创建信道工具类
- public class RabbitUtils {
- // 得到一个连接的 channel
- public static Channel getChannel() throws Exception {
- // 创建一个连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.23.129");
- factory.setUsername("user");
- factory.setPassword("123");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- return channel;
- }
- }
3、生产者发送延迟消息
(1)调用RabbitUtils工具类获取信道;
(2)调用exchangeDeclare(交换机名,类型(此处为x-delayed-message),是否持久化,是否自动删除,其他参数)声明延迟交换机,其他参数要设置x-delayed-type属性,设置改自定义交换机支持direct型;
(3)调用basicPublish(交换机名,路由,其他参数,消息体)方法发送消息,通过设置其他参数中的headers中x-delay属性,来设置延迟消息;
(4)获取发送消息后本地时间。
- public class Producer {
-
- //延迟交换机
- private final static String DELAY_EXCHANGE="delay_exchange";
-
- public static void main(String[] args) throws Exception {
- //获取信道
- Channel channel = RabbitUtils.getChannel();
-
- //声明延迟交换机
- Map<String,Object> argument=new HashMap<>();
- //设置改交换机为direct
- argument.put("x-delayed-type","direct");
- channel.exchangeDeclare(DELAY_EXCHANGE,"x-delayed-message",true,false,argument);
-
- //发送ttl为40秒的消息
- Map<String,Object> map=new HashMap<>();
- map.put("x-delay","40000");
- AMQP.BasicProperties properties=
- new AMQP.BasicProperties().
- builder().headers(map).build();
- String message="该消息的ttl时间为40秒";
- channel.basicPublish(DELAY_EXCHANGE,"delay",properties,message.getBytes("UTF-8"));
- System.out.println("第一条消息:"+message+",发送成功,当前时间为:"
- + new SimpleDateFormat("yyyy年MM月dd日 HH时:mm分:ss秒").format(System.currentTimeMillis()));
-
- //发送ttl为10秒的消息
- map.put("x-delay","10000");
- AMQP.BasicProperties properties1=
- new AMQP.BasicProperties().
- builder().headers(map).build();
- String message1="该消息的ttl时间为10秒";
- channel.basicPublish(DELAY_EXCHANGE,"delay",properties1,message1.getBytes("UTF-8"));
- System.out.println("第二条消息:"+message1+",发送成功,当前时间为:"
- + new SimpleDateFormat("yyyy年MM月dd日 HH时:mm分:ss秒").format(System.currentTimeMillis()));
- }
-
- }
4、消费者接收消息
(1)调用工具类获取信道,声明延迟队列,该队列不设置ttl时间;
(2)绑定延迟队列与延迟交换机;
(3)调用basicConsumer(队列名,是否自动应答,成功回调函数,失败回调函数)方法接收延迟消息,并获取本地时间。
- public class Consumer {
-
- //延迟交换机
- private final static String DELAY_EXCHANGE="delay_exchange";
-
- //延迟队列
- private final static String DELAY_QUEUE="delay_queue";
-
- public static void main(String[] args) throws Exception {
- //获取信道
- Channel channel = RabbitUtils.getChannel();
-
- //声明延迟队列
- channel.queueDeclare(DELAY_QUEUE,false,false,false,null);
-
- //绑定延迟队列与延迟交换机
- channel.queueBind(DELAY_QUEUE,DELAY_EXCHANGE,"delay");
-
- //接受消息成功回调函数
- DeliverCallback deliverCallback=(consumerTag,message)->{
- System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8")
- +",当前时间为:"+new SimpleDateFormat("yyyy年MM月dd日 HH时:mm分:ss秒").format(System.currentTimeMillis()));
- };
-
- //接收消息
- channel.basicConsume(DELAY_QUEUE,true,deliverCallback,consumerTag->{});
-
- }
-
- }
5、测试,生产者先发送ttl为40s的下洗,然后发送ttl为10s的消息,结果如下:
ttl为10s的先被消费者消费,测试成功。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。