当前位置:   article > 正文

RabbitMQ在SpringBoot中的应用_springboot中使用rabbitmq

springboot中使用rabbitmq

目录

一、基本概念

二、RabbitMQ安装

设置ip

1 镜像

1.1 在线下载

1.2 使用离线镜像包

2 关闭防火墙

3 配置管理员用户信息

4 启动

5 访问控制台

三、SpringBoot中RabbitMQ常用的几种模式

1 简单模式

主程序

生产者

消费者

2 工作模式

主程序

生产者

消费者

3 发布订阅模式

主程序

生产者

消费者

4 路由模式

主程序

生产者

消费者

5 主题模式

主程序

生产者

消费者

6 RPC异步调用

主程序

服务端

客户端

测试类 

ACK确认模式

设置 ack 模式

手动执行确认操作

抓取数量


一、基本概念

RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。

常用消息队列有:Rabbitmq、Activemq、Rocketmq 阿里、Kafka、Tubemq 腾讯

应用场景:服务解耦、流量削峰、异步调用。

1、Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。

ExchangeType常用的有direct、Fanout和Topic三种。ExchangeType决定了Exchange路由消息的行为。

2、Message Queue:消息队列,存储消息,等待消费者来取。

3、Binding Key:Exchange与Message Queue是通过binding key进行联系的。

4、Routing Key:指定这个消息的路由规则。我们的生产者只需要通过指定routing key来决定消息流向哪里。

二、RabbitMQ安装

设置ip

进入rabbitmq 虚拟机后依次执行下面指令,更改并检查ip信息

  1. ./ip-static //执行脚本
  2. ip: 192.168.64.140 //设置ip
  3. ifconfig //查看ip是否设置成功

***提前装置好docker容器,代码段中指令每条分开依次执行***

1 镜像

1.1 在线下载

  1. //rabbitmq:management 镜像中已经安装了管理界面
  2. docker pull rabbitmq:management

1.2 使用离线镜像包

将离线安装包上传到Linux系统,使用docker指令导入镜像

docker load -i rabbit-image.gz

2 关闭防火墙

  1. systemctl stop firewalld
  2. systemctl disable firewalld
  3. # 重启 docker 系统服务
  4. systemctl restart docker

3 配置管理员用户信息

创建rabbitmq文件夹,添加rabbitmq.conf配置文件,配置用户名和密码。

  1. mkdir /etc/rabbitmq
  2. vim /etc/rabbitmq/rabbitmq.conf
  3. # 添加两行配置:
  4. default_user = admin
  5. default_pass = admin

4 启动

  1. docker run -d --name rabbit \
  2. -p 5672:5672 \
  3. -p 15672:15672 \
  4. -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
  5. -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
  6. --restart=always \
  7. rabbitmq:management

5 访问控制台

http://192.168.64.140:15672

主要端口介绍

  • 4369 – erlang发现口
  • 5672 – client端通信口
  • 15672 – 管理界面ui端口
  • 25672 – server间内部通信口

三、SpringBoot中RabbitMQ常用的几种模式

测试前先启动虚拟机,如果没有设置开机自启的话,在进入容器把RabbitMQ启动。

启动后访问控制台:http://192.168.64.140:15672 进行查看。

测试方法都写在主程序中,没有单独写测试类。

API:

Spring提供的Queue类,是队列的封装对象,它封装了队列的参数信息。

AmqpTemplate是rabbitmq客户端API的一个封装工具,提供了简便的方法来执行消息操作。

 添加依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-test</artifactId>
  9. <scope>test</scope>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.amqp</groupId>
  13. <artifactId>spring-rabbit-test</artifactId>
  14. <scope>test</scope>
  15. </dependency>
  16. </dependencies>

 application.yml配置:

  1. spring:
  2. rabbitmq:
  3. host: 192.168.64.140
  4. username: admin
  5. password: admin

1 简单模式

只有一个消费者

主程序

1、创建queue实例,将对象交给spring容器管理(注意queue的导包路径);

2、引入Produer对象。

3、构建test测试方法(没有写测试类,利用spring执行流程,使用@PostConstruct注解启用测试方法),执行send()方法。

