当前位置:   article > 正文

RabbitMQ创建死信队列

rabbitmq创建死信队列

向Queue_TTL队列里发送消息,10秒后当消息没有被正常消费掉则消息过期被转发至 Queue_DLX死信队列

  1. const amqp = require('amqplib');
  2. async function TTL_production() {
  3. var conn = await amqp.connect("amqp://localhost:5672");
  4. var channel = await conn.createChannel();
  5. var Exchange_TTL = "Exchange_TTL";
  6. var Queue_TTL = "Queue_TTL";
  7. var RoutingKey_TTL = "RoutingKey_TTL";
  8. var Exchange_DLX = 'Exchange_DLX';
  9. var Queue_DLX = 'Queue_DLX';
  10. var RoutingKey_DLX = 'RoutingKey_DLX';
  11. //创建死信队列 这里声明死信队列是为了确保死信队列每次都存在,消息不会丢失
  12. await channel.assertExchange(Exchange_DLX, 'direct', { autoDelete: true, durable: true });
  13. await channel.assertQueue(Queue_DLX, {
  14. exclusive: false,
  15. });
  16. await channel.bindQueue(Queue_DLX, Exchange_DLX, RoutingKey_DLX)
  17. //创建消息队列
  18. await channel.assertExchange(Exchange_TTL, "direct", { autoDelete: true, durable: true });
  19. await channel.assertQueue(Queue_TTL, {
  20. exclusive: false,
  21. deadLetterExchange: Exchange_DLX,
  22. deadLetterRoutingKey: RoutingKey_DLX,
  23. });
  24. await channel.bindQueue(Queue_TTL, Exchange_TTL, RoutingKey_TTL);
  25. await channel.publish(Exchange_TTL, RoutingKey_TTL, Buffer.from("发送了一条消息!"), {
  26. //消息10秒后过期
  27. expiration: '10000'
  28. })
  29. console.log("消息发送成功");
  30. await channel.close();
  31. await conn.close();
  32. }
  33. TTL_production();

2. 死信消费者

  1. const amqp = require('amqplib');
  2. async function DLX_comsumer() {
  3. var conn = await amqp.connect("amqp://localhost:5672");
  4. var channel = await conn.createChannel();
  5. var Exchange_DLX = 'Exchange_DLX';
  6. var Queue_DLX = 'Queue_DLX';
  7. var RoutingKey_DLX = 'RoutingKey_DLX';
  8. await channel.assertExchange(Exchange_DLX, 'direct', { autoDelete: true, durable: true });
  9. await channel.assertQueue(Queue_DLX, {
  10. exclusive: false,
  11. });
  12. await channel.bindQueue(Queue_DLX, Exchange_DLX, RoutingKey_DLX)
  13. await channel.prefetch(1, false)
  14. await channel.consume(Queue_DLX, msg => {
  15. console.log(msg.content.toString())
  16. channel.ack(msg);
  17. }, { noAck: false })
  18. await channel.close();
  19. await conn.close();
  20. }
  21. DLX_comsumer();

执行生产者向Queue_TTL 发送消息。 

10秒后消息会转发至Queue_DLX 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/930130
推荐阅读
相关标签
  

闽ICP备14008679号