当前位置:   article > 正文

rabbitmq——高级特性_queueingconsumer.nextdelivery().getbody()

queueingconsumer.nextdelivery().getbody()

rabbitmq,高级特性

目录

rabbitmq,高级特性

1.消息如何保障100% 的投递成功?   

    1.1 自定义消费者    1.2 消息的ack与重回队列    1.3 消息的限流    1.4 TTL消息    1.5 死信队列

2.幂等性概念详解

消费端,幂等性保障;

(1)唯一id +  指纹码 机制, 利用数据库主键去重;

 (2) 利用redis 的原子性去操作;

 

4.Confirm 确认消息,Return 返回消息

1.confirm 确认机制:

2.confirm 确认消息实现

5、Return消息机制

6、消费端自定义监听

7、消费端的限流

8、消费端ACK 与重回队列

消费端的重回队列:

9、TTL队列/消息

10、 死信队列 

消息变成死信有以下几种情况


1.消息如何保障100% 的投递成功?
   

  1.    什么是生产端的可靠性投递?

     生产端-可靠性投递   

   (1)消息落库, 对消息状态进行打标,

(设置状态值),收到消息,把状态修改,针对没有响应的消息,轮循发送,设置最大的尝试次数;

 

但是在高并发情况下,多次对消息落库,对数据库造成很大压力,在高并发的场景下可能不合适

(2)消息的延迟投递,做二次确认,回调检查;(比较可靠,但是也不一定是100%)

  

 

  1. 保障消息成功发出
  2.  发送端收到MQ节点(Broker)确认应答
  3. 完善的消息进行补偿

   

 

   

    1.1 自定义消费者
    1.2 消息的ack与重回队列
    1.3 消息的限流
    1.4 TTL消息
    1.5 死信队列

2.幂等性概念详解

幂等性是什么, 数据库乐观锁机制很象,  不管执行多少次,他的结果是相同的;

比如, update t_reps set count = count -1 , version = version + 1 where version = 1,  

消费端,幂等性保障;

1.在海量订单产生的业务高峰期,如何避免消息的重复消费问题

 (1)消费端实现幂等性, 就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息, 代码跑多次,结果是一次,

2.业界主流的幂等性操作

(1)唯一id +  指纹码 机制, 利用数据库主键去重;

   生成一个全局的ID, 指纹码(可能是业务规则,时间戳等,保证这次操作是唯一的)

select count (1) from T_ORDER where id = 唯一id +  指纹码;

好处, 实现简单

坏处:高并发情况系, 数据库写入的性能瓶颈;

解决:根据id进行分库分表进行算法路由

 (2) 利用redis 的原子性去操作;

 

set 会自动更新,或者自增;需要考虑的问题

第一: 我们是否要进行数据库落库, 如果落库的话, 关键解决的问题是数据库和缓存如何做到原子性?

 

 

4.Confirm 确认消息,Return 返回消息

1.confirm 确认机制:

1.消息的确认,是指生产者投递消息后, 如果broker收到消息,则会给我们生产者一个应答

2.生产者进行接收应答,用来确定这条消息是否正常发送到broker, 这种方式也是可靠性投递的核心保障

2.confirm 确认消息实现

1) 在channel 上开启确认模式: channel.confirmSelect();