spring执行流程:包扫描创建所有实例-->依赖注入-->@PostConstruct-->执行后续流程-->...

  1. import org.springframework.amqp.core.Queue;//queue实例包路径
  2. import javax.annotation.PostConstruct;
  3. @SpringBootApplication
  4. public class Main {
  5. public static void main(String[] args) { SpringApplication.run(Main.class,args);}
  6. @Bean
  7. public Queue helloworld(){
  8. return new Queue("helloworld",false,false,false);
  9. }
  10. @Autowired
  11. private Produer produer;
  12. @PostConstruct//执行一些初始化操作,需要初始化的资源可以在此方法里做
  13. public void test(){
  14. produer.send();
  15. }
  16. }

生产者

注入AmqpTemplate对象。

AmqpTemplate由自动配置类自动创建,他是rabbitmq客户端API的一个封装工具,提供了简便的方法来执行消息操作。

  1. @Component
  2. public class Produer {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. //自定义的消息发送方法
  6. //消息会自动转换成byte[],不用手动转换
  7. public void send(){
  8. amqpTemplate.convertAndSend("helloworld","Hello World!");
  9. }
  10. }

消费者

接收消息方式一:

 通过@RabbitListener从指定的队列接收消息

使用@RebbitHandler注解的方法来处理消息

  1. @Component
  2. @RabbitListener(queues = "helloworld")
  3. public class SimpleReceiver {
  4. @RabbitHandler
  5. public void receive(String msg) {
  6. System.out.println("收到: "+msg);
  7. }
  8. }

接收消息方式二: 

  1. @Component
  2. public class Consumer {
  3. @RabbitListener(queues = "helloworld")
  4. public void receive(String s){
  5. System.out.println("收到:"+s);
  6. }
  7. }

接收消息方式三:

@RabbitListener 注解中也可以直接定义队列 

	@RabbitListener(queuesToDeclare = @Queue(name = "helloworld",durable = "false"))

2 工作模式

多个消费者,从同一个队列接收消息

负载均衡:消息会轮询发送给所有消费者

合理的分发消息:1.手动ACK

主程序

1、创建名为task_queue的持久队列

2、测试方法新建一个线程,执行死循环不阻塞spring主线程运行

  1. @SpringBootApplication
  2. public class Main {
  3. public static void main(String[] args) {
  4. SpringApplication.run(Main.class,args);
  5. }
  6. @Bean
  7. public Queue taskQueue(){
  8. return new Queue("task_queue",true,false,false);//默认持久队列,第一个参数可以不写
  9. }
  10. @Autowired
  11. private Produer produer;
  12. @PostConstruct
  13. public void test(){
  14. //在新的线程中执行死循环,不阻塞 spring 主线程执行
  15. new Thread(()->produer.send()).start();
  16. }
  17. }

生产者

  1. @Component
  2. public class Produer {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send(){
  6. while (true){
  7. System.out.println("输入消息:");
  8. String s = new Scanner(System.in).nextLine();
  9. amqpTemplate.convertAndSend("task_queue",s);
  10. }
  11. }
  12. }

spring boot封装的 rabbitmq api 中, 发送的消息默认是持久化消息.
如果希望发送非持久化消息, 需要在发送消息时做以下设置:

使用 MessagePostProcessor 前置处理器参数

从消息中获取消息的属性对象

在属性中把 DeliveryMode 设置为非持久化

  1. //如果需要设置消息为非持久化,可以取得消息的属性对象,修改它的deliveryMode属性
  2. amqpTemplate.convertAndSend("task_queue", (Object) s, new MessagePostProcessor() {
  3. @Override
  4. public Message postProcessMessage(Message message) throws AmqpException {
  5. MessageProperties props = message.getMessageProperties();
  6. props.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
  7. return message;
  8. }
  9. });

消费者

  1. @Component
  2. public class Consumer {
  3. /*
  4. 每个@RabbitListener 注解都会注册成为一个消费者
  5. */
  6. @RabbitListener(queues = "task_queue")
  7. public void receive1(String s){
  8. System.out.println("消费者1收到:"+s);
  9. }
  10. @RabbitListener(queues = "task_queue")
  11. public void receive2(String s){
  12. System.out.println("消费者2收到:"+s);
  13. }
  14. }

