赞
踩
目录
RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。
常用消息队列有:Rabbitmq、Activemq、Rocketmq 阿里、Kafka、Tubemq 腾讯
应用场景:服务解耦、流量削峰、异步调用。
1、Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。
ExchangeType常用的有direct、Fanout和Topic三种。ExchangeType决定了Exchange路由消息的行为。
2、Message Queue:消息队列,存储消息,等待消费者来取。
3、Binding Key:Exchange与Message Queue是通过binding key进行联系的。
4、Routing Key:指定这个消息的路由规则。我们的生产者只需要通过指定routing key来决定消息流向哪里。
进入rabbitmq 虚拟机后依次执行下面指令,更改并检查ip信息
- ./ip-static //执行脚本
- ip: 192.168.64.140 //设置ip
- ifconfig //查看ip是否设置成功
***提前装置好docker容器,代码段中指令每条分开依次执行***
- //rabbitmq:management 镜像中已经安装了管理界面
- docker pull rabbitmq:management
将离线安装包上传到Linux系统,使用docker指令导入镜像
docker load -i rabbit-image.gz
- systemctl stop firewalld
- systemctl disable firewalld
-
- # 重启 docker 系统服务
- systemctl restart docker
创建rabbitmq文件夹,添加rabbitmq.conf配置文件,配置用户名和密码。
- mkdir /etc/rabbitmq
- vim /etc/rabbitmq/rabbitmq.conf
-
- # 添加两行配置:
- default_user = admin
- default_pass = admin
- docker run -d --name rabbit \
- -p 5672:5672 \
- -p 15672:15672 \
- -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
- -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
- --restart=always \
- rabbitmq:management
-
http://192.168.64.140:15672
主要端口介绍
测试前先启动虚拟机,如果没有设置开机自启的话,在进入容器把RabbitMQ启动。
启动后访问控制台:http://192.168.64.140:15672 进行查看。
测试方法都写在主程序中,没有单独写测试类。
API:
Spring提供的Queue类,是队列的封装对象,它封装了队列的参数信息。
AmqpTemplate是rabbitmq客户端API的一个封装工具,提供了简便的方法来执行消息操作。
添加依赖:
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
application.yml配置:
- spring:
- rabbitmq:
- host: 192.168.64.140
- username: admin
- password: admin
只有一个消费者
1、创建queue实例,将对象交给spring容器管理(注意queue的导包路径);
2、引入Produer对象。
3、构建test测试方法(没有写测试类,利用spring执行流程,使用@PostConstruct注解启用测试方法),执行send()方法。
spring执行流程:包扫描创建所有实例-->依赖注入-->@PostConstruct-->执行后续流程-->...
- import org.springframework.amqp.core.Queue;//queue实例包路径
-
- import javax.annotation.PostConstruct;
-
- @SpringBootApplication
- public class Main {
- public static void main(String[] args) { SpringApplication.run(Main.class,args);}
- @Bean
- public Queue helloworld(){
- return new Queue("helloworld",false,false,false);
- }
- @Autowired
- private Produer produer;
- @PostConstruct//执行一些初始化操作,需要初始化的资源可以在此方法里做
- public void test(){
- produer.send();
- }
- }
注入AmqpTemplate对象。
AmqpTemplate由自动配置类自动创建,他是rabbitmq客户端API的一个封装工具,提供了简便的方法来执行消息操作。
-
- @Component
- public class Produer {
- @Autowired
- private AmqpTemplate amqpTemplate;
- //自定义的消息发送方法
- //消息会自动转换成byte[],不用手动转换
- public void send(){
- amqpTemplate.convertAndSend("helloworld","Hello World!");
- }
- }
接收消息方式一:
通过@RabbitListener
从指定的队列接收消息
使用@RebbitHandler
注解的方法来处理消息
- @Component
- @RabbitListener(queues = "helloworld")
- public class SimpleReceiver {
- @RabbitHandler
- public void receive(String msg) {
- System.out.println("收到: "+msg);
- }
- }
接收消息方式二:
- @Component
- public class Consumer {
- @RabbitListener(queues = "helloworld")
- public void receive(String s){
- System.out.println("收到:"+s);
- }
- }
接收消息方式三:
@RabbitListener
注解中也可以直接定义队列
@RabbitListener(queuesToDeclare = @Queue(name = "helloworld",durable = "false"))
多个消费者,从同一个队列接收消息
负载均衡:消息会轮询发送给所有消费者
合理的分发消息:1.手动ACK
1、创建名为task_queue的持久队列
2、测试方法新建一个线程,执行死循环不阻塞spring主线程运行
- @SpringBootApplication
- public class Main {
- public static void main(String[] args) {
- SpringApplication.run(Main.class,args);
- }
- @Bean
- public Queue taskQueue(){
- return new Queue("task_queue",true,false,false);//默认持久队列,第一个参数可以不写
- }
- @Autowired
- private Produer produer;
- @PostConstruct
- public void test(){
- //在新的线程中执行死循环,不阻塞 spring 主线程执行
- new Thread(()->produer.send()).start();
- }
- }
- @Component
- public class Produer {
- @Autowired
- private AmqpTemplate amqpTemplate;
- public void send(){
- while (true){
- System.out.println("输入消息:");
- String s = new Scanner(System.in).nextLine();
- amqpTemplate.convertAndSend("task_queue",s);
- }
- }
- }
spring boot封装的 rabbitmq api 中, 发送的消息默认是持久化消息.
如果希望发送非持久化消息, 需要在发送消息时做以下设置:
使用 MessagePostProcessor 前置处理器参数
从消息中获取消息的属性对象
在属性中把 DeliveryMode 设置为非持久化
- //如果需要设置消息为非持久化,可以取得消息的属性对象,修改它的deliveryMode属性
- amqpTemplate.convertAndSend("task_queue", (Object) s, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- MessageProperties props = message.getMessageProperties();
- props.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
- return message;
- }
- });
- @Component
- public class Consumer {
- /*
- 每个@RabbitListener 注解都会注册成为一个消费者
- */
- @RabbitListener(queues = "task_queue")
- public void receive1(String s){
- System.out.println("消费者1收到:"+s);
- }
- @RabbitListener(queues = "task_queue")
- public void receive2(String s){
- System.out.println("消费者2收到:"+s);
- }
- }
把消息群发给所有消费者,同一条消息所有消费者都能收到,使用fanout类型交换机
消息合理分发:
1.autoACK = false 手动确认 springboot 封装的api,默认就是手动确认模式,springboot会自动发送回执。
2. qos = 1 每次只收一条数据 yml 添加 pre-fetch = 1 (默认250)。
持久化:
1.队列持久化 :new Queue(对列名,true)不写true默认就是true
2.消息数据的持久化:
AmqpTemplate.convertAndSent() 方法发送的消息,默认就是持久消息。 AmqpTemplate.convertAndSent(队列,消息,消息预处理对象) 消息预处理对象中,可以获取消息属性,把持久化属性改成非持久,可以发送非持久消息。
创建 FanoutExcnahge
实例, 封装 fanout
类型交换机定义信息。
- @SpringBootApplication
- public class Main {
- public static void main(String[] args) {
- SpringApplication.run(Main.class,args);
- }
- @Bean
- public FanoutExchange logs(){
- return new FanoutExchange("logs",false,false);
- }
- @Autowired
- private Produer produer;
- @PostConstruct
- public void test(){
- new Thread(()->produer.send()).start();
- }
- }
生产者向指定的交换机 logs
发送数据。
不需要指定队列名或路由键, 即使指定也无效, 因为 fanout
交换机会向所有绑定的队列发送数据, 而不是有选择的发送。
- @Component
- public class Produer {
- @Autowired
- private AmqpTemplate amqpTemplate;
- public void send(){
- while (true){
- System.out.println("输入消息:");
- String s = new Scanner(System.in).nextLine();
- amqpTemplate.convertAndSend("logs","",s);
- }
- }
- }
消费者需要执行以下操作:
spring boot 通过注解完成以上操作:
- @Component
- public class Consumer {
- /*
- 每个@RabbitListener 注解都会注册成为一个消费者
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue,//自动设置的参数:随机命名,false,true,true
- exchange = @Exchange(name = "logs",declare = "false")//declare = "false" 表示不创建交换机,只是引用存在的交换机
- ))
- public void receive1(String s){
- System.out.println("消费者1收到:"+s);
- }
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue,//自动设置的参数:随机命名,false,true,true
- exchange = @Exchange(name = "logs",declare = "false")//declare = "false" 表示不创建交换机,只是引用存在的交换机
- ))
- public void receive2(String s){
- System.out.println("消费者2收到:"+s);
- }
- }
通过关键词匹配,来确定吧消息发送到哪个队列,direct交换机实现这种路由规则。
与发布和订阅模式代码类似, 只是做以下三点调整:
direct
交换机使用 DirectExcnahge
对象封装交换机信息。
- @SpringBootApplication
- public class Main {
- public static void main(String[] args) {
- SpringApplication.run(Main.class,args);
- }
- @Bean
- public DirectExchange logs(){
- return new DirectExchange("direct_logs",false,false);
- }
- @Autowired
- private Produer produer;
- @PostConstruct
- public void test(){
- new Thread(()->produer.send()).start();
- }
- }
生产者向指定的交换机发送消息, 并指定路由键.
- @Component
- public class Produer {
- @Autowired
- private AmqpTemplate amqpTemplate;
- public void send(){
- while (true){
- System.out.println("输入消息:");
- String s = new Scanner(System.in).nextLine();
- System.out.println("输入关键词:");
- String k = new Scanner(System.in).nextLine();
- amqpTemplate.convertAndSend("direct_logs",k,s);
- }
- }
- }
消费者通过注解来定义随机队列, 绑定到交换机, 并指定绑定键:
- public class Consumer {
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue,
- exchange = @Exchange(name = "direct_logs",declare = "false"),
- key = {"error"}
- ))
- public void receive1(String s){
- System.out.println("消费者1收到:"+s);
- }
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue,
- exchange = @Exchange(name = "direct_logs",declare = "false"),/
- key = {"info","error","warning"}
- ))
- public void receive2(String s){
- System.out.println("消费者2收到:"+s);
- }
-
- }
和路由模式相同,具有特殊的关键词规则,Topic交换机实现这种特殊路由规则。
代码与路由模式基本相同, 只做如下调整:
topic
交换机使用TopicExchange对象封装交换机信息。
- @SpringBootApplication
- public class Main {
- public static void main(String[] args) {
- SpringApplication.run(Main.class,args);
- }
- @Bean
- public TopicExchange logs(){
- return new TopicExchange("topic_logs",false,false);
- }
- @Bean
- public Queue rndQueue(){
- return new Queue(UUID.randomUUID().toString(),false,true,true);
- }
- @Autowired
- private Produer produer;
- @PostConstruct
- public void test(){
- new Thread(()->produer.send()).start();
- }
- }
- @Component
- public class Produer {
- @Autowired
- private AmqpTemplate amqpTemplate;
- public void send(){
- while (true){
- System.out.println("输入消息:");
- String s = new Scanner(System.in).nextLine();
- System.out.println("输入关键词:");
- String k = new Scanner(System.in).nextLine();
- amqpTemplate.convertAndSend("topic_logs",k,s);
- }
- }
- }
#{}和${}的区别:
#{}--- SPEL Spring Expression Language 可以直接访问spring容器中的对象
${}--- QGNL Object Graph Navigation Language Struts2 中提供的一种表达式语言
- @Component
- public class Consumer {
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "#{rndQueue.name}",declare = "false"),//自动设置的参数:随机命名,false,true,true
- exchange = @Exchange(name = "topic_logs",declare = "false"), //declare = "false" 表示不创建交换机,只是引用存在的交换机
- key = {"*.orange.*"}
- ))
- public void receive1(String s){
- System.out.println("消费者1收到:"+s);
- }
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue,
- exchange = @Exchange(name = "topic_logs",declare = "false"),
- key = {"*.*.rabbit","lazy.#"}
- ))
- public void receive2(String s){
- System.out.println("消费者2收到:"+s);
- }
- }
两个队列,调用队列,返回队列
主程序中定义两个队列
rpc_queue
- import java.util.UUID;
- import org.springframework.amqp.core.Queue;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.annotation.Bean;
-
- @SpringBootApplication
- public class Main {
- public static void main(String[] args) {
- SpringApplication.run(Main.class, args);
- }
- @Bean
- public Queue sendQueue() {
- return new Queue("rpc_queue",false);
- }
- @Bean
- public Queue rndQueue() {
- return new Queue(UUID.randomUUID().toString(), false);
- }
- }
从rpc_queue接收调用数据, 执行运算求斐波那契数,并返回计算结果.
@Rabbitlistener注解对于具有返回值的方法:
会自动获取 replyTo 属性
自动获取 correlationId 属性
向 replyTo 属性指定的队列发送计算结果, 并携带 correlationId 属性
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- public class RpcServer {
- @RabbitListener(queues = "rpc_queue")
- public long getFbnq(int n) {
- return f(n);
- }
- private long f(int n) {
- if (n==1 || n==2) {
- return 1;
- }
- return f(n-1) + f(n-2);
- }
- }
使用 SPEL 表达式获取随机队列名: "#{rndQueue.name}"
发送调用数据时, 携带随机队列名和correlationId
从随机队列接收调用结果, 并获取correlationId
- import java.util.UUID;
-
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessagePostProcessor;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
-
- @Component
- public class RpcClient {
- @Autowired
- AmqpTemplate t;
-
- @Value("#{rndQueue.name}")
- String rndQueue;
-
- public void send(int n) {
- // 发送调用信息时, 通过前置消息处理器, 对消息属性进行设置, 添加返回队列名和关联id
- t.convertAndSend("rpc_queue", (Object)n, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- MessageProperties p = message.getMessageProperties();
- p.setReplyTo(rndQueue);
- p.setCorrelationId(UUID.randomUUID().toString());
- return message;
- }
- });
- }
-
- //从随机队列接收计算结果
- @RabbitListener(queues = "#{rndQueue.name}")
- public void receive(long r, @Header(name=AmqpHeaders.CORRELATION_ID) String correlationId) {
- System.out.println("\n\n"+correlationId+" - 收到: "+r);
- }
-
- }
- import java.util.Scanner;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- @SpringBootTest
- class TopicTests {
- @Autowired
- RpcClient client;
- @Test
- void test1() throws Exception {
- while (true) {
- System.out.print("求第几个斐波那契数: ");
- int n = new Scanner(System.in).nextInt();
- client.send(n);
- }
- }
-
- }
在 spring boot 中提供了三种确认模式:
NONE - 使用rabbitmq的自动确认
AUTO - 使用rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
MANUAL - 使用rabbitmq的手动确认, 且必须手动执行确认操作
默认的 AUTO 模式中, 处理消息的方法抛出异常, 则表示消息没有被正确处理, 该消息会被重新发送。
application.yml配置:
- spring:
- rabbitmq:
- listener:
- simple:
- # acknowledgeMode: NONE # rabbitmq的自动确认
- acknowledgeMode: AUTO # rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
- # acknowledgeMode: MANUAL # rabbitmq的手动确认, springboot不发送回执, 必须自己编码发送回执
如果设置为 MANUAL
模式,必须手动执行确认操作
- @RabbitListener(queues="task_queue")
- public void receive1(String s, Channel c, @Header(name=AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
- System.out.println("receiver1 - 收到: "+s);
- for (int i = 0; i < s.length(); i++) {
- if (s.charAt(i) == '.') {
- Thread.sleep(1000);
- }
- }
- // 手动发送确认回执
- c.basicAck(tag, false);
- }
工作模式中, 为了合理地分发数据, 需要将 qos 设置成 1, 每次只接收一条消息, 处理完成后才接收下一条消息.
spring boot 中是通过 prefetch
属性进行设置, 改属性的默认值是 250.
application.yml配置:
- spring:
- rabbitmq:
- listener:
- simple:
- prefetch: 1 # qos=1, 默认250
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。