当前位置:   article > 正文

rabbitMQ basicQos和basicAck作用和关系

basicqos
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.bs.utils.ConnectionUtil;
  4. public class Producer {
  5. private final static String EXCHANGE_NAME = "direct_exchange";
  6. public static void main(String[] args) throws Exception {
  7. //1、获取连接
  8. Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
  9. //2、声明信道
  10. Channel channel = connection.createChannel();
  11. //3、声明交换器,类型为direct
  12. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  13. //4、创建消息
  14. String message = "hello rabbitmq";
  15. //5、发布消息
  16. channel.basicPublish(EXCHANGE_NAME, "add", null, message.getBytes());
  17. System.out.println("生产者发送" + message + "'");
  18. //6、关闭通道
  19. channel.close();
  20. //7、关闭连接
  21. connection.close();
  22. }
  23. }
消费者
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.QueueingConsumer;
  4. import com.bs.utils.ConnectionUtil;
  5. public class Consumer1 {
  6. private final static String QUEUE_NAME = "direct_queue_1";
  7. private final static String EXCHANGE_NAME = "direct_exchange";
  8. public static void main(String[] args) throws Exception{
  9. //1、获取连接
  10. Connection connection = ConnectionUtil.getConnection("localhost",5672,"/","guest","guest");
  11. //2、声明通道
  12. Channel channel = connection.createChannel();
  13. //3、声明队列
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. //4、绑定队列到交换机,指定路由key为update
  16. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
  17. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
  18. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"add");
  19. //同一时刻服务器只会发送一条消息给消费者
  20. channel.basicQos(1);
  21. //5、定义队列的消费者
  22. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  23. //6、监听队列,手动返回完成状态
  24. channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
  25. //6、获取消息
  26. while (true){
  27. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  28. String message = new String(delivery.getBody());
  29. System.out.println(" 消费者1:" + message + "'");
  30. //消费者1接收一条消息后休眠10毫秒
  31. Thread.sleep(10);
  32. //返回确认状态
  33. channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
  34. }
  35. }
  36. }

 

RabbitMQ中的概念,channel.basicQos(1)指该消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。队列中没有被消费的消息不会被删除,还是存在于队列中。

Prefetch count 和 basicQos中的参数一致(Prefetch count就是basicQos的参数值)

channel.basicQos(1);和channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);是配套使用,只有在channel.basicQos被使用的时候channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false)才起到作用。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/127031
推荐阅读
相关标签
  

闽ICP备14008679号