赞
踩
【README】
参考 https://blog.csdn.net/u012943767/article/details/79300673 ;
【0】声明交换机,队列 与绑定
- /**
- * 交换机,队列声明与绑定
- */
- public class AckDeclarer {
- /** 确认交换机 */
- public static final String ACK_EXCHANGE2 = "ACK_EXCHNAGE2";
- /** 确认队列 */
- public static final String ACK_QUEUE2 = "ACK_QUEUE2";
- /** 路由键 */
- public static final String ACK_ROUTE2 = "ACK_ROUTE2";
-
- // 四种Exchange 模式
- // direct :需要生产者和消费者绑定相同的Exchange和routing key。
- // fanout:广播模式需要生产者消费者绑定相同的Exchange。
- // topic:支持模糊匹配的广播模式以点分隔,*表示一个单词,#表示任意数量(零个或多个)单词。
- // header:根据生产者和消费者的header中信息进行匹配性能较差 ,x-match [all 匹配所有/any 任意一个]。
-
- public static void main(String[] args) throws Exception {
- /* 获取连接*/
- Connection conn = RBConnectionUtil.getConn();
- // 创建信道
- Channel channel = conn.createChannel();
- /* 声明交换机 */
- channel.exchangeDeclare(ACK_EXCHANGE2, BuiltinExchangeType.FANOUT);
- /* 声明队列 */
- channel.queueDeclare(ACK_QUEUE2, true, false, false, null);
- System.out.println(String.format("声明交换机【%s】,队列【%s】成功", ACK_EXCHANGE2, ACK_QUEUE2));
-
- /* 把队列绑定到交换机 */
- channel.queueBind(ACK_QUEUE2, ACK_EXCHANGE2, ACK_ROUTE2);
-
- /* 关闭信道和连接 */
- channel.close();
- conn.close();
- }
- }
【1】生产者
- /**
- * 消息确认生产者
- */
- public class AckProducer {
- public static void main(String[] args) throws Exception {
- /* 获取连接*/
- Connection conn = RBConnectionUtil.getConn();
- // 创建信道
- Channel channel = conn.createChannel();
-
- String[] messages = new String[]{
- "first.-04130828"
- , "second..-04130828"
- , "third...-04130828"
- , "fourth....-04130828"
- , "fiveth.....-04130828"
- , "6th.....-04130828"
- , "7th.....-04130828"
- , "8th.....-04130828"
- , "9th.....-04130828"
- , "10th.....-04130828"
- };
- for (String msg : messages) {
- channel.basicPublish(AckDeclarer.ACK_EXCHANGE2, AckDeclarer.ACK_ROUTE2, null, msg.getBytes());
- System.out.println(msg + " is sent");
- }
- System.out.println("消息发送完成");
- channel.close();
- conn.close();
- }
- }
【2】自动确认消费者
- /**
- * 自动确认消费者
- */
- public class AutoAckConsumer {
- public static void main(String[] args) throws Exception {
- /* 获取连接*/
- Connection conn = RBConnectionUtil.getConn();
- // 创建信道
- Channel channel = conn.createChannel();
-
- System.out.println("等待消费1");
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String msg = new String(body, "UTF-8");
- System.out.println("接收到的消息=" + msg);
- try {
- doWork(msg);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- };
- channel.basicConsume(AckDeclarer.ACK_QUEUE2, true, consumer);// 为true自动确认
- }
- private static void doWork(String task) throws InterruptedException {
- for (char ch : task.toCharArray()) {
- if (ch == '.') Thread.sleep(1000);
- }
- }
- }
【3】手动确认消费者
- /**
- * 手动确认消费者
- */
- public class ManualAckConsumer {
- public static void main(String[] args) throws Exception {
- /* 获取连接*/
- Connection conn = RBConnectionUtil.getConn();
- // 创建信道
- Channel channel = conn.createChannel();
-
- System.out.println("手动确认消费者等待消费1");
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String msg = new String(body, "UTF-8");
- System.out.println("接收到的消息=" + msg);
- try {
- doWork(msg);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- System.out.println("消费done,手动确认ack,msg=" + msg);
- channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认
- }
- }
- };
- // 手动确认,向rabbitmq 服务器手动发送ack成功消费标识
- channel.basicConsume(AckDeclarer.ACK_QUEUE2, false, consumer);// 为false手动确认
- }
- private static void doWork(String task) throws InterruptedException {
- for (char ch : task.toCharArray()) {
- if (ch == '.') Thread.sleep(1000);
- }
- }
- }
手动确认消费者日志
- SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
- SLF4J: Defaulting to no-operation (NOP) logger implementation
- SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
- 手动确认消费者等待消费1
- 接收到的消息=first.-04130828
- 消费done,手动确认ack,msg=first.-04130828
- 接收到的消息=second..-04130828
- 消费done,手动确认ack,msg=second..-04130828
- 接收到的消息=third...-04130828
- 消费done,手动确认ack,msg=third...-04130828
- 接收到的消息=fourth....-04130828
- 消费done,手动确认ack,msg=fourth....-04130828
- 接收到的消息=fiveth.....-04130828
- 消费done,手动确认ack,msg=fiveth.....-04130828
- 接收到的消息=6th.....-04130828
- 消费done,手动确认ack,msg=6th.....-04130828
- 接收到的消息=7th.....-04130828
- 消费done,手动确认ack,msg=7th.....-04130828
- 接收到的消息=8th.....-04130828
- 消费done,手动确认ack,msg=8th.....-04130828
- 接收到的消息=9th.....-04130828
- 消费done,手动确认ack,msg=9th.....-04130828
- 接收到的消息=10th.....-04130828
- 消费done,手动确认ack,msg=10th.....-04130828
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。