当前位置:   article > 正文

【SpringCloud组件——RabbitMQ(上)】_springcloud rabbit多生产者消费者

springcloud rabbit多生产者消费者

一、简单的生产者-消费者

1.1、创建连接工具类获取信道

  1. public class RabbitMqUtils {
  2. public static Channel getChannel() throws IOException, TimeoutException {
  3. //创建一个链接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. //工厂IP 链接RabbitMQ的队列
  6. factory.setHost("192.168.116.3");
  7. factory.setVirtualHost("/test");
  8. //用户名
  9. factory.setUsername("admin");
  10. //密码
  11. factory.setPassword("123");
  12. //创建链接
  13. Connection connection = factory.newConnection();
  14. //获取链接当中的信道
  15. Channel channel = connection.createChannel();
  16. return channel;
  17. }
  18. }

 1.2、创建生产者

步骤:

①生产者连接RabbitMQ服务端,获取信道(即连接)

②由信道声明一个队列(调用queueDeclare方法),其中参数表示:

1.队列名称
2.队列里面的消息是否持久化(磁盘),默认情况下消息存储在内存中(并不进行持久化)
3.该队列是否只供一个消费者进行消费 是否进行消息的共享,true可以多个消费者消费,默认为false:只能一个消费者消费
4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除
5.其他参数(后续使用时说明)
③控制台输入消息内容并发送,发送调用basicPublish方法,其中参数表示:
1.发送到哪个交换机(入门采用默认交换机,后续使用时详细说明其用法)
2.路由的key值是哪个 本次是队列名称
3.其他参数信息(后续使用时说明)
4.发送消息的消息体

  1. public class Task01 {
  2. //队列的名称
  3. public static final String QUEUE_NAME = "hello";
  4. public static void main(String[] args) throws IOException, TimeoutException {
  5. Channel channel = RabbitMqUtils.getChannel();
  6. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  7. Scanner scanner = new Scanner(System.in);
  8. while (scanner.hasNext()){
  9. String message = scanner.nextLine();
  10. channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
  11. System.out.println("消息发送完毕:" + message);
  12. }
  13. }
  14. }

 1.3、创建消费者

步骤:

①消费者连接RabbitMQ服务端,获取信道(即连接)

②信道调用basicConsume方法接收消息,其中参数表示:
1.消费哪个队列
2.消费成功之后是否自动应答 true代表的是自动应答 false代表的是手动应答
3.当一个消息发送过来后的回调接口,即接收到消息如何处理
4.消费者取消消费的回调

  1. public class Worker01 {
  2. //队列的名称
  3. public static final String QUEUE_NAME = "hello";
  4. public static void main(String[] args) throws IOException, TimeoutException {
  5. Channel channel = RabbitMqUtils.getChannel();
  6. /**
  7. * 声明 接收消息
  8. */
  9. DeliverCallback deliverCallback = (consumeTag, message) ->{
  10. System.out.println("工作线程2接收到的消息:" + new String(message.getBody()));
  11. };
  12. /**
  13. 取消消费的回调
  14. */
  15. CancelCallback cancelCallback = (consumeTag) -> {
  16. System.out.println(consumeTag + "消费者取消消息消费接口回调逻辑");
  17. };
  18. System.out.println("T2等待接收消息......");
  19. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
  20. }
  21. }

 二、轮训分发-生产者消费者代码

       轮训分发顾名思义就是生产者将消息发送到MQ当中时,由多个消费者来处理这些消息,消费者1一条,消费者2一条,以此类推,直到MQ当中的消息消费完。

2.1、生产者

此处采用消息持久化策略(添加MessageProperties.PERSISTENT_TEXT_PLAIN),即将队列和队列当中的信息持久化到磁盘当中。

  1. public class Task {
  2. /**
  3. * 队列名称
  4. */
  5. private static final String QUEUE_NAME = "ack_queue";
  6. public static void main(String[] args) throws Exception{
  7. Channel channel = RabbitMqUtils.getChannel();
  8. boolean durable = true;//表示开启消息持久化
  9. channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
  10. Scanner scanner = new Scanner(System.in);
  11. while (scanner.hasNext()){
  12. String message = scanner.nextLine();
  13. //设置生产者发送的消息为持久化消息(要求保存到磁盘上)
  14. channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
  15. System.out.println("生产者发出消息:" + message);
  16. }
  17. }
  18. }

