赞
踩
- public class RabbitMqUtils {
- public static Channel getChannel() throws IOException, TimeoutException {
- //创建一个链接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //工厂IP 链接RabbitMQ的队列
- factory.setHost("192.168.116.3");
- factory.setVirtualHost("/test");
- //用户名
- factory.setUsername("admin");
- //密码
- factory.setPassword("123");
- //创建链接
- Connection connection = factory.newConnection();
- //获取链接当中的信道
- Channel channel = connection.createChannel();
- return channel;
- }
- }
步骤:
①生产者连接RabbitMQ服务端,获取信道(即连接)
②由信道声明一个队列(调用queueDeclare方法),其中参数表示:
1.队列名称
2.队列里面的消息是否持久化(磁盘),默认情况下消息存储在内存中(并不进行持久化)
3.该队列是否只供一个消费者进行消费 是否进行消息的共享,true可以多个消费者消费,默认为false:只能一个消费者消费
4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除
5.其他参数(后续使用时说明)
③控制台输入消息内容并发送,发送调用basicPublish方法,其中参数表示:
1.发送到哪个交换机(入门采用默认交换机,后续使用时详细说明其用法)
2.路由的key值是哪个 本次是队列名称
3.其他参数信息(后续使用时说明)
4.发送消息的消息体
- public class Task01 {
- //队列的名称
- public static final String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = RabbitMqUtils.getChannel();
- channel.queueDeclare(QUEUE_NAME,false,false,false,null);
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String message = scanner.nextLine();
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
- System.out.println("消息发送完毕:" + message);
-
- }
- }
- }
步骤:
①消费者连接RabbitMQ服务端,获取信道(即连接)
②信道调用basicConsume方法接收消息,其中参数表示:
1.消费哪个队列
2.消费成功之后是否自动应答 true代表的是自动应答 false代表的是手动应答
3.当一个消息发送过来后的回调接口,即接收到消息如何处理
4.消费者取消消费的回调
- public class Worker01 {
- //队列的名称
- public static final String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = RabbitMqUtils.getChannel();
- /**
- * 声明 接收消息
- */
- DeliverCallback deliverCallback = (consumeTag, message) ->{
- System.out.println("工作线程2接收到的消息:" + new String(message.getBody()));
- };
- /**
- 取消消费的回调
- */
- CancelCallback cancelCallback = (consumeTag) -> {
- System.out.println(consumeTag + "消费者取消消息消费接口回调逻辑");
- };
- System.out.println("T2等待接收消息......");
- channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
- }
- }
轮训分发顾名思义就是生产者将消息发送到MQ当中时,由多个消费者来处理这些消息,消费者1一条,消费者2一条,以此类推,直到MQ当中的消息消费完。
此处采用消息持久化策略(添加MessageProperties.PERSISTENT_TEXT_PLAIN),即将队列和队列当中的信息持久化到磁盘当中。
- public class Task {
- /**
- * 队列名称
- */
- private static final String QUEUE_NAME = "ack_queue";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- boolean durable = true;//表示开启消息持久化
- channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String message = scanner.nextLine();
- //设置生产者发送的消息为持久化消息(要求保存到磁盘上)
- channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
- System.out.println("生产者发出消息:" + message);
- }
- }
- }
此处将消费者1设置为手动应答,自动应答则是消费者一旦接收到消息就告知队列,队列则将该消息从队列中删除,手动应答是为了确保消息确确实实被处理掉了才告知队列进行删除,以防处理时出现问题导致消息未被完全处理就丢弃的情况。
- public class Consumer01 {
- /**
- * 队列名称
- */
- private static final String QUEUE_NAME = "ack_queue";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- DeliverCallback deliverCallback = (consumeTag, message) ->{
- System.out.println("C1接收到的消息:" + new String(message.getBody(),"UTF-8"));
- //手动应答
- /**
- * 1.消息的标记 tag 每一条消息都会有唯一标识 此处表示应答哪一条消息
- * 2.是否批量应答 false:不批量应答信道当中的消息(仅回答此消息)
- */
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- };
- //取消消费的回调
- CancelCallback cancelCallback = (consumeTag) -> {
- System.out.println("消息消费被中断");
- };
- /**
- * 采用手动应答
- */
- boolean autoAck = false;
- channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
- }
- }
消费者2和消费者1一样采取手动应答,确保消息的正确处理。
- public class Consumer02 {
- /**
- * 队列名称
- */
- private static final String QUEUE_NAME = "ack_queue";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- DeliverCallback deliverCallback = (consumeTag, message) ->{
-
- System.out.println("C2接收到的消息:" + new String(message.getBody(),"UTF-8"));
- //手动应答
- /**
- * 1.消息的标记 tag 每一条消息都会有唯一标识 此处表示应答哪一条消息
- * 2.是否批量应答 false:不批量应答信道当中的消息(仅回答此消息)
- */
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- };
- //取消消费的回调
- CancelCallback cancelCallback = (consumeTag) -> {
- System.out.println("消息消费被中断");
- };
- /**
- * 采用手动应答
- */
- boolean autoAck = false;
- channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
- }
- }
自动应答可以保证高吞吐量,但是保证不了数据传输安全性。一方面,当采用自动应答时,队列发送完消息,当消息还未被消费者接收到时消费者一旦断开连接,此条消息则会丢失。另一方面,例如消费者端并没有设置传递消息的数量限制,队列持续的发消息给消费端,然而消费者端处理消息的能力低下,导致消息大量积压,最终导致内存耗尽,此时消费者线程会被操作系统杀死,那么这些消息依然没有被消费就丢失了。手动应答可以完美的规避自动应答的弊端,但是,手动应答的吞吐量并不高,因此采用哪种应答方式需要根据实际应用场景进行权衡。
消息应答的方法有两种:
参考二当中的代码,引入线程沉睡表示消息处理的效率,让消费者1处理一条消息仅需1s,消费者2处理一条消息需要30s,生产者发送AA\BB\CC\DD四条消息,按照上述逻辑,消费者1会打印AA\CC,消费者2对打印BB\DD,当生产者发送完DD时就将消费者2线程杀死。观察消费者1的打印台情况,可以观察到DD则由消费者1进行消费了,当MQ未收到消费者2对于DD的应答,DD这条消息依然存放在MQ当中,由于消费者2断开了连接,此时MQ就会将DD发送到消费者1的信道当中,由消费者1处理DD这条消息。
生产者:
消费者1:
消费者2:
参考三当中的线程沉睡,我们类比成消费者1的处理能力高效,消费者2的处理能力低下,此处采用不公平分发,即能者多劳,让闲着的线程处理更多的消息。在消费者1和消费者2当中分别引入如下代码:
- /**
- * 设置不公平分发
- */
- int prefetchCount = 1;
- channel.basicQos(prefetchCount);
生产者发送AA\BB\CC\DD四条消息,消费者1和消费者2打印台执行效果和三当中的效果一样,唯一区别在于我们此时并没有强制杀死消费者2线程。
预取值是对不公平分发的进一步优化,所谓预取值,举个例子,此时队列当中有1\2\3\4\5\6\7\8,共8条消息,设置消费者1的预取值为5,消费者2的预取值为2,表示将这8条消息当中的5条发送给消费者1的信道当中,将2条发送给消费者2的信道当中,消费者1每应答一条消息,队列才会将下一条消息发往消费者1的信道,类似的,消费者2也是如此。这样做的目的在于假设我们可以预估到两个消费者的处理能力,根据他们的能力让他们处理对标能力的消息。预取值的大小我们一般是不会准确把握的,只有通过反复的测试才会把握到一个接近处理能力的预取值。观察两个消费者的打印台输出情况:
设置消费者1预取值及其打印台:
- /**
- * 预取值
- */
- int prefetchCount = 5;
- channel.basicQos(prefetchCount);
设置消费者2预取值及其打印台:
- /**
- * 预取值
- */
- int prefetchCount = 2;
- channel.basicQos(prefetchCount);
发布确认模式针对生产者,生产者将信道设置为confirm模式,此模式下,生产者发送的消息将需要RabbitMQ进行确认收到,防止生产者发送的消息丢失。发布确认模式有三种策略,分别为单个确认模式、批量确认模式、异步批量确认模式。针对以上三种策略,分别实现其生产者发送消息,观察其发送效率。
将三种模式封装成三个方法,并由主函数调用:
- /**
- * 队列名称
- */
- private static final String QUEUE_NAME = "confirm";
- /**
- * 批量发送消息条数
- */
- private static final int MESSAGE_COUNT = 1000;
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- channel.confirmSelect();//开启发布确认模式
- channel.queueDeclare(QUEUE_NAME,true,false,false,null);
- /*
- * 发布确认模式
- * 1、单个确认
- * 2、批量确认
- * 3、异步批量确认
- */
- publishMessageIndividually(channel);//25432
- publishMessageBatch(channel);//391
- asynchronousConfirmMessage(channel);//22
- }
单个确认模式即生产者发送一条,MQ收到后回应一条,之后生产者继续发送下一条。
- //单个确认
- public static void publishMessageIndividually(Channel channel) throws Exception{
- //开始时间
- long startTime = System.currentTimeMillis();
- //批量的发消息 单个发布确认
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String message = i + "";
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
- //单个消息就立即进行发布确认
- boolean flg = channel.waitForConfirms();
- if (!flg){
- System.out.println("第" + i + "条消息发送失败~");
- }
- }
- //结束时间
- long endTime = System.currentTimeMillis();
- long spendTime = endTime - startTime;
- System.out.println("单个确认模式发布1000条消息耗时:" + spendTime + " ms");
- }
批量确认模式即发送一批消息后一起确认是否收到,与上述单个确认模式相比要大大减少耗时,但是一旦某条消息未被收到,我们将很难排查到是哪条消息。
- //批量确认
- public static void publishMessageBatch(Channel channel) throws Exception{
- //开始时间
- long startTime = System.currentTimeMillis();
- //批量确认消息大小
- int confirmSize = 100;//每100条确认一次
- //批量的发送消息 批量发布确认
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String message = i + "";
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
- //判断达到100条消息的时候批量的确认一次
- if (((i + 1) % confirmSize) == 0){
- boolean flg = channel.waitForConfirms();
- if (!flg){
- System.out.println("第" + (i - 99) + "条~第" + i + "条持久化失败~" );
- }else {
- System.out.println("第" + (i - 99) + "条~第" + i + "条持久化成功~" );
- }
- }
- }
- //结束时间
- long endTime = System.currentTimeMillis();
- long spendTime = endTime - startTime;
- System.out.println("批量确认模式发布1000条消息耗时:" + spendTime + " ms");
- }
异步批量确认模式完美的规避了上两种确认模式的弊端,异步批量确认模式中生产者只负责发送消息,而无需关心消息是否发送到队列当中并进行了持久化,由MQ当中的broker负责告知消息的发送结果,生产者只需要在代码当中准备一个监听器,监听broker的消息,broker告知监听器哪些消息发送成功了,哪些消息发送失败了,我们准备一个线程安全的数据容器即可,将发送的消息放到这个容器当中。监听器当中的参数ackCallback和nackCallback会分别对容器当中的数据进行处理。ackCallback有两手准备,multiple为true时表示批量应答,此时将容器当中Key小于该deliverTag的移到临时的容器当中,反之为false时,表示只应答一条,则从容器中移除该数据。
- //异步确认
- public static void asynchronousConfirmMessage(Channel channel) throws Exception{
- /**
- * 线程安全有序的哈希表 适用于高并发的情况下
- * 1.轻松的将序号和消息进行关联
- * 2.可以轻松的批量删除条目 只要给到序号
- * 3.支持高并发(多线程)
- */
- ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
- //消息确认成功回调函数
- /**
- * 1、消息的标记
- * 2、是否为批量确认
- */
- ConfirmCallback ackCallback = (deliveryTag,multiple) -> {
- //删除已经确认的消息 剩下的就是未确认的消息
- if (multiple){
- ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
- confirmed.clear();
- }else {
- outstandingConfirms.remove(deliveryTag);
- }
- };
- //消息确认失败回调函数
- /**
- * 1、消息的标记
- * 2、是否为批量确认
- */
- ConfirmCallback nackCallback = (deliveryTag,multiple) -> {
- String message = outstandingConfirms.get(deliveryTag);
- //3--->打印未确认的消息都有哪些
- System.out.println("未确认的消息:序号->" + deliveryTag + " 消息体:" + message);
- };
- //准备消息的监听器 监听哪些消息成功了 哪些消息失败了
- /**
- * 1、监听哪些消息成功了
- * 2、监听哪些消息失败了
- */
- channel.addConfirmListener(ackCallback,nackCallback);//异步通知
- //开始时间
- long startTime = System.currentTimeMillis();
- //批量发送消息
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String message = i + "";
- //此处记录所有要发送的消息
- outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
-
- }
- //结束时间
- long endTime = System.currentTimeMillis();
- long spendTime = endTime - startTime;
- System.out.println("异步确认模式发布1000条消息耗时:" + spendTime + " ms");
- }
假设有这么一个场景,在网购系统当中,某家店铺需要将新产品进行推广,该产品面向中老年人退出,因此就需要将推广消息发送给该群体用户。中年群众和老年群众作为两种消费者群体,如何做到只发布一次消息就能做到这两种消费者都能收到该消息?此时,MQ就为我们提供了发布/订阅模式。
消息发送方:
- public class EmitLog {
- /**
- * 交换机名称
- */
- private static final String EXCHANGE_NAME = "LOGS";
- /**
- * 交换机类型
- */
- private static final String EXCHANGE_TYPE = "fanout";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- //声明一个交换机,交换机名称、交换机类型
- channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String message = scanner.nextLine();
- channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
- System.out.println("发送方发送消息:" + message);
- }
- }
- }
消费者1:
- public class ReceviceLogs01 {
- /**
- * 交换机名称
- */
- private static final String EXCHANGE_NAME = "LOGS";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- //声明一个队列 临时队列
- /**
- * 生成一个临时队列 队列名是随机的
- * 当消费者与队列断开连接时 队列自动删除
- */
- String queueName = channel.queueDeclare().getQueue();
- /**
- * 绑定交换机与队列
- */
- channel.queueBind(queueName,EXCHANGE_NAME,"");
- System.out.println("接收方01等待接收消息......");
- /**
- * 声明 接收消息
- */
- DeliverCallback deliverCallback = (consumeTag, message) ->{
- System.out.println(new String(message.getBody()));
- };
- //取消消费的回调
- CancelCallback cancelCallback = (consumeTag) -> {
- System.out.println("消息消费被中断");
- };
- channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
- }
- }
消费者2:
- public class ReceviceLogs02 {
- /**
- * 交换机名称
- */
- private static final String EXCHANGE_NAME = "LOGS";
- /**
- * 交换机类型
- */
- private static final String EXCHANGE_TYPE = "fanout";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- //声明一个队列 临时队列
- /**
- * 生成一个临时队列 队列名是随机的
- * 当消费者与队列断开连接时 队列自动删除
- */
- String queueName = channel.queueDeclare().getQueue();
- /**
- * 绑定交换机与队列
- */
- channel.queueBind(queueName,EXCHANGE_NAME,"");
- System.out.println("接收方02等待接收消息......");
- /**
- * 声明 接收消息
- */
- DeliverCallback deliverCallback = (consumeTag, message) ->{
- System.out.println(new String(message.getBody()));
- };
- //取消消费的回调
- CancelCallback cancelCallback = (consumeTag) -> {
- System.out.println("消息消费被中断");
- };
- channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
- }
- }
执行结果:
fanout交换机说明:此类型的交换机可以将接收到的消息发送给所有与它绑定的队列。参考我们上面所说的轮训分发,轮训分发是将队列当中的消息发送给消费者,一条消息仅能由一个消费者进行消费,如果使用默认的交换机,则需要发送多次消息,由队列再分发给每个消费者。
总结:只要该队列和此交换机绑定,无论routing-key是什么,都会接收到生产者发送的消息。
假设有这么一个场景,同样是网购系统,在下单时我们需要将交易金额1W以上的订单交给某支付机构A处理,5000-1W的交给支付机构B处理,5000元以下的也交给支付机构A处理。怎么做?支付机构A和支付机构B分别对应消费者1和消费者2,订单为生产者,生产者(网购系统)将订单信息发送至交换机,交换机根据生产者提供的交易金额(routing-key)决定由哪个消费者(支付机构)去处理。这就是直接交换机的用处,可以指定哪个消费者去消费此条消息。
生产者:
- public class DirectLogs {
- /**
- * 交换机名称
- */
- private static final String EXCHANGE_NAME = "direct_logs";
- /**
- * 交换机类型
- */
- public static final String EXCHANGE_TYPE = "direct";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- //声明交换机
- channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String message = scanner.nextLine();
- channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes(StandardCharsets.UTF_8));
- }
- }
- }
消费者1:
- public class ReceiveLogsDirect01 {
- /**
- * 交换机名称
- */
- private static final String EXCHANGE_NAME = "direct_logs";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- //声明一个队列
- channel.queueDeclare("console",false,false,false,null);
- /**
- * 路由绑定
- */
- channel.queueBind("console",EXCHANGE_NAME,"info");
- channel.queueBind("console",EXCHANGE_NAME,"warning");
- /**
- * 声明 接收消息
- */
- DeliverCallback deliverCallback = (consumeTag, message) ->{
- System.out.println(new String(message.getBody()));
- };
- //取消消费的回调
- CancelCallback cancelCallback = (consumeTag) -> {
- System.out.println("消息消费被中断");
- };
- channel.basicConsume("console",true,deliverCallback,cancelCallback);
- }
- }
消费者2:
- public class ReceiveLogsDirect02 {
- /**
- * 交换机名称
- */
- private static final String EXCHANGE_NAME = "direct_logs";
-
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- //声明一个队列
- channel.queueDeclare("disk",false,false,false,null);
- /**
- * 路由绑定
- */
- channel.queueBind("disk",EXCHANGE_NAME,"error");
- /**
- * 声明 接收消息
- */
- DeliverCallback deliverCallback = (consumeTag, message) ->{
- System.out.println(new String(message.getBody()));
- };
- //取消消费的回调
- CancelCallback cancelCallback = (consumeTag) -> {
- System.out.println("消息消费被中断");
- };
- channel.basicConsume("disk",true,deliverCallback,cancelCallback);
- }
- }
避坑:队列一旦与direct类型的交换机绑定,则生产者发送消息时则不能使用队列名称来指定发送到哪个队列,只能使用routing-key来进行队列选择。
Topic交换机的应用场景是非广泛,fanout交换机和direct交换机的功能都可以使用Topic交换机实现。发送到topic交换机的消息不能具有任意的 routing_key —— 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。binding key也必须采用相同的形式。topic交换机背后的逻辑 类似于direct交换机——使用特定 routing key 发送的消息将被传递到与匹配binding key绑定的所有队列。但是binding key有两个重要的特殊情况:
生产者:
- public class TopicExchangePro {
- /**
- * 交换机名称
- */
- private static final String EXCHANGE_NAME = "topic_logs";
- /**
- * 交换机类型
- */
- private static final String EXCHANGE_TYPE = "topic";
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- /**
- * 声明一个交换机
- */
- channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String rountingKey = scanner.next();
- System.out.println("输入的rountingKey为:" + rountingKey);
- String message = scanner.next();
- System.out.println("输入消息体的为:" + message);
- channel.basicPublish(EXCHANGE_NAME,rountingKey,null,message.getBytes(StandardCharsets.UTF_8));
- System.out.println("消息体:" + message + "已发送给rountKey为:" + rountingKey + "的队列");
- }
- }
- }
消费者1:
- public class TopicExchangeCon01 {
- /**
- * 交换机名称
- */
- private static final String EXCHANGE_NAME = "topic_logs";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- /**
- * 声明一个队列
- */
- channel.queueDeclare("receive1",false,false,false,null);
- /**
- * 路由绑定
- */
- channel.queueBind("receive1",EXCHANGE_NAME,"*.*.pink");
- channel.queueBind("receive1",EXCHANGE_NAME,"green.#");
- /**
- * 声明 接收消息
- */
- DeliverCallback deliverCallback = (consumeTag, message) ->{
- String rountingKey = message.getEnvelope().getRoutingKey();
- System.out.println("receive1接收消息体为:" + new String(message.getBody()) + " 其RountingKey为:" + rountingKey);
- };
- //取消消费的回调
- CancelCallback cancelCallback = (consumeTag) -> {
- System.out.println("消息消费被中断");
- };
- channel.basicConsume("receive1",true,deliverCallback,cancelCallback);
- }
- }
消费者2:
- public class TopicExchangeCon02 {
- /**
- * 交换机名称
- */
- private static final String EXCHANGE_NAME = "topic_logs";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMqUtils.getChannel();
- /**
- * 声明一个队列
- */
- channel.queueDeclare("receive2",false,false,false,null);
- /**
- * 路由绑定
- */
- channel.queueBind("receive2",EXCHANGE_NAME,"*.red.*");
- /**
- * 声明 接收消息
- */
- DeliverCallback deliverCallback = (consumeTag, message) ->{
- String rountingKey = message.getEnvelope().getRoutingKey();
- System.out.println("receive2接收消息体为:" + new String(message.getBody()) + " 其RountingKey为:" + rountingKey);
- };
- //取消消费的回调
- CancelCallback cancelCallback = (consumeTag) -> {
- System.out.println("消息消费被中断");
- };
- channel.basicConsume("receive2",true,deliverCallback,cancelCallback);
- }
- }
消费者1routing-key:
- *.*.pink
- green.#
两种匹配规则,路由key为(什么什么pink)可以发往该队列,路由key为(green什么什么什么...)也可以发往该队列。
消费者2routing-key:
*.red.*
匹配规则:路由key是(什么red什么)可以发往该队列。
测试用例:
- dog.is.pink 消费者1
- green.like.pink 消费者1
- green.red.dog 消费者1和消费者2
- green.cat 消费者1
- house.red.pink 消费者1和消费者2
- we.red.family 消费者2
测试结果:
生产者:
消费者1:
消费者2:
避坑:当路由key匹配到同一个队列时(即路由key匹配到同一队列的多个routing-key时),只会给队列发送一次消息。
值得注意的是:当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了。如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了 。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。