2) 在channel 上添加监听: addConfirmListener, 监听成功和失败的返回结果, 根据具体的结果对消息进行重新发送,或者记录日志, 等后续处理

 

  1. public class Produce {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. // 1.创建ConnectionFactory
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost("127.0.0.1");
  6. connectionFactory.setPort(5672);
  7. connectionFactory.setVirtualHost("/");
  8. // 2. 获取Connection
  9. Connection connection = connectionFactory.newConnection();
  10. //3. 通过Connection 创建一个新的Channel
  11. Channel channel = connection.createChannel();
  12. // 4 指定消息投递模式,:消息的确认的模式
  13. channel.confirmSelect();
  14. String exchangeName = "test_confirm_exchange";
  15. String routingKey = "confirm.save";
  16. // 5 发送一条消息
  17. String msg = "Hello RabbitMq sen confirm message";
  18. channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
  19. // 6. 添加一个监听
  20. channel.addConfirmListener(new ConfirmListener() {
  21. // 成功
  22. // 关键的唯一的消息标签deliveryTag, multiple :是否批量, 暂时不用管
  23. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  24. System.out.println("成功 Ack");
  25. }
  26. // 失败, 比如磁盘写满了,mq 出现一些异常, key 容量达到上限,也有可能handleAck,handleNack 都没有收到, 进行抓取,和重发
  27. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  28. System.out.println("失败 Nack");
  29. }
  30. });
  31. }
  32. }

消费者

  1. public class Consumer {
  2. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  3. // 1.创建ConnectionFactory
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost("127.0.0.1");
  6. connectionFactory.setPort(5672);
  7. connectionFactory.setVirtualHost("/");
  8. // 2. 获取Connection
  9. Connection connection = connectionFactory.newConnection();
  10. //3. 通过Connection 创建一个新的Channel
  11. Channel channel = connection.createChannel();
  12. String exchangeName = "test_confirm_exchange";
  13. String routingKey = "confirm.*";
  14. // AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable)
  15. // 4.声明一个交换机和队列, 然后进行绑定设置,最后指定路由key
  16. channel.exchangeDeclare(exchangeName, "topic", true);
  17. //Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
  18. // Map<String, Object> arguments) throws IOException;
  19. //queue:队列名称, durable:是否持久话;arguments:扩展参数
  20. String queueName = "test_confirm_querue";
  21. channel.queueDeclare(queueName, true, false, false, null);
  22. channel.queueBind(queueName, exchangeName, routingKey);
  23. // 5, 创建消费者, 指定Consumer的channel
  24. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  25. // 设置消费者, 指定消费的队列, autoAck: 自动签收, callback:queueingConsumer
  26. channel.basicConsume(queueName, true, queueingConsumer);
  27. while (true){
  28. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  29. System.out.println("消费端" + new String(delivery.getBody()));
  30. }
  31. }
  32. }

5、Return消息机制

 1.Return Listener 用于处理一些不可路由的消息, 也是生产端需要设置的, 我的的消息生产者,用过指定一个Exchange和Routingkey ,把消息送达到某一个对队列中去,然后我们的消费者监听队列,进行消费的处理操作;但是在某些情况下,如果我们在发送消息的时候, 当前的exchange 不存在或者指定的路由key 路由不到, 这个时候如果我们需要监听这种不可达的消息, 就要使用Return Listener;

2、基础api中的一个非常重要的关键配置项

Mandatory:如果为true, 则监听器会接收到路由不可达的消息,然后进行后续处理, 如果为false, 那么broker端自动删除该消息;


看一下测试代码

生产者

  1. public class Produce {
  2. /**
  3. * @param args
  4. * @throws IOException
  5. * @throws TimeoutException
  6. */
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. connectionFactory.setHost("127.0.0.1");
  10. connectionFactory.setPort(5672);
  11. connectionFactory.setVirtualHost("/");
  12. Connection connection = connectionFactory.newConnection();
  13. Channel channel = connection.createChannel();
  14. String exchangeName = "test_return_exchange";
  15. String routingKey = "return.save";
  16. String routingKeyError = "abc.save";
  17. channel.addReturnListener(new ReturnListener() {
  18. /**
  19. * int replyCode, 响应吗, 路由成没成功
  20. * String replyText, 回复内容
  21. * String exchange,
  22. * String routingKey,
  23. * AMQP.BasicProperties properties,
  24. * byte[] body 实际的消息体内容
  25. * @param replyCode
  26. * @param replyText
  27. * @param exchange
  28. * @param routingKey
  29. * @param properties
  30. * @param body
  31. * @throws IOException 跑出ioexception 异常
  32. */
  33. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
  34. System.out.println("handle return");
  35. System.out.println("replyCode: " + replyCode);
  36. System.out.println("exchange: " + exchange);
  37. System.out.println("routingKey: " + routingKey);
  38. System.out.println("properties: " + properties.toString());
  39. System.out.println("body: " + new String(body));
  40. }
  41. });
  42. String msg = "Hello RabbitMQ return Message";
  43. // Mandatory:如果为true, 则监听器会接收到路由不可达的消息,然后进行后续处理, 如果为false, 那么broker端自动删除该消息;
  44. channel.basicPublish(exchangeName, routingKeyError, true, null, msg.getBytes());
  45. }
  46. }

