当前位置:   article > 正文

RabbitMq在Java中的应用_java 调用 rabbitmq

java 调用 rabbitmq

SpringBoot集成RabbitMq

一、RabbitMq的用途及作用

一、削峰、异步、解耦

经常开发的人都知道、RabbitMq常用于并发、流量大的场景,因为RabbitMq属于中间件需要维护,所以一般小项目几乎不会使用。而在于大型的并发环境下,大量的流量积压到接口中,使Mysql连接分配出现不够使用的情况,此时就可以使用RabbitMq来解决。

削峰:

当流量洪峰到达接口时,可以用现实中来举例子,mq就相当于一个独木桥,mysq就相当于河对岸,使大量的人从容有序的排队过河,而不会出现所有人全部淌水过河到河对岸,大大减少MySQL的压力。

异步:

通常采用异步通知的方式,就好比我们在抢票的时候,点击提交,系统会返回一个提示正在努力抢票中,而实际上你的订单正在mq队列中排队处理,处理结果会在后续异步通知结果。

解耦:

解耦主要两方面:

1.生产消息的应用 和 消费消息的应用不是同一种语言可以解耦

2.生产消息的应用 宕机,不会影响到消费者消费消息

二、RabbitMQ介绍


市面上比较火爆的几款MQ:

ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。

  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。

  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。

  • 学习成本:RabbitMQ非常简单。

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。

RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。

消息队列之间的对比?

支持多语言:RabbitMQ,Kafaka ;ActiveMQ,RocketMQ只支持java

传输速度:RabbitMQ微秒级,其他毫秒级别

吞吐量:kafka 吞吐量和磁盘性能* 集群数量相关 次之RocketMQ

消息高可靠:每个一个都可以保证不丢失,不重复

三、RabbitMQ安装

我本机采用docker安装,比较简洁方便

第一步docker命令创建文件夹:mkdir rabbit

切换到刚刚创建的文件夹中:cd rabbit/

创建配置文件:vim docker-compose.yaml

将下面配置信息粘贴进去:

version: "3.1"
services:
  rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - ./data:/var/lib/rabbitmq

按esc按键然后按下:输入wq保存并退出

启动rabbitmq:docker-compose up

游览器输入虚拟机地址+15672端口号访问RabbitMq可视化界面 默认用户名密码都是guest

四、Springboot整合RabbitMq

一 、分类
  • Publisher - 生产者:发布消息到RabbitMQ中的Exchange

  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息

  • Exchange - 交换机:和生产者建立连接并接收生产者的消息

  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互

  • Routes - 路由:交换机以什么样的策略将消息发布到Queue

    主要模式:

    Simple Work Queue (简单工作队列):也就是常说的点对点模式,一条消息由一个消费者进行消费。(当有多个消费者时,默认使用轮训机制把消息分配给消费者)。 Work Queues (工作队列):也叫公平队列,能者多劳的消息队列模型。队列必须接收到来自消费者的 手动ack 才可以继续往消费者发送消息。 Publish/Subscribe (发布订阅模式):一条消息被多个消费者消费。 Routing(路由模式):有选择的接收消息。 Topics (主题模式):通过一定的规则来选择性的接收消息 RPC 模式:发布者发布消息,并且通过 RPC 方式等待结果。目前这个应该场景少,而且代码也较为复杂

    交换机类型:

    direct(直连交换机):将队列绑定到交换机,消息的 routeKey 需要与队列绑定的 routeKey 相同。 fanout (扇形交换机):不处理 routeKey ,直接把消息转发到与其绑定的所有队列中。 topic(主题交换机):根据一定的规则,根据 routeKey 把消息转发到符合规则的队列中,其中 # 用于匹配符合一个或者多个词(范围更广), * 用于匹配一个词。 headers (头部交换机):根据消息的 headers 转发消息而不是根据 routeKey 来转发消息, 其中 header 是一个 Map,也就意味着不仅可以匹配字符串类型,也可以匹配其他类型数据。 规则可以分为所有键值对匹配或者单一键值对匹配。

导入依赖

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

这里有两种方式整合RabbitMq

第一种采用其本身的框架 获取连接