2.2、消费者1

此处将消费者1设置为手动应答,自动应答则是消费者一旦接收到消息就告知队列,队列则将该消息从队列中删除,手动应答是为了确保消息确确实实被处理掉了才告知队列进行删除,以防处理时出现问题导致消息未被完全处理就丢弃的情况。

  1. public class Consumer01 {
  2. /**
  3. * 队列名称
  4. */
  5. private static final String QUEUE_NAME = "ack_queue";
  6. public static void main(String[] args) throws Exception{
  7. Channel channel = RabbitMqUtils.getChannel();
  8. DeliverCallback deliverCallback = (consumeTag, message) ->{
  9. System.out.println("C1接收到的消息:" + new String(message.getBody(),"UTF-8"));
  10. //手动应答
  11. /**
  12. * 1.消息的标记 tag 每一条消息都会有唯一标识 此处表示应答哪一条消息
  13. * 2.是否批量应答 false:不批量应答信道当中的消息(仅回答此消息)
  14. */
  15. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  16. };
  17. //取消消费的回调
  18. CancelCallback cancelCallback = (consumeTag) -> {
  19. System.out.println("消息消费被中断");
  20. };
  21. /**
  22. * 采用手动应答
  23. */
  24. boolean autoAck = false;
  25. channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
  26. }
  27. }

2.3、消费者2

消费者2和消费者1一样采取手动应答,确保消息的正确处理。

  1. public class Consumer02 {
  2. /**
  3. * 队列名称
  4. */
  5. private static final String QUEUE_NAME = "ack_queue";
  6. public static void main(String[] args) throws Exception{
  7. Channel channel = RabbitMqUtils.getChannel();
  8. DeliverCallback deliverCallback = (consumeTag, message) ->{
  9. System.out.println("C2接收到的消息:" + new String(message.getBody(),"UTF-8"));
  10. //手动应答
  11. /**
  12. * 1.消息的标记 tag 每一条消息都会有唯一标识 此处表示应答哪一条消息
  13. * 2.是否批量应答 false:不批量应答信道当中的消息(仅回答此消息)
  14. */
  15. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  16. };
  17. //取消消费的回调
  18. CancelCallback cancelCallback = (consumeTag) -> {
  19. System.out.println("消息消费被中断");
  20. };
  21. /**
  22. * 采用手动应答
  23. */
  24. boolean autoAck = false;
  25. channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
  26. }
  27. }

 2.4、关于自动应答和手动应答

        自动应答可以保证高吞吐量,但是保证不了数据传输安全性。一方面,当采用自动应答时,队列发送完消息,当消息还未被消费者接收到时消费者一旦断开连接,此条消息则会丢失。另一方面,例如消费者端并没有设置传递消息的数量限制,队列持续的发消息给消费端,然而消费者端处理消息的能力低下,导致消息大量积压,最终导致内存耗尽,此时消费者线程会被操作系统杀死,那么这些消息依然没有被消费就丢失了。手动应答可以完美的规避自动应答的弊端,但是,手动应答的吞吐量并不高,因此采用哪种应答方式需要根据实际应用场景进行权衡。

消息应答的方法有两种:

Channel.basicAck(用于肯定确认,告知队列消息已被处理,可以删除了)
Channel.basicNack(用于否定确认,告知队列消息未被正常处理,需要重新发送该消息)
Channel.basicReject(用于否定确认,区别在于:告知队列不处理该消息了直接拒绝,可以将其丢弃了 )
此外,手动应答(basicAck)时的第二个参数Multiple的取值有两种,true和false:
true表示批量应答,即将信道上未被处理的消息一并应答。
false表示不采用批量应答,仅应答此条消息。

三、消息重新入队