3 发布订阅模式

把消息群发给所有消费者,同一条消息所有消费者都能收到,使用fanout类型交换机

消息合理分发:

1.autoACK = false 手动确认 springboot 封装的api,默认就是手动确认模式,springboot会自动发送回执。

2. qos = 1 每次只收一条数据 yml 添加 pre-fetch = 1 (默认250)。

持久化:

1.队列持久化 :new Queue(对列名,true)不写true默认就是true

2.消息数据的持久化:     

AmqpTemplate.convertAndSent() 方法发送的消息,默认就是持久消息。      AmqpTemplate.convertAndSent(队列,消息,消息预处理对象) 消息预处理对象中,可以获取消息属性,把持久化属性改成非持久,可以发送非持久消息。

主程序

创建 FanoutExcnahge 实例, 封装 fanout 类型交换机定义信息。

  1. @SpringBootApplication
  2. public class Main {
  3. public static void main(String[] args) {
  4. SpringApplication.run(Main.class,args);
  5. }
  6. @Bean
  7. public FanoutExchange logs(){
  8. return new FanoutExchange("logs",false,false);
  9. }
  10. @Autowired
  11. private Produer produer;
  12. @PostConstruct
  13. public void test(){
  14. new Thread(()->produer.send()).start();
  15. }
  16. }

生产者

生产者向指定的交换机 logs 发送数据。

不需要指定队列名或路由键, 即使指定也无效, 因为 fanout 交换机会向所有绑定的队列发送数据, 而不是有选择的发送。

  1. @Component
  2. public class Produer {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send(){
  6. while (true){
  7. System.out.println("输入消息:");
  8. String s = new Scanner(System.in).nextLine();
  9. amqpTemplate.convertAndSend("logs","",s);
  10. }
  11. }
  12. }

消费者

消费者需要执行以下操作:

  1. 定义随机队列(随机命名,非持久,排他,自动删除)
  2. 定义交换机(可以省略, 已在主程序中定义)
  3. 将队列绑定到交换机

spring boot 通过注解完成以上操作:

  1. @Component
  2. public class Consumer {
  3. /*
  4. 每个@RabbitListener 注解都会注册成为一个消费者
  5. */
  6. @RabbitListener(bindings = @QueueBinding(
  7. value = @Queue,//自动设置的参数:随机命名,false,true,true
  8. exchange = @Exchange(name = "logs",declare = "false")//declare = "false" 表示不创建交换机,只是引用存在的交换机
  9. ))
  10. public void receive1(String s){
  11. System.out.println("消费者1收到:"+s);
  12. }
  13. @RabbitListener(bindings = @QueueBinding(
  14. value = @Queue,//自动设置的参数:随机命名,false,true,true
  15. exchange = @Exchange(name = "logs",declare = "false")//declare = "false" 表示不创建交换机,只是引用存在的交换机
  16. ))
  17. public void receive2(String s){
  18. System.out.println("消费者2收到:"+s);
  19. }
  20. }

4 路由模式

通过关键词匹配,来确定吧消息发送到哪个队列,direct交换机实现这种路由规则。

与发布和订阅模式代码类似, 只是做以下三点调整:

  1. 使用 direct 交换机
  2. 队列和交换机绑定时, 设置绑定键
  3. 发送消息时, 指定路由键

主程序

使用 DirectExcnahge 对象封装交换机信息。

  1. @SpringBootApplication
  2. public class Main {
  3. public static void main(String[] args) {
  4. SpringApplication.run(Main.class,args);
  5. }
  6. @Bean
  7. public DirectExchange logs(){
  8. return new DirectExchange("direct_logs",false,false);
  9. }
  10. @Autowired
  11. private Produer produer;
  12. @PostConstruct
  13. public void test(){
  14. new Thread(()->produer.send()).start();
  15. }
  16. }

生产者

生产者向指定的交换机发送消息, 并指定路由键.

  1. @Component
  2. public class Produer {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send(){
  6. while (true){
  7. System.out.println("输入消息:");
  8. String s = new Scanner(System.in).nextLine();
  9. System.out.println("输入关键词:");
  10. String k = new Scanner(System.in).nextLine();
  11. amqpTemplate.convertAndSend("direct_logs",k,s);
  12. }
  13. }
  14. }

