赞
踩
我们之前讲述的是将消息投递到队列中再有消费者区消费,实际上RabbitM的运作流程不是这样的。真实情况是,生产者将消息发送到 Exchange (交换器:“X”),由交换器将消息路由到一个或者多个队列中,消费者再去消费消息。如果路由不到,或许会返回给生产者,或许直接丢弃(这个看具体配置后面会讲到:)。
public class Send { //队列名 private final static String QUEUE_NAME = "wsd_test"; //队列2名 private final static String QUEUE_NAME_SECOND = "wsd_test_second"; //路由器名 private final static String EXCHANGE_NAME = "exchange"; //绑定键 private final static String BINDING_KEY = "wsd_exchange"; //路由键 private final static String ROUTING_KEY = "wsd_exchange"; private static Connection connection =null; private static Channel channel = null; public static void main(String[] args) { try{ // 获取到连接以及mq通道 connection = ConnectionUtil.getConnection(); // 从连接中创建通道 channel = connection.createChannel(); //声明了一个direct 类型的交换器 channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null); // 声明队列QUEUE_NAME channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明队列QUEUE_NAME_SECOND channel.queueDeclare(QUEUE_NAME_SECOND, false, false, false, null); //将路由与队列绑定,再为绑定的路径赋值一个绑定键 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,BINDING_KEY); //将路由与队列绑定,再为绑定的路径赋值一个绑定键 channel.queueBind(QUEUE_NAME_SECOND,EXCHANGE_NAME,BINDING_KEY); //发送数据 for (int i=0;i<10;i++){ // 消息内容 String message = "Hello World!"+i; //指定发送消息到哪个路由,以及他的路由键,消息等 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(200); } }catch (Exception e){ e.printStackTrace(); }finally { //关闭通道和连接 try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } }
消费者现象:这个时候绑定到队列下的所有消费者可进行消费,至于是轮询分发还是公平分发看消费者们的自动应答和一次发送的消息数
当绑定多个队列到路由时,每个队列都能得到一样的消息,再由绑定到队列的消费者消费。例如这个时候再给队QUEUE_NAME增加一个消费者C1_1。采用公平分发的规则,那么C1和C1_1将收到不一样的消息。而QUEUE_NAME_SECOND队列下的C2能收到全部的消息
本文重点:路由与队列绑定,以及赋绑定键
//队列名,路由器名,绑定键
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
channel.queueBind(QUEUE_NAME_SECOND,EXCHANGE_NAME,ROUTING_KEY);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。