参考二当中的代码,引入线程沉睡表示消息处理的效率,让消费者1处理一条消息仅需1s,消费者2处理一条消息需要30s,生产者发送AA\BB\CC\DD四条消息,按照上述逻辑,消费者1会打印AA\CC,消费者2对打印BB\DD,当生产者发送完DD时就将消费者2线程杀死。观察消费者1的打印台情况,可以观察到DD则由消费者1进行消费了,当MQ未收到消费者2对于DD的应答,DD这条消息依然存放在MQ当中,由于消费者2断开了连接,此时MQ就会将DD发送到消费者1的信道当中,由消费者1处理DD这条消息。

 生产者:

消费者1:

 消费者2:

 四、不公平分发

参考三当中的线程沉睡,我们类比成消费者1的处理能力高效,消费者2的处理能力低下,此处采用不公平分发,即能者多劳,让闲着的线程处理更多的消息。在消费者1和消费者2当中分别引入如下代码:

  1. /**
  2. * 设置不公平分发
  3. */
  4. int prefetchCount = 1;
  5. channel.basicQos(prefetchCount);

生产者发送AA\BB\CC\DD四条消息,消费者1和消费者2打印台执行效果和三当中的效果一样,唯一区别在于我们此时并没有强制杀死消费者2线程。

五、预取值

预取值是对不公平分发的进一步优化,所谓预取值,举个例子,此时队列当中有1\2\3\4\5\6\7\8,共8条消息,设置消费者1的预取值为5,消费者2的预取值为2,表示将这8条消息当中的5条发送给消费者1的信道当中,将2条发送给消费者2的信道当中,消费者1每应答一条消息,队列才会将下一条消息发往消费者1的信道,类似的,消费者2也是如此。这样做的目的在于假设我们可以预估到两个消费者的处理能力,根据他们的能力让他们处理对标能力的消息。预取值的大小我们一般是不会准确把握的,只有通过反复的测试才会把握到一个接近处理能力的预取值。观察两个消费者的打印台输出情况:

设置消费者1预取值及其打印台:

  1. /**
  2. * 预取值
  3. */
  4. int prefetchCount = 5;
  5. channel.basicQos(prefetchCount);

设置消费者2预取值及其打印台:

  1. /**
  2. * 预取值
  3. */
  4. int prefetchCount = 2;
  5. channel.basicQos(prefetchCount);

 六、发布确认模式

发布确认模式针对生产者,生产者将信道设置为confirm模式,此模式下,生产者发送的消息将需要RabbitMQ进行确认收到,防止生产者发送的消息丢失。发布确认模式有三种策略,分别为单个确认模式、批量确认模式、异步批量确认模式。针对以上三种策略,分别实现其生产者发送消息,观察其发送效率。

将三种模式封装成三个方法,并由主函数调用:

  1. /**
  2. * 队列名称
  3. */
  4. private static final String QUEUE_NAME = "confirm";
  5. /**
  6. * 批量发送消息条数
  7. */
  8. private static final int MESSAGE_COUNT = 1000;
  9. public static void main(String[] args) throws Exception{
  10. Channel channel = RabbitMqUtils.getChannel();
  11. channel.confirmSelect();//开启发布确认模式
  12. channel.queueDeclare(QUEUE_NAME,true,false,false,null);
  13. /*
  14. * 发布确认模式
  15. * 1、单个确认
  16. * 2、批量确认
  17. * 3、异步批量确认
  18. */
  19. publishMessageIndividually(channel);//25432
  20. publishMessageBatch(channel);//391
  21. asynchronousConfirmMessage(channel);//22
  22. }

6.1、单个确认模式

单个确认模式即生产者发送一条,MQ收到后回应一条,之后生产者继续发送下一条。

  1. //单个确认
  2. public static void publishMessageIndividually(Channel channel) throws Exception{
  3. //开始时间
  4. long startTime = System.currentTimeMillis();
  5. //批量的发消息 单个发布确认
  6. for (int i = 0; i < MESSAGE_COUNT; i++) {
  7. String message = i + "";
  8. channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
  9. //单个消息就立即进行发布确认
  10. boolean flg = channel.waitForConfirms();
  11. if (!flg){
  12. System.out.println("第" + i + "条消息发送失败~");
  13. }
  14. }
  15. //结束时间
  16. long endTime = System.currentTimeMillis();
  17. long spendTime = endTime - startTime;
  18. System.out.println("单个确认模式发布1000条消息耗时:" + spendTime + " ms");
  19. }

