赞
踩
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.bs.utils.ConnectionUtil; public class Producer { private final static String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception { //1、获取连接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //2、声明信道 Channel channel = connection.createChannel(); //3、声明交换器,类型为direct channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //4、创建消息 String message = "hello rabbitmq"; //5、发布消息 channel.basicPublish(EXCHANGE_NAME, "add", null, message.getBytes()); System.out.println("生产者发送" + message + "'"); //6、关闭通道 channel.close(); //7、关闭连接 connection.close(); } }
消费者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.bs.utils.ConnectionUtil; public class Consumer1 { private final static String QUEUE_NAME = "direct_queue_1"; private final static String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception{ //1、获取连接 Connection connection = ConnectionUtil.getConnection("localhost",5672,"/","guest","guest"); //2、声明通道 Channel channel = connection.createChannel(); //3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、绑定队列到交换机,指定路由key为update channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"add"); //同一时刻服务器只会发送一条消息给消费者 channel.basicQos(1); //5、定义队列的消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6、监听队列,手动返回完成状态 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //6、获取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 消费者1:" + message + "'"); //消费者1接收一条消息后休眠10毫秒 Thread.sleep(10); //返回确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
RabbitMQ中的概念,channel.basicQos(1)指该消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。队列中没有被消费的消息不会被删除,还是存在于队列中。
Prefetch count 和 basicQos中的参数一致(Prefetch count就是basicQos的参数值)
channel.basicQos(1);和channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);是配套使用,只有在channel.basicQos被使用的时候channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false)才起到作用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。