消费者

消费者通过注解来定义随机队列, 绑定到交换机, 并指定绑定键:

  1. public class Consumer {
  2. @RabbitListener(bindings = @QueueBinding(
  3. value = @Queue,
  4. exchange = @Exchange(name = "direct_logs",declare = "false"),
  5. key = {"error"}
  6. ))
  7. public void receive1(String s){
  8. System.out.println("消费者1收到:"+s);
  9. }
  10. @RabbitListener(bindings = @QueueBinding(
  11. value = @Queue,
  12. exchange = @Exchange(name = "direct_logs",declare = "false"),/
  13. key = {"info","error","warning"}
  14. ))
  15. public void receive2(String s){
  16. System.out.println("消费者2收到:"+s);
  17. }
  18. }

5 主题模式

和路由模式相同,具有特殊的关键词规则,Topic交换机实现这种特殊路由规则。

 代码与路由模式基本相同, 只做如下调整:

  1. 使用 topic 交换机
  2. 使用特殊的绑定键和路由键规则

主程序

使用TopicExchange对象封装交换机信息。

  1. @SpringBootApplication
  2. public class Main {
  3. public static void main(String[] args) {
  4. SpringApplication.run(Main.class,args);
  5. }
  6. @Bean
  7. public TopicExchange logs(){
  8. return new TopicExchange("topic_logs",false,false);
  9. }
  10. @Bean
  11. public Queue rndQueue(){
  12. return new Queue(UUID.randomUUID().toString(),false,true,true);
  13. }
  14. @Autowired
  15. private Produer produer;
  16. @PostConstruct
  17. public void test(){
  18. new Thread(()->produer.send()).start();
  19. }
  20. }

生产者

  1. @Component
  2. public class Produer {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send(){
  6. while (true){
  7. System.out.println("输入消息:");
  8. String s = new Scanner(System.in).nextLine();
  9. System.out.println("输入关键词:");
  10. String k = new Scanner(System.in).nextLine();
  11. amqpTemplate.convertAndSend("topic_logs",k,s);
  12. }
  13. }
  14. }

消费者

#{}和${}的区别:

#{}--- SPEL Spring Expression Language                    可以直接访问spring容器中的对象
${}--- QGNL Object Graph Navigation Language          Struts2 中提供的一种表达式语言

  1. @Component
  2. public class Consumer {
  3. @RabbitListener(bindings = @QueueBinding(
  4. value = @Queue(name = "#{rndQueue.name}",declare = "false"),//自动设置的参数:随机命名,false,true,true
  5. exchange = @Exchange(name = "topic_logs",declare = "false"), //declare = "false" 表示不创建交换机,只是引用存在的交换机
  6. key = {"*.orange.*"}
  7. ))
  8. public void receive1(String s){
  9. System.out.println("消费者1收到:"+s);
  10. }
  11. @RabbitListener(bindings = @QueueBinding(
  12. value = @Queue,
  13. exchange = @Exchange(name = "topic_logs",declare = "false"),
  14. key = {"*.*.rabbit","lazy.#"}
  15. ))
  16. public void receive2(String s){
  17. System.out.println("消费者2收到:"+s);
  18. }
  19. }

6 RPC异步调用

两个队列,调用队列,返回队列

主程序

主程序中定义两个队列

  • 发送调用信息的队列: rpc_queue
  • 返回结果的队列: 随机命名
  1. import java.util.UUID;
  2. import org.springframework.amqp.core.Queue;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.annotation.Bean;
  6. @SpringBootApplication
  7. public class Main {
  8. public static void main(String[] args) {
  9. SpringApplication.run(Main.class, args);
  10. }
  11. @Bean
  12. public Queue sendQueue() {
  13. return new Queue("rpc_queue",false);
  14. }
  15. @Bean
  16. public Queue rndQueue() {
  17. return new Queue(UUID.randomUUID().toString(), false);
  18. }
  19. }

服务端

从rpc_queue接收调用数据, 执行运算求斐波那契数,并返回计算结果.
@Rabbitlistener注解对于具有返回值的方法:

会自动获取 replyTo 属性
自动获取 correlationId 属性
向 replyTo 属性指定的队列发送计算结果, 并携带 correlationId 属性

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Component;
  3. @Component
  4. public class RpcServer {
  5. @RabbitListener(queues = "rpc_queue")
  6. public long getFbnq(int n) {
  7. return f(n);
  8. }
  9. private long f(int n) {
  10. if (n==1 || n==2) {
  11. return 1;
  12. }
  13. return f(n-1) + f(n-2);
  14. }
  15. }

客户端

使用 SPEL 表达式获取随机队列名: "#{rndQueue.name}"

发送调用数据时, 携带随机队列名和correlationId

从随机队列接收调用结果, 并获取correlationId

  1. import java.util.UUID;
  2. import org.springframework.amqp.AmqpException;
  3. import org.springframework.amqp.core.AmqpTemplate;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.core.MessagePostProcessor;
  6. import org.springframework.amqp.core.MessageProperties;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.amqp.support.AmqpHeaders;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.beans.factory.annotation.Value;
  11. import org.springframework.messaging.handler.annotation.Header;
  12. import org.springframework.stereotype.Component;
  13. @Component
  14. public class RpcClient {
  15. @Autowired
  16. AmqpTemplate t;
  17. @Value("#{rndQueue.name}")
  18. String rndQueue;
  19. public void send(int n) {
  20. // 发送调用信息时, 通过前置消息处理器, 对消息属性进行设置, 添加返回队列名和关联id
  21. t.convertAndSend("rpc_queue", (Object)n, new MessagePostProcessor() {
  22. @Override
  23. public Message postProcessMessage(Message message) throws AmqpException {
  24. MessageProperties p = message.getMessageProperties();
  25. p.setReplyTo(rndQueue);
  26. p.setCorrelationId(UUID.randomUUID().toString());
  27. return message;
  28. }
  29. });
  30. }
  31. //从随机队列接收计算结果
  32. @RabbitListener(queues = "#{rndQueue.name}")
  33. public void receive(long r, @Header(name=AmqpHeaders.CORRELATION_ID) String correlationId) {
  34. System.out.println("\n\n"+correlationId+" - 收到: "+r);
  35. }
  36. }

测试类 

  1. import java.util.Scanner;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. @SpringBootTest
  6. class TopicTests {
  7. @Autowired
  8. RpcClient client;
  9. @Test
  10. void test1() throws Exception {
  11. while (true) {
  12. System.out.print("求第几个斐波那契数: ");
  13. int n = new Scanner(System.in).nextInt();
  14. client.send(n);
  15. }
  16. }
  17. }

ACK确认模式

在 spring boot 中提供了三种确认模式:

NONE - 使用rabbitmq的自动确认
AUTO - 使用rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
MANUAL - 使用rabbitmq的手动确认, 且必须手动执行确认操作
默认的 AUTO 模式中, 处理消息的方法抛出异常, 则表示消息没有被正确处理, 该消息会被重新发送。

设置 ack 模式

application.yml配置:

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. # acknowledgeMode: NONE # rabbitmq的自动确认
  6. acknowledgeMode: AUTO # rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
  7. # acknowledgeMode: MANUAL # rabbitmq的手动确认, springboot不发送回执, 必须自己编码发送回执

手动执行确认操作

 如果设置为 MANUAL 模式,必须手动执行确认操作

  1. @RabbitListener(queues="task_queue")
  2. public void receive1(String s, Channel c, @Header(name=AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
  3. System.out.println("receiver1 - 收到: "+s);
  4. for (int i = 0; i < s.length(); i++) {
  5. if (s.charAt(i) == '.') {
  6. Thread.sleep(1000);
  7. }
  8. }
  9. // 手动发送确认回执
  10. c.basicAck(tag, false);
  11. }

抓取数量

工作模式中, 为了合理地分发数据, 需要将 qos 设置成 1, 每次只接收一条消息, 处理完成后才接收下一条消息.

spring boot 中是通过 prefetch 属性进行设置, 改属性的默认值是 250.

application.yml配置:

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. prefetch: 1 # qos=1, 默认250

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/958595
推荐阅读
相关标签
  

闽ICP备14008679号