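Preface: I have been browsing the Spring website lately and writing up some notes along the way. This article covers Spring AMQP. It does not cover every technical point; it only records the parts I found useful.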
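The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. It provides a "template" as a high-level abstraction for sending and receiving messages, and it supports message-driven POJOs through a "listener container". These libraries simplify the management of AMQP resources while promoting dependency injection and declarative configuration.
Spring AMQP consists of two modules, spring-amqp and spring-rabbit. The spring-amqp module contains the org.springframework.amqp.core package, which holds the core classes of the AMQP model; spring-rabbit contains the RabbitMQ implementation.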
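The AMQP 0-9-1 specification does not define a message class or interface. Spring AMQP therefore defines a Message class as part of its general AMQP domain model. Its purpose is to encapsulate the body and the properties in a single instance, which keeps the API simple. The MessageProperties object defines several common properties, such as the message ID (messageId) and the timestamp (timestamp); custom headers can be set with the setHeader method. The reference source is as follows: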
- package org.springframework.amqp.core;
-
- // Reference source (abridged)
- public class Message implements Serializable {
-
- private final byte[] body;
-
- public Message(byte[] body) {
- this(body, new MessageProperties());
- }
-
- //...
- }
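To make the relationship between Message and MessageProperties concrete, here is a minimal sketch that builds a message by hand. It assumes spring-amqp is on the classpath; the class name MessageSketch, the message ID "msg-001", and the "tenant" header are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Date;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;

/** Illustrative only: MessageSketch and all values below are hypothetical. */
final class MessageSketch {

    /** Builds a text message with two common properties and one custom header. */
    static Message build() {
        MessageProperties props = new MessageProperties();
        props.setMessageId("msg-001");                       // common property
        props.setTimestamp(new Date());                      // common property
        props.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        props.setHeader("tenant", "acme");                   // custom header
        return new Message("hello".getBytes(StandardCharsets.UTF_8), props);
    }

    public static void main(String[] args) {
        Message message = build();
        // Body and properties travel together in one instance.
        System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
        System.out.println(message.getMessageProperties().getMessageId());
        System.out.println(message.getMessageProperties().getHeaders().get("tenant"));
    }
}
```

In everyday code you rarely build a Message yourself; methods such as convertAndSend create it from a plain object through a message converter.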
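The Exchange interface represents an exchange in the AMQP model, which is what a message producer sends to. Each exchange within a broker's virtual host has a unique name along with several other properties, such as its type, durability, and auto-delete flag.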
- package org.springframework.amqp.core;
-
- // Reference source
- public interface Exchange extends Declarable {
-
- String getName();
-
- String getType();
-
- boolean isDurable();
-
- boolean isAutoDelete();
-
- Map<String, Object> getArguments();
-
- boolean isDelayed();
-
- boolean isInternal();
-
- }
The exchange type determines how an exchange routes messages. The commonly used types are direct, topic, fanout, and headers; custom exchange types are also supported through CustomExchange.
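The topic type is the least obvious of these, so here is a small plain-Java sketch of how a topic binding pattern is matched against a routing key: words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. This is not RabbitMQ's actual implementation; the class and method names are invented for illustration.

```java
/** Minimal sketch of AMQP topic matching; not the broker's real algorithm. */
final class TopicMatchSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;              // pattern exhausted: key must be too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                      // pattern still expects a word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);  // '*' or a literal consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("foo.*", "foo.bar"));      // true
        System.out.println(matches("foo.*", "foo.bar.baz"));  // false
        System.out.println(matches("foo.#", "foo.bar.baz"));  // true
    }
}
```

By comparison, a direct exchange requires the routing key to equal the binding key exactly, and a fanout exchange ignores the routing key altogether.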
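A Queue represents the component from which a message consumer receives messages; like Exchange, it is an abstract representation of a type in the AMQP model. The simplest constructor takes only the queue name, and overloaded constructors accept the remaining attributes. Reference code: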
- package org.springframework.amqp.core;
-
- public class Queue extends AbstractDeclarable implements Cloneable {
-
- private final String name;
- private final boolean durable;
- private final boolean exclusive;
- private final boolean autoDelete;
- private volatile String actualName;
-
- /**
- * By default the queue is durable, non-exclusive, and non-auto-delete
- */
- public Queue(String name) {
- this(name, true, false, false);
- }
- // remaining code omitted
- }
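Given that producers send messages to an exchange and consumers receive messages from a queue, the bindings that connect queues to exchanges are critical to linking producers and consumers. Spring AMQP defines the Binding class to represent these connections.
Binding a queue to a direct exchange, for example: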
new Binding(someQueue, someDirectExchange, "foo.bar");
Binding a queue to a topic exchange, for example:
new Binding(someQueue, someTopicExchange, "foo.*");
Binding a queue to a fanout exchange, for example:
new Binding(someQueue, someFanoutExchange);
A Binding can also be created with BindingBuilder, for example:
- Binding b = BindingBuilder.bind(someQueue)
- .to(someTopicExchange)
- .with("foo.*");
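For a version that compiles as-is, the following sketch builds one binding per exchange type with BindingBuilder. It assumes spring-amqp is on the classpath; the queue and exchange names ("demo.queue", "demo.direct", and so on) are hypothetical.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;

/** Illustrative only: all queue and exchange names are made up. */
final class BindingSketch {

    /** Direct exchange: the routing key must match the binding key exactly. */
    static Binding direct(Queue queue) {
        return BindingBuilder.bind(queue).to(new DirectExchange("demo.direct")).with("foo.bar");
    }

    /** Topic exchange: the binding key is a pattern. */
    static Binding topic(Queue queue) {
        return BindingBuilder.bind(queue).to(new TopicExchange("demo.topic")).with("foo.*");
    }

    /** Fanout exchange: no routing key is needed. */
    static Binding fanout(Queue queue) {
        return BindingBuilder.bind(queue).to(new FanoutExchange("demo.fanout"));
    }

    public static void main(String[] args) {
        Queue queue = new Queue("demo.queue");
        for (Binding b : new Binding[] {direct(queue), topic(queue), fanout(queue)}) {
            System.out.println(b.getExchange() + " -> " + b.getDestination()
                    + " [" + b.getRoutingKey() + "]");
        }
    }
}
```

These objects only describe the topology. Nothing exists on the broker until an admin component declares them.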
1. Before using Spring AMQP, install a RabbitMQ broker.
2. Add the Spring AMQP dependency to the application.
Maven configuration
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>2.4.7</version>
- </dependency>
Gradle configuration
implementation 'org.springframework.amqp:spring-rabbit:2.4.7'
- import org.springframework.amqp.core.AmqpAdmin;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitAdmin;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
-
- /**
- * Example 1
- */
- public class HelloWorldTest {
- public static void main(String[] args) {
-
- ConnectionFactory connectionFactory = new CachingConnectionFactory();
- // CachingConnectionFactory() connects to localhost by default, so the line above is equivalent to the one below
- //ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
- AmqpAdmin admin = new RabbitAdmin(connectionFactory);
- admin.declareQueue(new Queue("myqueue"));
-
- AmqpTemplate template = new RabbitTemplate(connectionFactory);
- template.convertAndSend("myqueue", "Hello Word");
- String foo = (String) template.receiveAndConvert("myqueue");
-
- System.out.println("getMessage is :" + foo);
- }
- }
Output
- [main] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: localhost:5672
- [main] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: SpringAMQP#5dd6264:0/SimpleConnection@61862a7f [delegate=amqp://guest@192.168.20.65:5672/, localPort= 55808]
- [main] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://guest@192.168.20.65:5672/,1)
- [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitAdmin$$Lambda$36/0x00000008000c1c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@192.168.20.65:5672/,1), conn: Proxy@441772e Shared Rabbit Connection: SimpleConnection@61862a7f [delegate=amqp://guest@192.168.20.65:5672/, localPort= 55808]
- [main] DEBUG org.springframework.amqp.rabbit.core.RabbitAdmin - declaring Queue 'myqueue'
- [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$43/0x000000080013f840 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@192.168.20.65:5672/,1), conn: Proxy@441772e Shared Rabbit Connection: SimpleConnection@61862a7f [delegate=amqp://guest@192.168.20.65:5672/, localPort= 55808]
- [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message [(Body:'Hello Word' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=10, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [], routingKey = [myqueue]
- [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$44/0x000000080013f040 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@192.168.20.65:5672/,1), conn: Proxy@441772e Shared Rabbit Connection: SimpleConnection@61862a7f [delegate=amqp://guest@192.168.20.65:5672/, localPort= 55808]
- [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Received: (Body:'Hello Word' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=myqueue, deliveryTag=1, messageCount=0])
- getMessage is :Hello Word
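1) ConnectionFactory
In the spring-rabbit module, ConnectionFactory is the central component for managing connections to the RabbitMQ broker. Spring AMQP offers three connection factories to choose from. Of these, PooledChannelConnectionFactory and ThreadChannelConnectionFactory are available only in version 2.3 and later.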
PooledChannelConnectionFactory
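Suitable for most use cases. This is a very simple connection factory that caches channels in Apache Pool2 GenericObjectPool instances (one pool for transactional channels, one for non-transactional channels). The pools have a default configuration, which can be customized through a callback.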
ThreadChannelConnectionFactory
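Use this factory when you need strict message ordering without resorting to scoped operations. It manages a single connection and two ThreadLocals, one for transactional channels and one for non-transactional channels, so that all operations on the same thread use the same channel. That is what makes strict message ordering possible. To avoid memory leaks, an application that uses many short-lived threads must call closeThreadChannels() to release the channel resources.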
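The thread-bound idea can be sketched in plain Java with a ThreadLocal. This is not the factory's real code: FakeChannel is a hypothetical stand-in for an AMQP channel, and release() plays the role of the factory's channel-release method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java model of "one channel per thread"; all names are hypothetical. */
final class ThreadBoundChannels {

    /** Stand-in for an AMQP channel; it only carries an id. */
    static final class FakeChannel {
        final int id;
        FakeChannel(int id) { this.id = id; }
    }

    private final AtomicInteger nextId = new AtomicInteger();

    // Each thread lazily gets its own channel on first use.
    private final ThreadLocal<FakeChannel> perThread =
            ThreadLocal.withInitial(() -> new FakeChannel(nextId.incrementAndGet()));

    /** Always returns the calling thread's own channel. */
    FakeChannel channel() {
        return perThread.get();
    }

    /** Drops the calling thread's channel so it cannot leak with the thread. */
    void release() {
        perThread.remove();
    }

    public static void main(String[] args) {
        ThreadBoundChannels factory = new ThreadBoundChannels();
        int first = factory.channel().id;
        int second = factory.channel().id;
        System.out.println("same thread reuses channel: " + (first == second));

        // A different thread gets a different channel and releases it when done.
        int other = CompletableFuture.supplyAsync(() -> {
            int id = factory.channel().id;
            factory.release();
            return id;
        }).join();
        System.out.println("other thread gets its own: " + (other != first));
    }
}
```

Because every publish from a given thread goes through the same channel, the broker sees that thread's messages in the order they were sent.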
CachingConnectionFactory
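Use this factory when you need correlated publisher confirms or want to use its CacheMode option. It caches channels and maintains separate caches depending on whether a channel is transactional. When creating an instance, you can pass the host name through the constructor and set the username and password properties. The default channel cache size is 25, which can be changed with setChannelCacheSize. CacheMode has two values, CHANNEL and CONNECTION; the default is CHANNEL, and it can be changed with setCacheMode.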
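What a channel cache size means can be shown with a small plain-Java model. It is only a sketch under simplifying assumptions (one cache, integers standing in for channels, hypothetical names), not the factory's implementation: the size caps how many idle channels are kept for reuse, and a channel returned to a full cache is closed instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Plain-Java model of a channel cache; integers stand in for channels. */
final class ChannelCacheSketch {

    private final int cacheSize;
    private final Deque<Integer> idle = new ArrayDeque<>();
    private int created;
    private int closed;

    ChannelCacheSketch(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    /** Hands out a cached idle channel if there is one, otherwise "opens" a new one. */
    synchronized int acquire() {
        Integer cached = idle.pollFirst();
        return cached != null ? cached : ++created;
    }

    /** Takes a channel back: cached while there is room, "closed" otherwise. */
    synchronized void release(int channel) {
        if (idle.size() < cacheSize) {
            idle.addFirst(channel);
        } else {
            closed++;
        }
    }

    synchronized int created() { return created; }
    synchronized int closed() { return closed; }
    synchronized int idle() { return idle.size(); }

    public static void main(String[] args) {
        ChannelCacheSketch cache = new ChannelCacheSketch(2);
        int a = cache.acquire();
        int b = cache.acquire();
        int c = cache.acquire();   // more channels in use than the cache can hold
        cache.release(a);
        cache.release(b);
        cache.release(c);          // cache already full, so this one is closed
        System.out.println("created=" + cache.created()
                + " closed=" + cache.closed() + " idle=" + cache.idle());
        // prints: created=3 closed=1 idle=2
    }
}
```

If an application routinely uses more channels at once than the cache holds, channels are opened and closed repeatedly, which is the usual reason to raise the cache size.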
Define the bean file
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation="http://www.springframework.org/schema/rabbit
- https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
- http://www.springframework.org/schema/beans
- https://www.springframework.org/schema/beans/spring-beans.xsd">
-
- <!-- Quick setup using the rabbit namespace -->
- <rabbit:connection-factory id="connectionFactory"/>
-
- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
-
- <rabbit:admin connection-factory="connectionFactory"/>
-
- <rabbit:queue name="myqueue"/>
- </beans>
Test:
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.support.GenericXmlApplicationContext;
-
- public class XmlConfigurationTest {
-
- public static void main(String[] args) {
- ApplicationContext context =
- new GenericXmlApplicationContext("classpath:/rabbit-amqp.xml");
- AmqpTemplate template = context.getBean(AmqpTemplate.class);
- template.convertAndSend("myqueue", "With XML Configuration testing");
-
- AmqpTemplate template2 = context.getBean(AmqpTemplate.class);
- String foo = (String) template2.receiveAndConvert("myqueue");
- System.out.println("getMessage is :" + foo);
- }
- }
Output
- 11:51:10.669 [main] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: star10001874:5672
- 11:51:10.738 [main] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: connectionFactory#4a83a74a:0/SimpleConnection@336f1079 [delegate=amqp://guest@169.254.40.49:5672/, localPort= 55766]
- 11:51:10.740 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
- 11:51:10.740 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitAdmin - Initializing declarations
- 11:51:10.748 [main] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://guest@169.254.40.49:5672/,1)
- 11:51:10.764 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitAdmin$$Lambda$110/0x000000080023e840 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@169.254.40.49:5672/,1), conn: Proxy@2766ca9d Shared Rabbit Connection: SimpleConnection@336f1079 [delegate=amqp://guest@169.254.40.49:5672/, localPort= 55766]
- 11:51:10.764 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitAdmin - declaring Queue 'myqueue'
- 11:51:10.767 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitAdmin - Declarations finished
- 11:51:10.768 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$98/0x00000008001f4040 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@169.254.40.49:5672/,1), conn: Proxy@2766ca9d Shared Rabbit Connection: SimpleConnection@336f1079 [delegate=amqp://guest@169.254.40.49:5672/, localPort= 55766]
- 11:51:10.768 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message [(Body:'With XML Configuration testing' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=30, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [], routingKey = [myqueue]
- 11:51:10.771 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$111/0x000000080023ec40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@169.254.40.49:5672/,1), conn: Proxy@2766ca9d Shared Rabbit Connection: SimpleConnection@336f1079 [delegate=amqp://guest@169.254.40.49:5672/, localPort= 55766]
- 11:51:10.774 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Received: (Body:'With XML Configuration testing' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=myqueue, deliveryTag=1, messageCount=0])
- getMessage is :With XML Configuration testing
The rabbit:connection-factory tag
The factory supports a number of other attributes; channel-cache-size, for example, defaults to 25. As a plain bean definition:
- <bean id="connectionFactory"
- class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
- <constructor-arg value="127.0.0.1"/>
- <property name="username" value="guest"/>
- <property name="password" value="guest"/>
- <property name="channelCacheSize" value="50"/>
- </bean>
The rabbit namespace can be used to simplify the configuration, for example:
- <rabbit:connection-factory id="connectionFactory" host="127.0.0.1"
- port="5672" username="guest" password="guest"
- requested-heartbeat="60" channel-cache-size="50"
- cache-mode="CONNECTION" connection-timeout="6000" />
In a clustered environment, the factory can be given several addresses, as shown below:
- <rabbit:connection-factory
- id="connectionFactory" addresses="host1:5672,host2:5672"/>
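The rabbit:admin tag
The admin automatically looks up beans of type Queue, Exchange, and Binding and declares them on the broker, so a simple Java driver does not need to use the admin bean explicitly.
Define the configuration class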
- import org.springframework.amqp.core.AmqpAdmin;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitAdmin;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitConfiguration {
-
- @Bean
- public ConnectionFactory connectionFactory() {
- return new CachingConnectionFactory("localhost");
- }
-
- @Bean
- public AmqpAdmin amqpAdmin() {
- return new RabbitAdmin(connectionFactory());
- }
-
- @Bean
- public RabbitTemplate rabbitTemplate() {
- return new RabbitTemplate(connectionFactory());
- }
-
- @Bean
- public Queue myQueue() {
- return new Queue("myqueue");
- }
- }
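Because the admin declares any Queue, Exchange, and Binding beans it finds, the same configuration style extends to a complete topology. The following is a minimal sketch, assuming spring-rabbit and spring-context are on the classpath; the class name and the names "demo.orders", "demo.exchange", and "orders.*" are hypothetical.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** Illustrative only: bean, queue, and exchange names are made up. */
@Configuration
class TopologyConfiguration {

    @Bean
    ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    // The admin picks up the Queue, Exchange, and Binding beans below.
    @Bean
    RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    Queue ordersQueue() {
        return new Queue("demo.orders");
    }

    @Bean
    TopicExchange ordersExchange() {
        return new TopicExchange("demo.exchange");
    }

    @Bean
    Binding ordersBinding(Queue ordersQueue, TopicExchange ordersExchange) {
        return BindingBuilder.bind(ordersQueue).to(ordersExchange).with("orders.*");
    }
}
```

With this in place, a message sent to "demo.exchange" with a routing key such as "orders.created" would be routed to "demo.orders".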
The producer publishes a message
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-
- public class ProducerTest {
- public static void main(String[] args) {
- ApplicationContext context =
- new AnnotationConfigApplicationContext(RabbitConfiguration.class);
- AmqpTemplate template = context.getBean(AmqpTemplate.class);
- template.convertAndSend("myqueue", "With Java Configuration testing");
- }
- }
The consumer handles the message
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-
- public class ConsumerTest {
-
- public static void main(String[] args) {
- ApplicationContext context =
- new AnnotationConfigApplicationContext(RabbitConfiguration.class);
- AmqpTemplate template2 = context.getBean(AmqpTemplate.class);
- String foo = (String) template2.receiveAndConvert("myqueue");
- System.out.println("getMessage is :" + foo);
- }
- }
Output
Run the producer first to send the message, then run ConsumerTest to receive it. The ConsumerTest log looks like this:
- 18:44:36.876 [main] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: localhost:5672
- 18:44:36.935 [main] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: SpringAMQP#3e44f2a5:0/SimpleConnection@198d6542 [delegate=amqp://guest@192.168.20.65:5672/, localPort= 53682]
- 18:44:36.944 [main] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://guest@192.168.20.65:5672/,1)
- 18:44:36.962 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$119/0x00000008001a0440 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@192.168.20.65:5672/,1), conn: Proxy@5e403b4a Shared Rabbit Connection: SimpleConnection@198d6542 [delegate=amqp://guest@192.168.20.65:5672/, localPort= 53682]
- 18:44:36.981 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Received: (Body:'With Java Configuration testing' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=myqueue, deliveryTag=1, messageCount=0])
- getMessage is :With Java Configuration testing
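1) AmqpTemplate
Spring AMQP provides a "template" that plays a central role. The interface that defines its operations is AmqpTemplate, which covers the general behavior of sending and receiving messages and therefore contains all the basic send and receive operations. It will be discussed in a later article.
When using Spring Boot, add the corresponding starter: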
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.boot.ApplicationRunner;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.annotation.Bean;
-
-
- @SpringBootApplication(scanBasePackages = {"com.amqp.spring.quicktour.setp4"})
- public class BootApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(BootApplication.class, args);
- }
-
- @Bean
- public ApplicationRunner runner(AmqpTemplate template) {
- return args -> template.convertAndSend("myqueue", "This is a springboot demo");
- }
-
- @Bean
- public Queue myQueue() {
- return new Queue("myqueue");
- }
-
- @RabbitListener(queues = "myqueue")
- public void listen(String in) {
- System.out.println("#############listen message:"+in);
- }
- }
Output
- . ____ _ __ _ _
- /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
- ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
- \\/ ___)| |_)| | | | | || (_| | ) ) ) )
- ' |____| .__|_| |_|_| |_\__, | / / / /
- =========|_|==============|___/=/_/_/_/
- :: Spring Boot :: (v2.6.2)
- 2022-11-01 18:49:24.682 INFO 11688 --- [ main] c.a.s.quicktour.setp4.BootApplication : Starting BootApplication using Java 11.0.2 on star10001874 with PID 11688 (G:\GITHUB\springamqp\target\classes started by 10001874 in G:\GITHUB\springamqp)
- 2022-11-01 18:49:24.685 INFO 11688 --- [ main] c.a.s.quicktour.setp4.BootApplication : No active profile set, falling back to default profiles: default
- 2022-11-01 18:49:25.336 INFO 11688 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
- 2022-11-01 18:49:25.343 INFO 11688 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
- 2022-11-01 18:49:25.343 INFO 11688 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.56]
- 2022-11-01 18:49:25.390 INFO 11688 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
- 2022-11-01 18:49:25.391 INFO 11688 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 661 ms
- 2022-11-01 18:49:25.845 INFO 11688 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
- 2022-11-01 18:49:25.847 INFO 11688 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
- 2022-11-01 18:49:25.865 INFO 11688 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#6b6b3572:0/SimpleConnection@7741d346 [delegate=amqp://guest@192.168.20.65:5672/, localPort= 53853]
- 2022-11-01 18:49:25.902 INFO 11688 --- [ main] c.a.s.quicktour.setp4.BootApplication : Started BootApplication in 1.511 seconds (JVM running for 2.188)
- #############listen message:This is a springboot demo
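1) The ApplicationRunner interface
ApplicationRunner is commonly used to run some logic as soon as the application has started; Spring Boot invokes it once the application context is ready, just before SpringApplication.run() returns. The example uses it to send a message to the queue after startup.
2) @RabbitListener(queues = "myqueue")
The @RabbitListener annotation marks a method as a message consumer. In the example, it listens on the queue named myqueue.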