赞
踩
Publisher confirms are a RabbitMQ extension to implement reliable publishing. When publisher confirms are enabled on a channel, messages the client publishes are confirmed asynchronously by the broker, meaning they have been taken care of on the server side.
publisher confirms是一个RabbitMQ的插件用于实现可靠的发布。当publisher confirms在channel上启用时,broker将异步确认客户端发送的信息,意味着服务端接收到了消息。
Channel channel = connection.createChannel();
channel.confirmSelect();
private final static String QUEUE_NAME = "test_queue_confirm1"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 生产者调用confirmSelect将channel设置为confirm模式注意 channel.confirmSelect(); String message = "hello confirm message"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); if(!channel.waitForConfirms()){ System.out.println("message send failed"); }else{ System.out.println("message send ok"); } channel.close(); connection.close(); }
private final static String QUEUE_NAME = "test_queue_confirm2"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 生产者调用confirmSelect将channel设置为confirm模式注意 // 批量 channel.confirmSelect(); for (int i = 0;i<10;i++) { String message = "hello confirm message " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } if(!channel.waitForConfirms()){ System.out.println("message send failed"); }else{ System.out.println("message send ok"); } channel.close(); connection.close(); }
private final static String QUEUE_NAME = "test_queue_confirm4"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("handleAck deliveryTag "+deliveryTag+" multiple"+multiple); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("handleNack deliveryTag "+deliveryTag+" multiple"+multiple); } }); long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish("",QUEUE_NAME,null,"send listener message".getBytes()); System.out.println("[send] message" +"send listener message" + "nextSeqNo "+ nextSeqNo ); }
rabbitMq的ack和publisher confirm需要区分开来。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。