消费者

  1. public class Consumer {
  2. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("127.0.0.1");
  5. connectionFactory.setPort(5672);
  6. connectionFactory.setVirtualHost("/");
  7. Connection connection = connectionFactory.newConnection();
  8. Channel channel = connection.createChannel();
  9. String exchangeName = "test_return_exchange";
  10. String routingKey = "return.#";
  11. String queueName = "test_return_queue";
  12. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  13. channel.queueDeclare(queueName, true, false, false, null);
  14. channel.queueBind(queueName, exchangeName, routingKey);
  15. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  16. channel.basicConsume(queueName, true, queueingConsumer);
  17. while (true) {
  18. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  19. String msg = new String(delivery.getBody());
  20. System.out.println(msg);
  21. }
  22. }
  23. }

6、消费端自定义监听

我们前面的demo 中是在代码中编写while循环,进行 queueingConsumer.nextDelivery(); 方法进行获取下一条消息,然后惊醒消费处理,这样是不合理的; 我们在工作中使用自定义的Consumer 更加的方便,解耦性更加的强,更加的优雅, 也是在实际工作中最常用的使用方式;

我们通过继承DefaultConsumer 方法重写handleDelivery(...) 方法来自定义监听

生产者:

  1. public class Produce {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("127.0.0.1");
  5. connectionFactory.setPort(5672);
  6. connectionFactory.setVirtualHost("/");
  7. Connection connection = connectionFactory.newConnection();
  8. Channel channel = connection.createChannel();
  9. String exchangeName = "test_consumer_exchange";
  10. String routingKey = "consumer.save";
  11. // 发送5条消息
  12. String msg = "Hello RabbitMq sen consumer message";
  13. for (int i = 0; i < 5; i++) {
  14. channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
  15. }
  16. }
  17. }

消费者

  1. public class Consumer {
  2. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("127.0.0.1");
  5. connectionFactory.setPort(5672);
  6. connectionFactory.setVirtualHost("/");
  7. Connection connection = connectionFactory.newConnection();
  8. Channel channel = connection.createChannel();
  9. String exchangeName = "test_consumer_exchange";
  10. String routingKey = "consumer.*";
  11. channel.exchangeDeclare(exchangeName, "topic", true);
  12. String queueName = "test_consumer_queue";
  13. channel.queueDeclare(queueName, true, false, false, null);
  14. channel.queueBind(queueName, exchangeName, routingKey);
  15. // 5, 创建消费者, 指定Consumer的channel
  16. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  17. // 设置消费者, 指定消费的队列, autoAck: 自动签收, callback:queueingConsumer
  18. // MyConsumer 自定义的消费者, 去除之前的while 循环
  19. channel.basicConsume(queueName, true, new MyConsumer(channel));
  20. }
  21. }

 