6.2、批量确认模式

批量确认模式即发送一批消息后一起确认是否收到,与上述单个确认模式相比要大大减少耗时,但是一旦某条消息未被收到,我们将很难排查到是哪条消息。

  1. //批量确认
  2. public static void publishMessageBatch(Channel channel) throws Exception{
  3. //开始时间
  4. long startTime = System.currentTimeMillis();
  5. //批量确认消息大小
  6. int confirmSize = 100;//每100条确认一次
  7. //批量的发送消息 批量发布确认
  8. for (int i = 0; i < MESSAGE_COUNT; i++) {
  9. String message = i + "";
  10. channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
  11. //判断达到100条消息的时候批量的确认一次
  12. if (((i + 1) % confirmSize) == 0){
  13. boolean flg = channel.waitForConfirms();
  14. if (!flg){
  15. System.out.println("第" + (i - 99) + "条~第" + i + "条持久化失败~" );
  16. }else {
  17. System.out.println("第" + (i - 99) + "条~第" + i + "条持久化成功~" );
  18. }
  19. }
  20. }
  21. //结束时间
  22. long endTime = System.currentTimeMillis();
  23. long spendTime = endTime - startTime;
  24. System.out.println("批量确认模式发布1000条消息耗时:" + spendTime + " ms");
  25. }

6.3、异步批量确认模式

异步批量确认模式完美的规避了上两种确认模式的弊端,异步批量确认模式中生产者只负责发送消息,而无需关心消息是否发送到队列当中并进行了持久化,由MQ当中的broker负责告知消息的发送结果,生产者只需要在代码当中准备一个监听器,监听broker的消息,broker告知监听器哪些消息发送成功了,哪些消息发送失败了,我们准备一个线程安全的数据容器即可,将发送的消息放到这个容器当中。监听器当中的参数ackCallback和nackCallback会分别对容器当中的数据进行处理。ackCallback有两手准备,multiple为true时表示批量应答,此时将容器当中Key小于该deliverTag的移到临时的容器当中,反之为false时,表示只应答一条,则从容器中移除该数据。

  1. //异步确认
  2. public static void asynchronousConfirmMessage(Channel channel) throws Exception{
  3. /**
  4. * 线程安全有序的哈希表 适用于高并发的情况下
  5. * 1.轻松的将序号和消息进行关联
  6. * 2.可以轻松的批量删除条目 只要给到序号
  7. * 3.支持高并发(多线程)
  8. */
  9. ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
  10. //消息确认成功回调函数
  11. /**
  12. * 1、消息的标记
  13. * 2、是否为批量确认
  14. */
  15. ConfirmCallback ackCallback = (deliveryTag,multiple) -> {
  16. //删除已经确认的消息 剩下的就是未确认的消息
  17. if (multiple){
  18. ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
  19. confirmed.clear();
  20. }else {
  21. outstandingConfirms.remove(deliveryTag);
  22. }
  23. };
  24. //消息确认失败回调函数
  25. /**
  26. * 1、消息的标记
  27. * 2、是否为批量确认
  28. */
  29. ConfirmCallback nackCallback = (deliveryTag,multiple) -> {
  30. String message = outstandingConfirms.get(deliveryTag);
  31. //3--->打印未确认的消息都有哪些
  32. System.out.println("未确认的消息:序号->" + deliveryTag + " 消息体:" + message);
  33. };
  34. //准备消息的监听器 监听哪些消息成功了 哪些消息失败了
  35. /**
  36. * 1、监听哪些消息成功了
  37. * 2、监听哪些消息失败了
  38. */
  39. channel.addConfirmListener(ackCallback,nackCallback);//异步通知
  40. //开始时间
  41. long startTime = System.currentTimeMillis();
  42. //批量发送消息
  43. for (int i = 0; i < MESSAGE_COUNT; i++) {
  44. String message = i + "";
  45. //此处记录所有要发送的消息
  46. outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
  47. channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
  48. }
  49. //结束时间
  50. long endTime = System.currentTimeMillis();
  51. long spendTime = endTime - startTime;
  52. System.out.println("异步确认模式发布1000条消息耗时:" + spendTime + " ms");
  53. }

七、交换机

7.1、Exchanges概念