最简模式
  1. package com.wwy.config;
  2. /**
  3. * @author 王伟羽
  4. * @date 2024/3/13 9:39
  5. */
  6. import com.rabbitmq.client.Connection;
  7. import com.rabbitmq.client.ConnectionFactory;
  8. import java.io.IOException;
  9. import java.util.concurrent.TimeoutException;
  10. /**
  11. * 配置获取RabbitMq的静态方法
  12. */
  13. public class RabbitMqUtils {
  14.   public static Connection getConnection() {
  15.       ConnectionFactory factory = new ConnectionFactory();   //创建连接工厂
  16.       //设置相关属性
  17.       factory.setUsername("guest");
  18.       factory.setPassword("guest");
  19.       factory.setVirtualHost("/");
  20.       factory.setHost("192.168.60.139");
  21.       factory.setPort(5672);
  22.       try {
  23.           //获取连接
  24.           Connection conn = factory.newConnection();
  25.           return conn;
  26.       } catch (IOException e) {
  27.           e.printStackTrace();
  28.           return null;
  29.       } catch (TimeoutException e) {
  30.           e.printStackTrace();
  31.           return null;
  32.       }
  33.   }
  34. }

创建生产者

  1. package com.wwy.producter;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.wwy.config.RabbitMqUtils;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import java.io.IOException;
  9. import java.util.Objects;
  10. /**
  11. * @author 王伟羽
  12. * @date 2024/3/13 9:52
  13. */
  14. @RestController
  15. @RequestMapping(value = "/test")
  16. public class producer {
  17.   private final static String QUERE_NAME = "quere_name";
  18.   @GetMapping(value = "/sendMessage")
  19.   public String sendMessage(String message) {
  20.       System.out.println(message);
  21.       Connection connection = RabbitMqUtils.getConnection();
  22.       if (Objects.nonNull(connection)) {
  23.           try {
  24.               Channel channel = connection.createChannel();
  25.               // 参数1:指定exchange,使用""。   最简模式(helloword) 使用默认交换机
  26.               // 参数2:指定路由的规则,
  27.               //       使用具体的队列名称。
  28.               //     参数2可以是队列名   也可以是路由规则
  29.               // 参数3:指定传递的消息所携带的properties,使用null。
  30.               // 参数4:指定发布的具体消息,byte[]类型
  31.               channel.basicPublish("", QUERE_NAME, null, "马上下课".getBytes("utf-8"));
  32.               return "发送消息成功!";
  33.           } catch (IOException e) {
  34.               e.printStackTrace();
  35.               return "发送消息失败!";
  36.           }
  37.       }
  38.       return "mq初始化失败!";
  39.   }
  40. }

消费者消费消息

  1. package com.wwy.consumer;
  2. import com.rabbitmq.client.*;
  3. import com.wwy.config.RabbitMqUtils;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.stereotype.Service;
  6. import org.springframework.util.CollectionUtils;
  7. import java.io.IOException;
  8. import java.util.Objects;
  9. /**
  10. * @author 王伟羽
  11. * @date 2024/3/13 10:33
  12. */
  13. @Service
  14. public class ConsumerTest {
  15.   private final static String QUERE_NAME = "quere_name";
  16.   @Bean
  17.   public void consumeMessage() {
  18.       Connection connection = RabbitMqUtils.getConnection();
  19.       if(Objects.nonNull(connection)){
  20.           try {
  21.               Channel channel = connection.createChannel();
  22.               channel.queueDeclare(QUERE_NAME,true,false,false,null);
  23.               // 第二步创建消费者
  24.               Consumer consumer = new DefaultConsumer(channel){
  25.                   @Override
  26.                   public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  27.                       // byte[] body 就是消费者得到的数据
  28.                       System.out.println("消费者 得到消息body = " + new String(body,"utf-8"));
  29.                   }
  30.               };
  31.               channel.basicConsume(QUERE_NAME,true,consumer);
  32.               // 让当前程序卡在 这里
  33.               System.in.read();
  34.           } catch (IOException e) {
  35.               e.printStackTrace();
  36.           }
  37.       }
  38.   }
  39. }