自定义监听

  1. public class MyConsumer extends DefaultConsumer {
  2. public MyConsumer(Channel channel) {
  3. super(channel);
  4. }
  5. @Override
  6. public void handleDelivery(String consumerTag, // 消费者标签, 自动生成的
  7. Envelope envelope, //
  8. AMQP.BasicProperties properties,
  9. byte[] body)
  10. throws IOException {
  11. System.out.println("---handleDelivery----");
  12. System.out.println("consumerTag" + consumerTag);
  13. System.out.println("envelope" + envelope.toString());
  14. System.out.println("properties" + properties);
  15. System.out.println("body" + new String(body));
  16. }
  17. }
  18. 打印结果
  19. ---handleDelivery----
  20. consumerTagamq.ctag-3Rq24qZSqYFVX4lCkN_HKA
  21. envelopeEnvelope(deliveryTag=1, redeliver=false, exchange=test_consumer_exchange, routingKey=consumer.save)
  22. properties#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
  23. bodyHello RabbitMq sen consumer message
  24. ---handleDelivery----
  25. consumerTagamq.ctag-3Rq24qZSqYFVX4lCkN_HKA
  26. envelopeEnvelope(deliveryTag=2, redeliver=false, exchange=test_consumer_exchange, routingKey=consumer.save)
  27. properties#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
  28. bodyHello RabbitMq sen consumer message
  29. ---handleDelivery----
  30. consumerTagamq.ctag-3Rq24qZSqYFVX4lCkN_HKA
  31. envelopeEnvelope(deliveryTag=3, redeliver=false, exchange=test_consumer_exchange, routingKey=consumer.save)
  32. properties#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
  33. bodyHello RabbitMq sen consumer message

 

7、消费端的限流

什么是限流:假设一个场景, 首先,我么的rabbitmq 服务器上有上万条未处理的消息,我们随便打开一个消费者的客户端,会出现下面的情况, 巨量的消息瞬间全部推送过来,但是我们的单个客户端无法同时处理这么多数据;

比如单个生产者一分钟生产几百条数据,但是消费端每分钟可能只能消费几十条数据,这个时候生产端和消费端存在严重的不平衡;消费端要做限流,是消费端更稳定;

RabbitMq 提供了一种qos(服务质量保证) 功能, 即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos 的值)未被确认前,不进行消费新的消息;

如果是在限流的情况下,一定不能设置自动签收,autoAck = false; 一定要手动的进行消费;

  1. void basicQos(int prefetchSize, 消费端 : 0, 不限制
  2. int prefetchCount, //一次最多处理多少条消息,一般在工作中设置为1 就好了
  3. boolean global // 这个限流策略是在什么上应用的, RabbitMq 上有两个级别, 1.channel, 2,consumer, 一般是false, true:在 channel, false:在consumer
  4. ) throws IOException

生产者

  1. public class Produce {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("127.0.0.1");
  5. connectionFactory.setPort(5672);
  6. connectionFactory.setVirtualHost("/");
  7. Connection connection = connectionFactory.newConnection();
  8. Channel channel = connection.createChannel();
  9. String exchangeName = "test_qos_exchange";
  10. String routingKey = "qos.save";
  11. // 5 发送一条消息
  12. String msg = "Hello RabbitMq send qos message";
  13. for (int i = 0; i < 5; i++) {
  14. channel.basicPublish(exchangeName, routingKey, true,null, msg.getBytes());
  15. }
  16. }
  17. }

消费端

  1. public class Consumer {
  2. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("127.0.0.1");
  5. connectionFactory.setPort(5672);
  6. connectionFactory.setVirtualHost("/");
  7. Connection connection = connectionFactory.newConnection();
  8. Channel channel = connection.createChannel();
  9. String exchangeName = "test_qos_exchange";
  10. String routingKey = "qos.#";
  11. String queueName = "test_qos_queue";
  12. channel.exchangeDeclare(exchangeName, "topic", true);
  13. channel.queueDeclare(queueName, true, false, false, null);
  14. channel.queueBind(queueName, exchangeName, routingKey);
  15. //prefetchCount 消费端最大消费数量, 1为一条一条的处理
  16. channel.basicQos(0, 1, false);
  17. // 1. 限流方式; autoAck false, 签收方式设置为手动签收
  18. channel.basicConsume(queueName, false , new MyConsumer(channel));
  19. }
  20. }