RabbitMQ 消息传递模型的核心思想是 : 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机 (exchange) ,交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

7.2、发布/订阅模式(fanout类型)

假设有这么一个场景,在网购系统当中,某家店铺需要将新产品进行推广,该产品面向中老年人退出,因此就需要将推广消息发送给该群体用户。中年群众和老年群众作为两种消费者群体,如何做到只发布一次消息就能做到这两种消费者都能收到该消息?此时,MQ就为我们提供了发布/订阅模式。

消息发送方:

  1. public class EmitLog {
  2. /**
  3. * 交换机名称
  4. */
  5. private static final String EXCHANGE_NAME = "LOGS";
  6. /**
  7. * 交换机类型
  8. */
  9. private static final String EXCHANGE_TYPE = "fanout";
  10. public static void main(String[] args) throws Exception{
  11. Channel channel = RabbitMqUtils.getChannel();
  12. //声明一个交换机,交换机名称、交换机类型
  13. channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
  14. Scanner scanner = new Scanner(System.in);
  15. while (scanner.hasNext()){
  16. String message = scanner.nextLine();
  17. channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
  18. System.out.println("发送方发送消息:" + message);
  19. }
  20. }
  21. }

消费者1:

  1. public class ReceviceLogs01 {
  2. /**
  3. * 交换机名称
  4. */
  5. private static final String EXCHANGE_NAME = "LOGS";
  6. public static void main(String[] args) throws Exception{
  7. Channel channel = RabbitMqUtils.getChannel();
  8. //声明一个队列 临时队列
  9. /**
  10. * 生成一个临时队列 队列名是随机的
  11. * 当消费者与队列断开连接时 队列自动删除
  12. */
  13. String queueName = channel.queueDeclare().getQueue();
  14. /**
  15. * 绑定交换机与队列
  16. */
  17. channel.queueBind(queueName,EXCHANGE_NAME,"");
  18. System.out.println("接收方01等待接收消息......");
  19. /**
  20. * 声明 接收消息
  21. */
  22. DeliverCallback deliverCallback = (consumeTag, message) ->{
  23. System.out.println(new String(message.getBody()));
  24. };
  25. //取消消费的回调
  26. CancelCallback cancelCallback = (consumeTag) -> {
  27. System.out.println("消息消费被中断");
  28. };
  29. channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
  30. }
  31. }

消费者2:

  1. public class ReceviceLogs02 {
  2. /**
  3. * 交换机名称
  4. */
  5. private static final String EXCHANGE_NAME = "LOGS";
  6. /**
  7. * 交换机类型
  8. */
  9. private static final String EXCHANGE_TYPE = "fanout";
  10. public static void main(String[] args) throws Exception{
  11. Channel channel = RabbitMqUtils.getChannel();
  12. //声明一个队列 临时队列
  13. /**
  14. * 生成一个临时队列 队列名是随机的
  15. * 当消费者与队列断开连接时 队列自动删除
  16. */
  17. String queueName = channel.queueDeclare().getQueue();
  18. /**
  19. * 绑定交换机与队列
  20. */
  21. channel.queueBind(queueName,EXCHANGE_NAME,"");
  22. System.out.println("接收方02等待接收消息......");
  23. /**
  24. * 声明 接收消息
  25. */
  26. DeliverCallback deliverCallback = (consumeTag, message) ->{
  27. System.out.println(new String(message.getBody()));
  28. };
  29. //取消消费的回调
  30. CancelCallback cancelCallback = (consumeTag) -> {
  31. System.out.println("消息消费被中断");
  32. };
  33. channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
  34. }
  35. }

 执行结果:

 fanout交换机说明:此类型的交换机可以将接收到的消息发送给所有与它绑定的队列。参考我们上面所说的轮训分发,轮训分发是将队列当中的消息发送给消费者,一条消息仅能由一个消费者进行消费,如果使用默认的交换机,则需要发送多次消息,由队列再分发给每个消费者。

总结:只要该队列和此交换机绑定,无论routing-key是什么,都会接收到生产者发送的消息。

7.3、直接交换机(Direct类型)