消费者获取消息的另一种方式(官网)

  1. package com.wwy.consumer;
  2. import com.rabbitmq.client.*;
  3. import com.wwy.config.RabbitMqUtils;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.stereotype.Service;
  6. import org.springframework.util.CollectionUtils;
  7. import java.io.IOException;
  8. import java.util.Objects;
  9. /**
  10. * @author 王伟羽
  11. * @date 2024/3/13 10:33
  12. */
  13. @Service
  14. public class ConsumerTest {
  15. private final static String QUERE_NAME = "quere_name";
  16. @Bean
  17. public void consumeMessage() {
  18. Connection connection = RabbitMqUtils.getConnection();
  19. if(Objects.nonNull(connection)){
  20. try {
  21. Channel channel = connection.createChannel();
  22. channel.queueDeclare(QUERE_NAME,true,false,false,null);
  23. // // 第二步创建消费者
  24. // Consumer consumer = new DefaultConsumer(channel){
  25. // @Override
  26. // public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  27. //
  28. // // byte[] body 就是消费者得到的数据
  29. //
  30. // System.out.println("消费者 得到消息body = " + new String(body,"utf-8"));
  31. //
  32. // }
  33. // };
  34. // channel.basicConsume(QUERE_NAME,true,consumer);
  35. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  36. String message = new String(delivery.getBody(), "UTF-8");
  37. System.out.println(" [x] Received '" + message + "'");
  38. };
  39. channel.basicConsume(QUERE_NAME, true, deliverCallback, consumerTag -> { });
  40. // 让当前程序卡在 这里
  41. System.in.read();
  42. } catch (IOException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. }
  47. }

启动程序访问接口

生产者生产成功

消费者收到生产者的消息

使用封装好的RabbitTemplate进行操作,比较方便快捷

第一步配置信息

  1. server:
  2. port: 8083
  3. spring:
  4. rabbitmq: # 单机版配置
  5. host: 192.168.60.139
  6. port: 5672
  7. username: guest #账户名密码默认都是guest
  8. password: guest
  9. publisher-confirm-type: simple
  10. publisher-returns: true
  11. listener:
  12. simple:
  13. acknowledge-mode: manual

生产者:

  1. package com.wwy.producter;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.wwy.config.RabbitMqUtils;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import javax.annotation.Resource;
  10. /**
  11. * @author 王伟羽
  12. * @date 2024/3/13 9:52
  13. */
  14. @RestController
  15. @RequestMapping(value = "/test")
  16. public class producer {
  17. private final static String QUERE_NAME = "quere_name";
  18. @Resource
  19. private RabbitTemplate rabbitTemplate;
  20. @GetMapping(value = "/sendMessage")
  21. public String sendMessage(String message) {
  22. System.out.println(message);
  23. rabbitTemplate.convertAndSend(QUERE_NAME,message);
  24. return "发送成功!";
  25. }
  26. }

消费者:

  1. package com.wwy.consumer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @author 王伟羽
  6. * @date 2024/3/13 10:33
  7. */
  8. @Component
  9. public class ConsumerTest {
  10. private final static String QUERE_NAME = "quere_name";
  11. @RabbitListener(queues = QUERE_NAME)
  12. public void handleMessage(String msg) {
  13. System.out.println("listener 收到消息4 " + msg);
  14. }
  15. }

运行

工作队列模式

生产者:

  1. package com.wwy.producter;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.MessageProperties;
  5. import com.wwy.config.RabbitMqUtils;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.util.CollectionUtils;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.RequestMapping;
  10. import org.springframework.web.bind.annotation.RestController;
  11. import javax.annotation.Resource;
  12. import java.io.IOException;
  13. import java.util.Objects;
  14. /**
  15. * @author 王伟羽
  16. * @date 2024/3/13 13:53
  17. */
  18. @RestController
  19. @RequestMapping(value = "/workQueues")
  20. public class WorkQueuesProducer {
  21. private final static String WORK_QUEUES = "work_queues";
  22. @Resource
  23. private RabbitTemplate rabbitTemplate;
  24. //第一种
  25. @GetMapping(value = "/workQueuesSendMessage")
  26. public String workQueuesSendMessage(String message) {
  27. System.out.println("接收消息");
  28. //第一种生产消息方法
  29. //获取连接
  30. Connection connection = RabbitMqUtils.getConnection();
  31. if (Objects.nonNull(connection)) {
  32. try {
  33. Channel channel = connection.createChannel();
  34. //发送消息
  35. for (int i = 0; i < 10; i++) {
  36. channel.basicPublish("", WORK_QUEUES, null, (message + i).getBytes("utf-8"));
  37. }
  38. return "发送成功!";
  39. } catch (IOException e) {
  40. e.printStackTrace();
  41. return "发送失败!";
  42. }
  43. }
  44. return "初始mq失败!";
  45. }
  46. //第二种
  47. @GetMapping(value = "/workQueuesSendMessage01")
  48. public String workQueuesSendMessage01(String message) {
  49. System.out.println("接收消息");
  50. //发送消息
  51. for (int i = 0; i < 10; i++) {
  52. System.out.println(i);
  53. rabbitTemplate.convertAndSend(WORK_QUEUES, message);
  54. }
  55. return "发送成功!";
  56. }
  57. }

消费者消费消息

第一种:

  1. package com.wwy.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import com.wwy.config.RabbitMqUtils;
  6. import org.springframework.boot.SpringBootConfiguration;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.stereotype.Component;
  9. import org.springframework.context.annotation.Configuration;
  10. import org.springframework.core.annotation.Order;
  11. import org.springframework.stereotype.Service;
  12. import javax.annotation.Resource;
  13. import java.io.IOException;
  14. import java.util.Objects;
  15. /**
  16. * @author 王伟羽
  17. * @date 2024/3/13 14:13
  18. */
  19. @Service
  20. public class WorkQueuesConsumerOne {
  21. private final static String WORK_QUEUES = "work_queues";
  22. @Bean
  23. public void getMessageInfoOne() {
  24. Connection connection = RabbitMqUtils.getConnection();
  25. if (Objects.nonNull(connection)) {
  26. try {
  27. Channel channel = connection.createChannel();
  28. channel.queueDeclare(WORK_QUEUES, true, false, false, null);
  29. //设置每次消费消息的数量
  30. channel.basicQos(1);
  31. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  32. String message = new String(delivery.getBody(), "UTF-8");
  33. System.out.println(" [x] 队列1消息内容 '" + message + "'");
  34. System.out.println("队列1获取到消息");
  35. //手动ACK
  36. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  37. };
  38. channel.basicConsume(WORK_QUEUES, false, deliverCallback, consumerTag -> {
  39. System.out.println("队列1消息消费被中断");
  40. });
  41. } catch (IOException e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. }
  46. @Bean
  47. public void getMessageInfoTwo() {
  48. Connection connection = RabbitMqUtils.getConnection();
  49. if (Objects.nonNull(connection)) {
  50. try {
  51. Channel channel = connection.createChannel();
  52. channel.queueDeclare(WORK_QUEUES, true, false, false, null);
  53. //设置每次消费消息的数量
  54. channel.basicQos(1);
  55. System.out.println("队列2获取到消息");
  56. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  57. String message = new String(delivery.getBody(), "UTF-8");
  58. System.out.println(" [x] 队列2消息内容 '" + message + "'");
  59. //手动ACK
  60. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  61. };
  62. channel.basicConsume(WORK_QUEUES, false, deliverCallback, consumerTag -> {
  63. System.out.println("队列2消息消费被中断");
  64. });
  65. } catch (IOException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. }
  70. }

第二种注解方式:

  1. package com.wwy.consumer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.stereotype.Component;
  5. import javax.annotation.Resource;
  6. /**
  7. * @author 王伟羽
  8. * @date 2024/3/13 15:10
  9. */
  10. @Component
  11. public class WorkQueuesConsumersOne {
  12. public final static String WORK_QUEUES = "work_queues";
  13. @RabbitListener(queues = WORK_QUEUES)
  14. public void consumerOne(String message) {
  15. System.out.println("队列1收到消息"+message);
  16. }
  17. @RabbitListener(queues = WORK_QUEUES)
  18. public void consumerTwo(String message) {
  19. System.out.println("队列2收到消息"+message);
  20. }
  21. }

发布/订阅模式

1、1个生产者,多个消费者

2、每一个消费者都有自己的一个队列

3、生产者没有将消息直接发送到队列,而是发送到了交换机

4、每个队列都要绑定到交换机

5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的

X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

第一种生产者方式:

  1. package com.wwy.producter;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.wwy.config.RabbitMqUtils;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import java.io.IOException;
  10. import java.util.Objects;
  11. /**
  12. * @author 王伟羽
  13. * @date 2024/3/13 16:54
  14. */
  15. @RestController
  16. @RequestMapping(value = "/publishProducer")
  17. public class PublishProducer {
  18. private final static String PUBLISH_PRODUCER1 = "publish_name_one";
  19. private final static String PUBLISH_PRODUCER2 = "publish_name_two";
  20. //交换机名称
  21. private final static String EXCHANGE_NAME = "publish_exchange";
  22. @GetMapping(value = "/publishSendMessage")
  23. public String publishSendMessage(String message) {
  24. Connection connection = RabbitMqUtils.getConnection();
  25. if (Objects.nonNull(connection)) {
  26. Channel channel = null;
  27. try {
  28. channel = connection.createChannel();
  29. // 绑定交换机
  30. //参数1: exchange的名称
  31. //参数2: 指定exchange的类型
  32. // FANOUT - pubsub , 发布订阅
  33. // DIRECT - Routing , 路由模式
  34. // TOPIC - Topics topic
  35. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  36. //给交换机绑定对应的队列
  37. //将队列和交换机绑定
  38. //String var1, 队列名
  39. //String var2, 交换机名
  40. //String var3, 对应绑定队列 路由规则 "" 没有规则所有的队列消息一样
  41. channel.queueBind(PUBLISH_PRODUCER1, EXCHANGE_NAME, "");
  42. channel.queueBind(PUBLISH_PRODUCER2, EXCHANGE_NAME, "");
  43. // 参数1:指定exchange,使用""。 最简模式(helloword) 使用默认交换机
  44. // 参数2:指定路由的规则,
  45. // 使用具体的队列名称。
  46. // 参数2可以是队列名 也可以是路由规则
  47. // 参数3:指定传递的消息所携带的properties,使用null。
  48. // 参数4:指定发布的具体消息,byte[]类型
  49. for (int i = 1; i < 11; i++) {
  50. // 消息向交换机发送,没有匹配路由规则
  51. channel.basicPublish(EXCHANGE_NAME, PUBLISH_PRODUCER1, null, (message + i).getBytes("utf-8"));
  52. }
  53. return "发送成功!";
  54. } catch (IOException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. return "mq出错!";
  59. }
  60. }

消费者消费信息:

  1. package com.wwy.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import com.wwy.config.RabbitMqUtils;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.stereotype.Component;
  8. import java.io.IOException;
  9. import java.util.Objects;
  10. /**
  11. * @author 王伟羽
  12. * @date 2024/3/13 17:06
  13. */
  14. @Component
  15. public class PublishConsumerOne {
  16. private final static String PUBLISH_PRODUCER1 = "publish_name_one";
  17. private final static String PUBLISH_PRODUCER2 = "publish_name_two";
  18. //交换机名称
  19. private final static String EXCHANGE_NAME = "publish_exchange";
  20. @Bean
  21. public void publishGetInfo() {
  22. Connection connection = RabbitMqUtils.getConnection();
  23. if (Objects.nonNull(connection)) {
  24. try {
  25. Channel channel = connection.createChannel();
  26. //保证消费者 每次只消费一条消息
  27. channel.basicQos(1);
  28. // 第一步声明 队列
  29. channel.queueDeclare(PUBLISH_PRODUCER1,true,false,false,null);
  30. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  31. channel.queueBind(PUBLISH_PRODUCER1, EXCHANGE_NAME, "");
  32. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  33. String message = new String(delivery.getBody(), "UTF-8");
  34. System.out.println(" 队列一消费消息" + message);
  35. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  36. };
  37. channel.basicConsume(PUBLISH_PRODUCER1, true, deliverCallback, consumerTag -> {
  38. });
  39. } catch (IOException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }
  44. @Bean
  45. public void publishGetInfoTwo() {
  46. Connection connection = RabbitMqUtils.getConnection();
  47. if (Objects.nonNull(connection)) {
  48. try {
  49. Channel channel = connection.createChannel();
  50. //保证消费者 每次只消费一条消息
  51. channel.basicQos(1);
  52. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  53. channel.queueDeclare(PUBLISH_PRODUCER2,true,false,false,null);
  54. channel.queueBind(PUBLISH_PRODUCER2, EXCHANGE_NAME, "");
  55. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  56. String message = new String(delivery.getBody(), "UTF-8");
  57. System.out.println(" 队列二消费消息" + message);
  58. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  59. };
  60. channel.basicConsume(PUBLISH_PRODUCER2, true, deliverCallback, consumerTag -> {
  61. });
  62. } catch (IOException e) {
  63. e.printStackTrace();
  64. }
  65. }
  66. }
  67. }

运行结果可知,两个消费者都能获取到信息,此种情况适合用户注册业务,一个队列接收短信发送,一个队列接收邮件发送

第二种简便方式  生产者

  1. @GetMapping(value = "/PublicSubscribe")
  2. public void PublicSubscribe() {
  3. rabbitTemplate.convertAndSend("publish_exchange_one", "", "发布订阅模式", new CorrelationData("我是大帅逼"));
  4. }

消费者:

  1. package com.wwy.consumer;
  2. import lombok.extern.apachecommons.CommonsLog;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.boot.SpringBootConfiguration;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @author 王伟羽
  8. * @date 2024/3/13 17:49
  9. */
  10. @Component
  11. public class PublishConsumerTwo {
  12. @RabbitListener(queues = "publish_queue_one")
  13. public void publishOne(String message) {
  14. System.out.println("队列1收到的消息" + message);
  15. }
  16. @RabbitListener(queues = "publish_queue_two")
  17. public void publishTwo(String message) {
  18. System.out.println("队列2收到的消息" + message);
  19. }
  20. }
主题模式

由于上述第一种方法太过于繁琐,所以主题模式只采用第二种方法,第一种在后续代码里展示

配置主题模式下的队列、交换机并将其绑定起来

  1. package com.wwy.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.boot.SpringBootConfiguration;
  4. import org.springframework.context.annotation.Bean;
  5. /**
  6. * @author 王伟羽
  7. * @date 2024/3/14 9:29
  8. */
  9. @SpringBootConfiguration
  10. public class TopicConfiguration {
  11. @Bean
  12. public TopicExchange getTopicExchange(){
  13. return new TopicExchange("topic_exchange_one");
  14. }
  15. @Bean
  16. public Queue getTopicQueueOne(){
  17. return new Queue("topic_queue_one");
  18. }
  19. @Bean
  20. public Queue getTopicQueueTwo(){
  21. return new Queue("topic_queue_two");
  22. }
  23. @Bean
  24. public Queue getTopicQueueThree(){
  25. return new Queue("topic_queue_three");
  26. }
  27. //* 代表一个词
  28. //# 代表零个或者多个词
  29. @Bean
  30. public Binding getTopicBindingOne(){
  31. return BindingBuilder.bind(getTopicQueueOne()).to(getTopicExchange()).with("a.*");
  32. }
  33. @Bean
  34. public Binding getTopicBindingThree(){
  35. return BindingBuilder.bind(getTopicQueueThree()).to(getTopicExchange()).with("a.#");
  36. }
  37. @Bean
  38. public Binding getTopicBindingTwo(){
  39. return BindingBuilder.bind(getTopicQueueTwo()).to(getTopicExchange()).with("a.111");
  40. }
  41. }

创建生产者:

  1. package com.wwy.producter;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import javax.annotation.Resource;
  7. import java.io.UnsupportedEncodingException;
  8. /**
  9. * @author 王伟羽
  10. * @date 2024/3/14 9:36
  11. */
  12. @RestController
  13. @RequestMapping(value = "/topic")
  14. public class TopicProducer {
  15. @Resource
  16. private RabbitTemplate rabbitTemplate;
  17. @GetMapping(value = "/sendTopicMessage")
  18. public String sendTopicMessage(String message){
  19. try {
  20. rabbitTemplate.convertAndSend("topic_exchange_one","a.123",message.getBytes("utf-8"));
  21. return "生产者发送消息成功";
  22. } catch (UnsupportedEncodingException e) {
  23. e.printStackTrace();
  24. return "发送消息失败!";
  25. }
  26. }
  27. }

创建消费者:

  1. package com.wwy.consumer;
  2. import lombok.extern.apachecommons.CommonsLog;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @author 王伟羽
  7. * @date 2024/3/14 9:45
  8. */
  9. @Component
  10. public class TopicConsumerOne {
  11. @RabbitListener(queues = "topic_queue_one")
  12. public void getTopicMessageOne(String message) {
  13. System.out.println("队列一收到消息:" + message);
  14. }
  15. @RabbitListener(queues = "topic_queue_two")
  16. public void getTopicMessageTwo(String message) {
  17. System.out.println("队列二收到消息:" + message);
  18. }
  19. @RabbitListener(queues = "topic_queue_three")
  20. public void getTopicMessageThree(String message) {
  21. System.out.println("队列三收到消息:" + message);
  22. }
  23. }

这里有一个问题,在每次消费端重启的时候会继续消费队列里的数据,为了防止这种情况,可以消费者在消费到数据的时候进行手动ack

  1. package com.wwy.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.apachecommons.CommonsLog;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.amqp.support.AmqpHeaders;
  7. import org.springframework.messaging.handler.annotation.Header;
  8. import org.springframework.messaging.handler.annotation.Payload;
  9. import org.springframework.stereotype.Component;
  10. import java.io.IOException;
  11. /**
  12. * @author 王伟羽
  13. * @date 2024/3/14 9:45
  14. */
  15. @Component
  16. public class TopicConsumerOne {
  17. @RabbitListener(queues = "topic_queue_one")
  18. public void getTopicMessageOne(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
  19. try {
  20. // 获取消息内容
  21. String messageBody = new String(message.getBody());
  22. System.out.println("队列一收到消息:"+messageBody);
  23. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  24. channel.basicAck(deliveryTag, false);
  25. System.out.println("队列一收到ack");
  26. } catch (Exception e) {
  27. // 如果处理消息时出现异常,可以拒绝消息
  28. Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
  29. if (!channel.isOpen()) {
  30. // 如果channel已经关闭,则无法执行basicNack或basicReject
  31. return;
  32. }
  33. try {
  34. channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
  35. } catch (IOException ex) {
  36. ex.printStackTrace();
  37. }
  38. // 或者可以选择 basicReject 如果不需要重新放回队列
  39. // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
  40. }
  41. }
  42. @RabbitListener(queues = "topic_queue_two")
  43. public void getTopicMessageTwo(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
  44. try {
  45. // 获取消息内容
  46. String messageBody = new String(message.getBody());
  47. System.out.println("队列二收到消息:"+messageBody);
  48. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  49. channel.basicAck(deliveryTag, false);
  50. System.out.println("队列二收到ack");
  51. } catch (Exception e) {
  52. // 如果处理消息时出现异常,可以拒绝消息
  53. Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
  54. if (!channel.isOpen()) {
  55. // 如果channel已经关闭,则无法执行basicNack或basicReject
  56. return;
  57. }
  58. try {
  59. channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
  60. } catch (IOException ex) {
  61. ex.printStackTrace();
  62. }
  63. // 或者可以选择 basicReject 如果不需要重新放回队列
  64. // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
  65. }
  66. }
  67. @RabbitListener(queues = "topic_queue_three")
  68. public void getTopicMessageThree(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
  69. try {
  70. // 获取消息内容
  71. String messageBody = new String(message.getBody());
  72. System.out.println("队列三收到消息:"+messageBody);
  73. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  74. channel.basicAck(deliveryTag, false);
  75. System.out.println("队列三收到ack");
  76. } catch (Exception e) {
  77. // 如果处理消息时出现异常,可以拒绝消息
  78. Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
  79. if (!channel.isOpen()) {
  80. // 如果channel已经关闭,则无法执行basicNack或basicReject
  81. return;
  82. }
  83. try {
  84. channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
  85. } catch (IOException ex) {
  86. ex.printStackTrace();
  87. }
  88. // 或者可以选择 basicReject 如果不需要重新放回队列
  89. // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
  90. }
  91. }
  92. }
路由模式

路由模式几乎与主题模式相同,也是通过key去发送到对应的消费者中去

配置队列,交换机并将其绑定到一起

  1. package com.wwy.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.boot.SpringBootConfiguration;
  4. import org.springframework.context.annotation.Bean;
  5. /**
  6. * @author 王伟羽
  7. * @date 2024/3/14 10:49
  8. */
  9. @SpringBootConfiguration
  10. public class RouterConfiguration {
  11. @Bean
  12. public DirectExchange getRouterExchange(){
  13. return new DirectExchange("router_exchange_one");
  14. }
  15. @Bean
  16. public Queue getRouterQueueOne(){
  17. return new Queue("router_queue_one");
  18. }
  19. @Bean
  20. public Queue getRouterQueueTwo(){
  21. return new Queue("router_queue_two");
  22. }
  23. @Bean
  24. public Queue getRouterQueueThree(){
  25. return new Queue("router_queue_three");
  26. }
  27. @Bean
  28. public Binding getRouterBindingOne(){
  29. return BindingBuilder.bind(getRouterQueueOne()).to(getRouterExchange()).with("aaa");
  30. }
  31. @Bean
  32. public Binding getRouterBindingThree(){
  33. return BindingBuilder.bind(getRouterQueueThree()).to(getRouterExchange()).with("bbb");
  34. }
  35. @Bean
  36. public Binding getRouterBindingTwo(){
  37. return BindingBuilder.bind(getRouterQueueTwo()).to(getRouterExchange()).with("ccc");
  38. }
  39. }

生产者:

  1. package com.wwy.producter;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import javax.annotation.Resource;
  7. import java.io.UnsupportedEncodingException;
  8. /**
  9. * @author 王伟羽
  10. * @date 2024/3/14 10:48
  11. */
  12. @RestController
  13. @RequestMapping(value = "/router")
  14. public class RouterProducer {
  15. @Resource
  16. private RabbitTemplate rabbitTemplate;
  17. @GetMapping(value = "/sendRouterMessage")
  18. public String sendTopicMessage(String message){
  19. try {
  20. rabbitTemplate.convertAndSend("router_exchange_one","aaa",message.getBytes("utf-8"));
  21. return "生产者发送消息成功";
  22. } catch (UnsupportedEncodingException e) {
  23. e.printStackTrace();
  24. return "发送消息失败!";
  25. }
  26. }
  27. }

消费者:

  1. package com.wwy.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.support.AmqpHeaders;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. import org.springframework.messaging.handler.annotation.Payload;
  8. import org.springframework.stereotype.Component;
  9. import java.io.IOException;
  10. /**
  11. * @author 王伟羽
  12. * @date 2024/3/14 10:57
  13. */
  14. @Component
  15. public class RouterConsumer {
  16. @RabbitListener(queues = "router_queue_one")
  17. public void getTopicMessageOne(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
  18. try {
  19. // 获取消息内容
  20. String messageBody = new String(message.getBody());
  21. System.out.println("队列一收到消息:"+messageBody);
  22. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  23. channel.basicAck(deliveryTag, false);
  24. System.out.println("队列一收到ack");
  25. } catch (Exception e) {
  26. // 如果处理消息时出现异常,可以拒绝消息
  27. Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
  28. if (!channel.isOpen()) {
  29. // 如果channel已经关闭,则无法执行basicNack或basicReject
  30. return;
  31. }
  32. try {
  33. channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
  34. } catch (IOException ex) {
  35. ex.printStackTrace();
  36. }
  37. // 或者可以选择 basicReject 如果不需要重新放回队列
  38. // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
  39. }
  40. }
  41. @RabbitListener(queues = "router_queue_two")
  42. public void getTopicMessageTwo(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
  43. try {
  44. // 获取消息内容
  45. String messageBody = new String(message.getBody());
  46. System.out.println("队列二收到消息:"+messageBody);
  47. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  48. channel.basicAck(deliveryTag, false);
  49. System.out.println("队列二收到ack");
  50. } catch (Exception e) {
  51. // 如果处理消息时出现异常,可以拒绝消息
  52. Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
  53. if (!channel.isOpen()) {
  54. // 如果channel已经关闭,则无法执行basicNack或basicReject
  55. return;
  56. }
  57. try {
  58. channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
  59. } catch (IOException ex) {
  60. ex.printStackTrace();
  61. }
  62. // 或者可以选择 basicReject 如果不需要重新放回队列
  63. // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
  64. }
  65. }
  66. @RabbitListener(queues = "router_queue_three")
  67. public void getTopicMessageThree(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
  68. try {
  69. // 获取消息内容
  70. String messageBody = new String(message.getBody());
  71. System.out.println("队列三收到消息:"+messageBody);
  72. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  73. channel.basicAck(deliveryTag, false);
  74. System.out.println("队列三收到ack");
  75. } catch (Exception e) {
  76. // 如果处理消息时出现异常,可以拒绝消息
  77. Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
  78. if (!channel.isOpen()) {
  79. // 如果channel已经关闭,则无法执行basicNack或basicReject
  80. return;
  81. }
  82. try {
  83. channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
  84. } catch (IOException ex) {
  85. ex.printStackTrace();
  86. }
  87. // 或者可以选择 basicReject 如果不需要重新放回队列
  88. // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
  89. }
  90. }
  91. }

这里生产者发送消息指定了key为aaa的,所以只有消费者一匹配并接收到消息

总结

以上就是本次测试的所以队列名,大家可以在测试的时候进入可视化界面查看消息、队列、交换机状态。本文只是简单的对RabbitMq的各种模式进行简单了解,后续的如何在项目中实现、死信队列等在下章博客分享,对于本篇如有错误的地方欢迎大家指正。

代码

wangweiyuyu/rabbitmq - 码云 - 开源中国 (gitee.com)icon-default.png?t=N7T8https://gitee.com/wangweiyuyu/rabbitmq

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/641449
推荐阅读
相关标签
  

闽ICP备14008679号