自定义的监听

  1. public class MyConsumer extends DefaultConsumer {
  2. private Channel channel;
  3. public MyConsumer(Channel channel) {
  4. super(channel);
  5. this.channel = channel;
  6. }
  7. @Override
  8. public void handleDelivery(String consumerTag, // 消费者标签, 自动生成的
  9. Envelope envelope, //
  10. AMQP.BasicProperties properties,
  11. byte[] body)
  12. throws IOException {
  13. System.out.println("---handleDelivery----");
  14. System.out.println("consumerTag" + consumerTag);
  15. System.out.println("envelope" + envelope.toString());
  16. System.out.println("properties" + properties);
  17. System.out.println("body" + new String(body));
  18. //long deliveryTag,
  19. // boolean multiple, 是否批量删除, false 不支持批量签收
  20. //basicAck 主动给broker 推送消息, 若果basicAck 不打开只能控制台打印一条消息
  21. long deliveryTag = envelope.getDeliveryTag();
  22. channel.basicAck(deliveryTag, false);
  23. }
  24. }

 

8、消费端ACK 与重回队列

消费端的手工ACK(确认)和NACK(没有确认,不成功); 消费端进行消费的时候, 如果由于业务异常我们可以进行日志的记录, 然后进行补偿;如果由于服务器宕机等严重的问题, 那我们就需要手工进行ACK保障消费端消费成功

消费端的重回队列:

消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker! 一般我们在实际应用中,都会关闭重回队列,也就是设置为false;

  1. public class Produce {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("127.0.0.1");
  5. connectionFactory.setPort(5672);
  6. connectionFactory.setVirtualHost("/");
  7. Connection connection = connectionFactory.newConnection();
  8. Channel channel = connection.createChannel();
  9. String exchangeName = "test_ack_exchange";
  10. String routingKey = "ack.save";
  11. Map<String, Object> headers = new HashMap<String, Object>();
  12. for (int i = 0; i < 5; i++) {
  13. headers.put("num", i);
  14. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  15. .deliveryMode(2) // 投递模式
  16. .contentEncoding("UTF-8")
  17. .headers(headers)
  18. .build();
  19. String msg = "Hello RabbitMq End ack SMessage" + i;
  20. channel.basicPublish(exchangeName, routingKey, true,properties, msg.getBytes());
  21. }
  22. }
  23. }
  24. public class Consumer {
  25. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  26. ConnectionFactory connectionFactory = new ConnectionFactory();
  27. connectionFactory.setHost("127.0.0.1");
  28. connectionFactory.setPort(5672);
  29. connectionFactory.setVirtualHost("/");
  30. Connection connection = connectionFactory.newConnection();
  31. Channel channel = connection.createChannel();
  32. String exchangeName = "test_ack_exchange";
  33. String routingKey = "ack.#";
  34. String queueName = "test_ack_queue";
  35. channel.exchangeDeclare(exchangeName, "topic", true);
  36. channel.queueDeclare(queueName, true, false, false, null);
  37. channel.queueBind(queueName, exchangeName, routingKey);
  38. // 1. 手工签收,必须关闭autoAck = false;
  39. channel.basicConsume(queueName, false , new MyConsumer(channel));
  40. }
  41. }
  42. public class MyConsumer extends DefaultConsumer {
  43. private Channel channel;
  44. public MyConsumer(Channel channel) {
  45. super(channel);
  46. this.channel = channel;
  47. }
  48. @Override
  49. public void handleDelivery(String consumerTag, // 消费者标签, 自动生成的
  50. Envelope envelope, //
  51. AMQP.BasicProperties properties,
  52. byte[] body)
  53. throws IOException {
  54. System.out.println("---handleDelivery----");
  55. System.out.println("body" + new String(body));
  56. try {
  57. Thread.sleep(2000);
  58. } catch (InterruptedException e) {
  59. e.printStackTrace();
  60. }
  61. if ((Integer) properties.getHeaders().get("num") == 0) {
  62. // 一般requeue 设置为false, 如果消息失败,会把这条消息添加到队列的尾端, 如果一直失败, 一直尝试
  63. channel.basicNack(envelope.getDeliveryTag(), false, true);
  64. }else {
  65. //long deliveryTag,
  66. // boolean multiple, 是否批量删除, false 不支持批量签收
  67. //basicAck 主动给broker 推送消息
  68. long deliveryTag = envelope.getDeliveryTag();
  69. channel.basicAck(deliveryTag, false);
  70. }
  71. }
  72. }

 