假设有这么一个场景,同样是网购系统,在下单时我们需要将交易金额1W以上的订单交给某支付机构A处理,5000-1W的交给支付机构B处理,5000元以下的也交给支付机构A处理。怎么做?支付机构A和支付机构B分别对应消费者1和消费者2,订单为生产者,生产者(网购系统)将订单信息发送至交换机,交换机根据生产者提供的交易金额(routing-key)决定由哪个消费者(支付机构)去处理。这就是直接交换机的用处,可以指定哪个消费者去消费此条消息。

生产者:

  1. public class DirectLogs {
  2. /**
  3. * 交换机名称
  4. */
  5. private static final String EXCHANGE_NAME = "direct_logs";
  6. /**
  7. * 交换机类型
  8. */
  9. public static final String EXCHANGE_TYPE = "direct";
  10. public static void main(String[] args) throws Exception{
  11. Channel channel = RabbitMqUtils.getChannel();
  12. //声明交换机
  13. channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
  14. Scanner scanner = new Scanner(System.in);
  15. while (scanner.hasNext()){
  16. String message = scanner.nextLine();
  17. channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes(StandardCharsets.UTF_8));
  18. }
  19. }
  20. }

消费者1:

  1. public class ReceiveLogsDirect01 {
  2. /**
  3. * 交换机名称
  4. */
  5. private static final String EXCHANGE_NAME = "direct_logs";
  6. public static void main(String[] args) throws Exception{
  7. Channel channel = RabbitMqUtils.getChannel();
  8. //声明一个队列
  9. channel.queueDeclare("console",false,false,false,null);
  10. /**
  11. * 路由绑定
  12. */
  13. channel.queueBind("console",EXCHANGE_NAME,"info");
  14. channel.queueBind("console",EXCHANGE_NAME,"warning");
  15. /**
  16. * 声明 接收消息
  17. */
  18. DeliverCallback deliverCallback = (consumeTag, message) ->{
  19. System.out.println(new String(message.getBody()));
  20. };
  21. //取消消费的回调
  22. CancelCallback cancelCallback = (consumeTag) -> {
  23. System.out.println("消息消费被中断");
  24. };
  25. channel.basicConsume("console",true,deliverCallback,cancelCallback);
  26. }
  27. }

消费者2:

  1. public class ReceiveLogsDirect02 {
  2. /**
  3. * 交换机名称
  4. */
  5. private static final String EXCHANGE_NAME = "direct_logs";
  6. public static void main(String[] args) throws Exception{
  7. Channel channel = RabbitMqUtils.getChannel();
  8. //声明一个队列
  9. channel.queueDeclare("disk",false,false,false,null);
  10. /**
  11. * 路由绑定
  12. */
  13. channel.queueBind("disk",EXCHANGE_NAME,"error");
  14. /**
  15. * 声明 接收消息
  16. */
  17. DeliverCallback deliverCallback = (consumeTag, message) ->{
  18. System.out.println(new String(message.getBody()));
  19. };
  20. //取消消费的回调
  21. CancelCallback cancelCallback = (consumeTag) -> {
  22. System.out.println("消息消费被中断");
  23. };
  24. channel.basicConsume("disk",true,deliverCallback,cancelCallback);
  25. }
  26. }

 避坑:队列一旦与direct类型的交换机绑定,则生产者发送消息时则不能使用队列名称来指定发送到哪个队列,只能使用routing-key来进行队列选择。

7.4、Topic交换机

Topic交换机的应用场景是非广泛,fanout交换机和direct交换机的功能都可以使用Topic交换机实现。发送到topic交换机的消息不能具有任意的  routing_key —— 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。binding key也必须采用相同的形式。topic交换机背后的逻辑 类似于direct交换机——使用特定 routing key 发送的消息将被传递到与匹配binding key绑定的所有队列。但是binding key有两个重要的特殊情况:

1.*( 星号 ) 可以代替一个单词
2.#( 井号 ) 可以替代零个或多个单词

7.4.1、实用案例:

