当前位置:   article > 正文

rabbitmq手动确认ack_rabbitmq fanout ack

rabbitmq fanout ack

【README】

参考 https://blog.csdn.net/u012943767/article/details/79300673

 

【0】声明交换机,队列 与绑定

  1. /**
  2. * 交换机,队列声明与绑定
  3. */
  4. public class AckDeclarer {
  5. /** 确认交换机 */
  6. public static final String ACK_EXCHANGE2 = "ACK_EXCHNAGE2";
  7. /** 确认队列 */
  8. public static final String ACK_QUEUE2 = "ACK_QUEUE2";
  9. /** 路由键 */
  10. public static final String ACK_ROUTE2 = "ACK_ROUTE2";
  11. // 四种Exchange 模式
  12. // direct :需要生产者和消费者绑定相同的Exchange和routing key。
  13. // fanout:广播模式需要生产者消费者绑定相同的Exchange。
  14. // topic:支持模糊匹配的广播模式以点分隔,*表示一个单词,#表示任意数量(零个或多个)单词。
  15. // header:根据生产者和消费者的header中信息进行匹配性能较差 ,x-match [all 匹配所有/any 任意一个]。
  16. public static void main(String[] args) throws Exception {
  17. /* 获取连接*/
  18. Connection conn = RBConnectionUtil.getConn();
  19. // 创建信道
  20. Channel channel = conn.createChannel();
  21. /* 声明交换机 */
  22. channel.exchangeDeclare(ACK_EXCHANGE2, BuiltinExchangeType.FANOUT);
  23. /* 声明队列 */
  24. channel.queueDeclare(ACK_QUEUE2, true, false, false, null);
  25. System.out.println(String.format("声明交换机【%s】,队列【%s】成功", ACK_EXCHANGE2, ACK_QUEUE2));
  26. /* 把队列绑定到交换机 */
  27. channel.queueBind(ACK_QUEUE2, ACK_EXCHANGE2, ACK_ROUTE2);
  28. /* 关闭信道和连接 */
  29. channel.close();
  30. conn.close();
  31. }
  32. }

【1】生产者

  1. /**
  2. * 消息确认生产者
  3. */
  4. public class AckProducer {
  5. public static void main(String[] args) throws Exception {
  6. /* 获取连接*/
  7. Connection conn = RBConnectionUtil.getConn();
  8. // 创建信道
  9. Channel channel = conn.createChannel();
  10. String[] messages = new String[]{
  11. "first.-04130828"
  12. , "second..-04130828"
  13. , "third...-04130828"
  14. , "fourth....-04130828"
  15. , "fiveth.....-04130828"
  16. , "6th.....-04130828"
  17. , "7th.....-04130828"
  18. , "8th.....-04130828"
  19. , "9th.....-04130828"
  20. , "10th.....-04130828"
  21. };
  22. for (String msg : messages) {
  23. channel.basicPublish(AckDeclarer.ACK_EXCHANGE2, AckDeclarer.ACK_ROUTE2, null, msg.getBytes());
  24. System.out.println(msg + " is sent");
  25. }
  26. System.out.println("消息发送完成");
  27. channel.close();
  28. conn.close();
  29. }
  30. }

【2】自动确认消费者

  1. /**
  2. * 自动确认消费者
  3. */
  4. public class AutoAckConsumer {
  5. public static void main(String[] args) throws Exception {
  6. /* 获取连接*/
  7. Connection conn = RBConnectionUtil.getConn();
  8. // 创建信道
  9. Channel channel = conn.createChannel();
  10. System.out.println("等待消费1");
  11. Consumer consumer = new DefaultConsumer(channel) {
  12. @Override
  13. public void handleDelivery(String consumerTag, Envelope envelope,
  14. BasicProperties properties, byte[] body) throws IOException {
  15. String msg = new String(body, "UTF-8");
  16. System.out.println("接收到的消息=" + msg);
  17. try {
  18. doWork(msg);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. };
  24. channel.basicConsume(AckDeclarer.ACK_QUEUE2, true, consumer);// 为true自动确认
  25. }
  26. private static void doWork(String task) throws InterruptedException {
  27. for (char ch : task.toCharArray()) {
  28. if (ch == '.') Thread.sleep(1000);
  29. }
  30. }
  31. }

【3】手动确认消费者

  1. /**
  2. * 手动确认消费者
  3. */
  4. public class ManualAckConsumer {
  5. public static void main(String[] args) throws Exception {
  6. /* 获取连接*/
  7. Connection conn = RBConnectionUtil.getConn();
  8. // 创建信道
  9. Channel channel = conn.createChannel();
  10. System.out.println("手动确认消费者等待消费1");
  11. Consumer consumer = new DefaultConsumer(channel) {
  12. @Override
  13. public void handleDelivery(String consumerTag, Envelope envelope,
  14. BasicProperties properties, byte[] body) throws IOException {
  15. String msg = new String(body, "UTF-8");
  16. System.out.println("接收到的消息=" + msg);
  17. try {
  18. doWork(msg);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. } finally {
  22. System.out.println("消费done,手动确认ack,msg=" + msg);
  23. channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认
  24. }
  25. }
  26. };
  27. // 手动确认,向rabbitmq 服务器手动发送ack成功消费标识
  28. channel.basicConsume(AckDeclarer.ACK_QUEUE2, false, consumer);// 为false手动确认
  29. }
  30. private static void doWork(String task) throws InterruptedException {
  31. for (char ch : task.toCharArray()) {
  32. if (ch == '.') Thread.sleep(1000);
  33. }
  34. }
  35. }

手动确认消费者日志 

  1. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  2. SLF4J: Defaulting to no-operation (NOP) logger implementation
  3. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  4. 手动确认消费者等待消费1
  5. 接收到的消息=first.-04130828
  6. 消费done,手动确认ack,msg=first.-04130828
  7. 接收到的消息=second..-04130828
  8. 消费done,手动确认ack,msg=second..-04130828
  9. 接收到的消息=third...-04130828
  10. 消费done,手动确认ack,msg=third...-04130828
  11. 接收到的消息=fourth....-04130828
  12. 消费done,手动确认ack,msg=fourth....-04130828
  13. 接收到的消息=fiveth.....-04130828
  14. 消费done,手动确认ack,msg=fiveth.....-04130828
  15. 接收到的消息=6th.....-04130828
  16. 消费done,手动确认ack,msg=6th.....-04130828
  17. 接收到的消息=7th.....-04130828
  18. 消费done,手动确认ack,msg=7th.....-04130828
  19. 接收到的消息=8th.....-04130828
  20. 消费done,手动确认ack,msg=8th.....-04130828
  21. 接收到的消息=9th.....-04130828
  22. 消费done,手动确认ack,msg=9th.....-04130828
  23. 接收到的消息=10th.....-04130828
  24. 消费done,手动确认ack,msg=10th.....-04130828

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/127102
推荐阅读
相关标签
  

闽ICP备14008679号