9、TTL队列/消息

 

10、 死信队列 

死信队列:DLX, Dead-letter-Exchange;

利用DLX ,当消息在一个队列中变成死信(dead message) 之后, 他能被冲新publish到另一个Exchange, 这个exchange 就是DLX

消息变成死信有以下几种情况

1、消息被拒绝(basic.reject/ basic.nack) 并且 requeue=false

2、下次TTL 过期

3、队列达到最大长度

2.

3

  1. public class Produce {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("127.0.0.1");
  5. connectionFactory.setPort(5672);
  6. connectionFactory.setVirtualHost("/");
  7. Connection connection = connectionFactory.newConnection();
  8. Channel channel = connection.createChannel();
  9. String exchangeName = "test_dlx_exchange";
  10. String routingKey = "dlx.save";
  11. String msg = "Hello RabbitMq Send DLX SMessage" ;
  12. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  13. .deliveryMode(2)
  14. .contentEncoding("UTF-8")
  15. .expiration("10000")
  16. .build();
  17. channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
  18. }
  19. }
  20. public class Consumer {
  21. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  22. ConnectionFactory connectionFactory = new ConnectionFactory();
  23. connectionFactory.setHost("127.0.0.1");
  24. connectionFactory.setPort(5672);
  25. connectionFactory.setVirtualHost("/");
  26. Connection connection = connectionFactory.newConnection();
  27. Channel channel = connection.createChannel();
  28. // 这就是一个普通的交换机和队列,交换机
  29. String exchangeName = "test_dlx_exchange";
  30. String routingKey = "dlx.#";
  31. String queueName = "test_dlx_queue";
  32. channel.exchangeDeclare(exchangeName, "topic", true);
  33. Map<String, Object> agruments = new HashMap<String, Object>();
  34. agruments.put("x-dead-letter-exchange","dlx.exchange");
  35. channel.queueDeclare(queueName, true, false, false, agruments);
  36. channel.queueBind(queueName, exchangeName, routingKey);
  37. // 要进行死信队列的声明,绑定
  38. channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
  39. channel.queueDeclare("dlx.queue",true, false, false, null);
  40. channel.queueBind("dlx.queue", "dlx.exchange","#");
  41. // 1. 手工签收,必须关闭autoAck = false;
  42. channel.basicConsume(queueName, true , new MyConsumer(channel));
  43. }
  44. }
  45. public class MyConsumer extends DefaultConsumer {
  46. private Channel channel;
  47. public MyConsumer(Channel channel) {
  48. super(channel);
  49. this.channel = channel;
  50. }
  51. @Override
  52. public void handleDelivery(String consumerTag, // 消费者标签, 自动生成的
  53. Envelope envelope, //
  54. AMQP.BasicProperties properties,
  55. byte[] body)
  56. throws IOException {
  57. System.out.println("---handleDelivery----");
  58. System.out.println("body" + new String(body));
  59. try {
  60. Thread.sleep(2000);
  61. } catch (InterruptedException e) {
  62. e.printStackTrace();
  63. }
  64. }
  65. }

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/716766
推荐阅读
相关标签
  

闽ICP备14008679号