生产者:

  1. public class TopicExchangePro {
  2. /**
  3. * 交换机名称
  4. */
  5. private static final String EXCHANGE_NAME = "topic_logs";
  6. /**
  7. * 交换机类型
  8. */
  9. private static final String EXCHANGE_TYPE = "topic";
  10. public static void main(String[] args) throws Exception{
  11. Channel channel = RabbitMqUtils.getChannel();
  12. /**
  13. * 声明一个交换机
  14. */
  15. channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
  16. Scanner scanner = new Scanner(System.in);
  17. while (scanner.hasNext()){
  18. String rountingKey = scanner.next();
  19. System.out.println("输入的rountingKey为:" + rountingKey);
  20. String message = scanner.next();
  21. System.out.println("输入消息体的为:" + message);
  22. channel.basicPublish(EXCHANGE_NAME,rountingKey,null,message.getBytes(StandardCharsets.UTF_8));
  23. System.out.println("消息体:" + message + "已发送给rountKey为:" + rountingKey + "的队列");
  24. }
  25. }
  26. }

消费者1:

  1. public class TopicExchangeCon01 {
  2. /**
  3. * 交换机名称
  4. */
  5. private static final String EXCHANGE_NAME = "topic_logs";
  6. public static void main(String[] args) throws Exception{
  7. Channel channel = RabbitMqUtils.getChannel();
  8. /**
  9. * 声明一个队列
  10. */
  11. channel.queueDeclare("receive1",false,false,false,null);
  12. /**
  13. * 路由绑定
  14. */
  15. channel.queueBind("receive1",EXCHANGE_NAME,"*.*.pink");
  16. channel.queueBind("receive1",EXCHANGE_NAME,"green.#");
  17. /**
  18. * 声明 接收消息
  19. */
  20. DeliverCallback deliverCallback = (consumeTag, message) ->{
  21. String rountingKey = message.getEnvelope().getRoutingKey();
  22. System.out.println("receive1接收消息体为:" + new String(message.getBody()) + " 其RountingKey为:" + rountingKey);
  23. };
  24. //取消消费的回调
  25. CancelCallback cancelCallback = (consumeTag) -> {
  26. System.out.println("消息消费被中断");
  27. };
  28. channel.basicConsume("receive1",true,deliverCallback,cancelCallback);
  29. }
  30. }

消费者2:

  1. public class TopicExchangeCon02 {
  2. /**
  3. * 交换机名称
  4. */
  5. private static final String EXCHANGE_NAME = "topic_logs";
  6. public static void main(String[] args) throws Exception{
  7. Channel channel = RabbitMqUtils.getChannel();
  8. /**
  9. * 声明一个队列
  10. */
  11. channel.queueDeclare("receive2",false,false,false,null);
  12. /**
  13. * 路由绑定
  14. */
  15. channel.queueBind("receive2",EXCHANGE_NAME,"*.red.*");
  16. /**
  17. * 声明 接收消息
  18. */
  19. DeliverCallback deliverCallback = (consumeTag, message) ->{
  20. String rountingKey = message.getEnvelope().getRoutingKey();
  21. System.out.println("receive2接收消息体为:" + new String(message.getBody()) + " 其RountingKey为:" + rountingKey);
  22. };
  23. //取消消费的回调
  24. CancelCallback cancelCallback = (consumeTag) -> {
  25. System.out.println("消息消费被中断");
  26. };
  27. channel.basicConsume("receive2",true,deliverCallback,cancelCallback);
  28. }
  29. }

7.4.2、结果分析及测试:

消费者1routing-key:

  1. *.*.pink
  2. green.#

 两种匹配规则,路由key为(什么什么pink)可以发往该队列,路由key为(green什么什么什么...)也可以发往该队列。

消费者2routing-key:

*.red.*

匹配规则:路由key是(什么red什么)可以发往该队列。

测试用例:

  1. dog.is.pink 消费者1
  2. green.like.pink 消费者1
  3. green.red.dog 消费者1和消费者2
  4. green.cat 消费者1
  5. house.red.pink 消费者1和消费者2
  6. we.red.family 消费者2

测试结果:

 生产者:

 消费者1:

消费者2:

 避坑:当路由key匹配到同一个队列时(即路由key匹配到同一队列的多个routing-key时),只会给队列发送一次消息。

值得注意的是:当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了。如果队列绑定键当中没有#*出现,那么该队列绑定类型就是 direct

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/390900
推荐阅读
相关标签
  

闽ICP备14008679号