赞
踩
com.rabbitmq:amqp-client:4.3.0
RabbitMQ
版本声明: 3.6.15RabbitMQ
的消费模式分为两种:Push(服务端推)
和Pull(客户端拉)
,Push(推)
采用Basic.Consume
进行消费,Pull(拉)
是调用Basic.Get
进行消费.在RabbitMQ
中Pull(拉)
适合单条消费,Push(推)
适合持续订阅推模式代码示例
@Test public void testNoAuotAckSync() { String userName = "jannal"; String password = "jannal"; String virtualHost = "jannal-vhost"; String hostName = "jannal.mac.com"; String queueName = "jannal.direct.queue"; int portNumber = 5672; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(false); Connection conn = null; try { conn = factory.newConnection(); final Channel channel = conn.createChannel(); //设置客户端最多接收未被ack的消息的个数 channel.basicQos(1); //不自动确认 boolean autoAck = false; //true表示不能将一个Connection中生产者发送的消息传送给这个Connection中的消费者 boolean noLocal = false; //是否独占 boolean exclusive = false; //不同的订阅采用不同的消费者标签 String consumerTag = "consumerTag"; //basicConsume是一个同步方法 channel.basicConsume(queueName, autoAck, consumerTag, noLocal, exclusive, null, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); System.out.println("Consumer1:"+new String(body, "utf-8")); channel.basicAck(deliveryTag, false); //channel.basicReject(deliveryTag,true); } }); // 阻止主程序结束 LockSupport.park(); } catch (IOException e) { logger.error(e.getMessage(), e); } catch (TimeoutException e) { logger.error(e.getMessage(), e); } finally { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
拉模式代码示例
/** * 拉取模式,适用于消费单条的情况 */ @Test public void testBasicGet() { String userName = "jannal"; String password = "jannal"; String virtualHost = "jannal-vhost"; String hostName = "jannal.mac.com"; String queueName = "jannal.direct.queue"; int portNumber = 5672; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(false); Connection conn = null; try { conn = factory.newConnection(); final Channel channel = conn.createChannel(); // 拉模式设置无效 channel.basicQos(1); //不自动确认 boolean autoAck = false; //true表示不能将一个Connection中生产者发送的消息传送给这个Connection中的消费者 boolean noLocal = false; //是否独占 boolean exclusive = false; GetResponse getResponse = channel.basicGet(queueName, autoAck); if (getResponse != null) { System.out.println("Consumer2:" + new String(getResponse.getBody(), "utf-8")); long deliveryTag = getResponse.getEnvelope().getDeliveryTag(); channel.basicAck(deliveryTag, false); } LockSupport.park(); } catch (IOException e) { logger.error(e.getMessage(), e); } catch (TimeoutException e) { logger.error(e.getMessage(), e); } finally { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
消息回执(Ack)。
Basic.Ack(消息回执)
命令给Broker
或者在订阅到Queue
的时候就将autoAck(自动ack)
参数设置为true
。当设置了autoAck
时,一旦消费者接收消息,Broker
会自动视为其确认了消息。Broker
收到消息回执之后才将消息从Queue
中移除。Basic.Ack(消息回执)
并检测到消费者的Connection
断开(或者队列上取消订阅),则Broker
会将消息发送给其他消费者进行处理。这里不存在timeout
,一个消费者处理消息时间再长也不会导致该消息被发送给其他消息者,除非他的Connection
断开。 这么设计的原因是RabbitMQ
允许消费者消费一条消息的时间可以很久。Basic.Ack(消息回执)
给Broker
,这将会导致严重的bug——Queue中堆积的消息会越来越多。消费者重启后会重复消费这些消息并重复执行业务逻辑。消息拒绝
Connection
或者取消队列上的订阅Basic.reject
,如果把Basic.reject
命令的requeue
参数设置为true
,RabbitMQ
会将消息重新发送给下一个订阅的Consumer
(也可能还是同一个Consumer
,比如只有一个Consumer
时,Broker
依然会再次发送给当前这个Consumer
,最终结果就是一直收到消息,一直拒绝,一直这样循环下去),若设置 requeue=false
,则Broker
在收到Basic.reject
后,将直接将该 message 从 Queue
中移除。Basic.Ack
但是不对消息做任何处理,变相的拒绝消息消息拒绝代码示例
/** * 拒绝消息 */ @Test public void testReject() { String userName = "jannal"; String password = "jannal"; String virtualHost = "jannal-vhost"; String hostName = "jannal.mac.com"; String queueName = "jannal.direct.queue"; int portNumber = 5672; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(false); Connection conn = null; try { conn = factory.newConnection(); final Channel channel = conn.createChannel(); //设置客户端最多接收未被ack的消息的个数 channel.basicQos(1); //不自动确认 boolean autoAck = false; //true表示不能将一个Connection中生产者发送的消息传送给这个Connection中的消费者 boolean noLocal = false; //是否独占 boolean exclusive = false; //不同的订阅采用不同的消费者标签 String consumerTag = "consumerTag"; //重新入队,让其他消费端消费 final boolean requeue = true; //basicConsume是一个同步方法 channel.basicConsume(queueName, autoAck, consumerTag, noLocal, exclusive, null, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); System.out.println("Consumer2:"+new String(body, "utf-8")); //一次只能拒绝一条消息 channel.basicReject(deliveryTag,requeue); } }); // 阻止主程序结束 LockSupport.park(); } catch (IOException e) { logger.error(e.getMessage(), e); } catch (TimeoutException e) { logger.error(e.getMessage(), e); } finally { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
如果多个Consumer
同时订阅同一个Queue
中的消息,Queue
中的消息会被平摊给多个Consumer
,如果每个消息的处理时间不同,就可能导致一些Consumer
一直在忙,而另外一些Consumer
一直在空闲。可以通过设置Prefetch count
来限制Queue
每次发送给每个Consumer
的消息数,比如设置为1,则Queue
每次给每个Consumer
发送一条消息,消费者处理完这条消息后Queue
会再给该消费者发送一条消息。
实际使用RabbitMQ
过程中,如果完全不配置QoS
,这样Rabbit会尽可能快速地发送队列中的所有消息到Consumer
。因为Consumer
在本地缓存所有的message,从而极有可能导致OOM或者导致服务器内存不足影响其它进程的正常运行。所以我们需要通过设置Qos的prefetch count
来控制Consumer
的流量。同时设置得当也会提高Consumer
的吞吐量。
prefetch
允许为每个Consumer
指定最大的unacked messages
数目。简单来说就是用来指定一个Consumer
一次可以从Broker
中获取多少条message并缓存在client中(RabbitMQ提供的各种语言的client library)。一旦缓冲区满了,Broker
将会停止投递新的message到该Consumer
中直到它发出ack. 对于多个Consumer
,只要unacked
数少于prefetch * consumer
数目,Broker
就不断将消息投递过去。
拉模式设置无效。prefetch
设置为0,则表示没有上限
int prefetchSize = 0;
int prefetchCount = 0;
boolean global = false;
channel.basicQos( prefetchSize, prefetchCount, global);
官方文档:https://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/
可通过显示调用getCloseReason()
方法或通过使用ShutdownListener
类中的业务方法的cause参数来从ShutdownSignalException
中获取关闭原因的有用信息.
ShutdownSignalException
类提供方法来分析关闭的原因.通过调用isHardError()
方法,我们可以知道是connection
错误还是channel
错误.getReason()
会返回相关cause的相关信息,这些引起cause的方法形式-要么是AMQP.Channel.Close
方法,要么是AMQP.Connection.Close
(或者是null,如果是library中引发的异常,如网络通信故障,在这种情况下,可通过getCause()方法来获取信息).
代码示例
@Test public void testShutdown() { String userName = "jannal"; String password = "jannal"; String virtualHost = "jannal-vhost"; String hostName = "jannal.mac.com"; String queueName = "jannal.direct.queue"; int portNumber = 5672; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(false); Connection conn = null; try { conn = factory.newConnection(); final Channel channel = conn.createChannel(); channel.basicQos(1); boolean autoAck = false; boolean noLocal = false; boolean exclusive = false; String consumerTag = "consumerTag"; //basicConsume是一个同步方法 channel.basicConsume(queueName, autoAck, consumerTag, noLocal, exclusive, null, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); System.out.println("Consumer3:" + new String(body, "utf-8")); //模拟错误 int i = 1 / 0; channel.basicAck(deliveryTag, false); } }); //当Connection或Channel是Closed状态时会调用ShutdownListener channel.addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException cause) { if (cause.isHardError()) { Connection connecton = (Connection) cause.getReference(); if (!cause.isInitiatedByApplication()) { logger.warn("Connection关闭:{}", cause.getReason().toString()); } } else { Channel channel = (Channel) cause.getReference(); ShutdownSignalException closeReason = channel.getCloseReason(); logger.warn("Channel关闭原因:{},{}", closeReason.getReason().toString(), cause.getCause()); } } }); // 阻止主程序结束 LockSupport.park(); } catch (IOException e) { logger.error(e.getMessage(), e); } catch (TimeoutException e) { logger.error(e.getMessage(), e); } finally { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。