当前位置:   article > 正文

RabbitMq——延迟队列(基于延迟交换机实现)_基于交换机的延迟队列

基于交换机的延迟队列

延迟队列可以通过设置生产者发送设置有ttl时间的信息给队列来实现,但这种方式中的消息可能不会按时成为死信,而且rabbitmq每次只能检测第一个消息是否过期,如果过期就丢入死信队列,第一个消息延迟时间很长,第二个消息延迟时间很短,第二个消息并不会先执行。

如:若先发一个ttl为40s的消息,然后发一个ttl为10s的消息,ttl为10s的消息并不会先被执行,而是要等ttl为40s的消息消费完后,再消费ttl为10s的消息。

延迟交换机:类型为:x-delayed-message,消息发送给延迟交换机,交换机不会立马发送给队列,而是存储在一个分布式数据系统表中,等到达到投递时间,便会发送给队列。

案例:生产者先发送ttl为40s的消息,然后发送ttl为10s的消息给延迟交换机,如下图:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5ZCO56uv5bCP54y_,size_20,color_FFFFFF,t_70,g_se,x_16

1、安装延迟队列插件,找到对应rabbitmq安装目录plugins下,执行如下命令,并重启rabbitmq服务。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

出现x-delayed-message即安装成功

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5ZCO56uv5bCP54y_,size_20,color_FFFFFF,t_70,g_se,x_16

 2、创建获取rabbitmq服务连接工具类,启动服务器,关闭linux上防火墙

  1. //连接工厂,创建信道工具类
  2. public class RabbitUtils {
  3. // 得到一个连接的 channel
  4. public static Channel getChannel() throws Exception {
  5. // 创建一个连接工厂
  6. ConnectionFactory factory = new ConnectionFactory();
  7. factory.setHost("192.168.23.129");
  8. factory.setUsername("user");
  9. factory.setPassword("123");
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. return channel;
  13. }
  14. }

3、生产者发送延迟消息

        (1)调用RabbitUtils工具类获取信道;

        (2)调用exchangeDeclare(交换机名,类型(此处为x-delayed-message),是否持久化,是否自动删除,其他参数)声明延迟交换机,其他参数要设置x-delayed-type属性,设置改自定义交换机支持direct型;

        (3)调用basicPublish(交换机名,路由,其他参数,消息体)方法发送消息,通过设置其他参数中的headers中x-delay属性,来设置延迟消息;

        (4)获取发送消息后本地时间。

  1. public class Producer {
  2. //延迟交换机
  3. private final static String DELAY_EXCHANGE="delay_exchange";
  4. public static void main(String[] args) throws Exception {
  5. //获取信道
  6. Channel channel = RabbitUtils.getChannel();
  7. //声明延迟交换机
  8. Map<String,Object> argument=new HashMap<>();
  9. //设置改交换机为direct
  10. argument.put("x-delayed-type","direct");
  11. channel.exchangeDeclare(DELAY_EXCHANGE,"x-delayed-message",true,false,argument);
  12. //发送ttl为40秒的消息
  13. Map<String,Object> map=new HashMap<>();
  14. map.put("x-delay","40000");
  15. AMQP.BasicProperties properties=
  16. new AMQP.BasicProperties().
  17. builder().headers(map).build();
  18. String message="该消息的ttl时间为40秒";
  19. channel.basicPublish(DELAY_EXCHANGE,"delay",properties,message.getBytes("UTF-8"));
  20. System.out.println("第一条消息:"+message+",发送成功,当前时间为:"
  21. + new SimpleDateFormat("yyyy年MM月dd日 HH时:mm分:ss秒").format(System.currentTimeMillis()));
  22. //发送ttl为10秒的消息
  23. map.put("x-delay","10000");
  24. AMQP.BasicProperties properties1=
  25. new AMQP.BasicProperties().
  26. builder().headers(map).build();
  27. String message1="该消息的ttl时间为10秒";
  28. channel.basicPublish(DELAY_EXCHANGE,"delay",properties1,message1.getBytes("UTF-8"));
  29. System.out.println("第二条消息:"+message1+",发送成功,当前时间为:"
  30. + new SimpleDateFormat("yyyy年MM月dd日 HH时:mm分:ss秒").format(System.currentTimeMillis()));
  31. }
  32. }

 4、消费者接收消息

        (1)调用工具类获取信道,声明延迟队列,该队列不设置ttl时间;

        (2)绑定延迟队列与延迟交换机;

        (3)调用basicConsumer(队列名,是否自动应答,成功回调函数,失败回调函数)方法接收延迟消息,并获取本地时间。

  1. public class Consumer {
  2. //延迟交换机
  3. private final static String DELAY_EXCHANGE="delay_exchange";
  4. //延迟队列
  5. private final static String DELAY_QUEUE="delay_queue";
  6. public static void main(String[] args) throws Exception {
  7. //获取信道
  8. Channel channel = RabbitUtils.getChannel();
  9. //声明延迟队列
  10. channel.queueDeclare(DELAY_QUEUE,false,false,false,null);
  11. //绑定延迟队列与延迟交换机
  12. channel.queueBind(DELAY_QUEUE,DELAY_EXCHANGE,"delay");
  13. //接受消息成功回调函数
  14. DeliverCallback deliverCallback=(consumerTag,message)->{
  15. System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8")
  16. +",当前时间为:"+new SimpleDateFormat("yyyy年MM月dd日 HH时:mm分:ss秒").format(System.currentTimeMillis()));
  17. };
  18. //接收消息
  19. channel.basicConsume(DELAY_QUEUE,true,deliverCallback,consumerTag->{});
  20. }
  21. }

 

5、测试,生产者先发送ttl为40s的下洗,然后发送ttl为10s的消息,结果如下:

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5ZCO56uv5bCP54y_,size_20,color_FFFFFF,t_70,g_se,x_16 

watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5ZCO56uv5bCP54y_,size_20,color_FFFFFF,t_70,g_se,x_16 

ttl为10s的先被消费者消费,测试成功。 

 

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/689505
推荐阅读
相关标签
  

闽ICP备14008679号