赞
踩
本文将会详细介绍Spring和RabbitMQ整合,其中主要介绍路由模式(Routing)其他模式大体差不多就不一一介绍
项目git地址:https://github.com/gitcaiqing/SpringRabbitMQ
1.项目结构
2.创建Maven项目pom.xml引入关键jar包,下文中有些jar包用不到可删减
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring.version>4.1.4.RELEASE</spring.version> <jackson.version>2.5.0</jackson.version> </properties> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.0.RELEASE</version> </dependency> <!-- spring核心依赖 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>${spring.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-war-plugin</artifactId> <configuration> <version>3.0</version> </configuration> </plugin> </plugins> </build> </project>
3.RabbitMQ连接配置config->RabbitMQ.properties
rmq.host=127.0.0.1
rmq.port=5672
rmq.user=guest
rmq.password=guest
rmq.channelCacheSize=25
rmq.virtual=/
4.Spring RabbitMQ整合配置xml文件
此文件中涉及到的一些对象,在下文中有定义
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd"> <description>Spring RabbitMQ 路由模式(Routing)配置</description> <!--引入配置属性文件 --> <context:property-placeholder location="classpath:config/*.properties" /> <!-- 配置RabbitMQ连接 --> <!-- channel-cache-size,channel的缓存数量,默认值为25 --> <!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) --> <rabbit:connection-factory id="connectionFactory" host="${rmq.host}" port="${rmq.port}" username="${rmq.user}" password="${rmq.password}" virtual-host="${rmq.virtual}" channel-cache-size="${rmq.channelCacheSize}" cache-mode="CHANNEL"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- 定义消息队列 durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不失去queue和消息,需要同时标志队列(queue) 和交换机(exchange)持久化,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message) 默认是持久化的,可以看类org.springframework.amqp.core.MessageProperties中的属性 public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 exclusive: 仅创建者可以使用的私有队列,断开后自动删除; --> <rabbit:queue id="queue" name="SpringRabbit-Routing-Queue1" durable="true" auto-delete="false" exclusive="false"/> <!-- 绑定队列 rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,此处为direct模式,即rabbit:direct-exchange标签, Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等, 则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。 但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心, auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false --> <rabbit:direct-exchange name="SpringRabbit-Direct-Exchange" auto-declare="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queue" key="info"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- spring template声明 --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="SpringRabbit-Direct-Exchange"/> <!-- 生产者 --> <bean id="producter" class="com.rabbitmq.producter.Producter"> <constructor-arg name="rabbitTemplate" ref="rabbitTemplate"/> <constructor-arg name="routekey" value="info"/> </bean> <!-- 消费者 --> <bean id="consumer" class="com.rabbitmq.consumer.Consumer"/> <!-- 队列监听--> <!-- acknowledge:auto 自动确认(默认), manual手动确认 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="queue" ref="consumer"/> </rabbit:listener-container> </beans>
**5.**生产者
package com.rabbitmq.producter; import org.springframework.amqp.rabbit.core.RabbitTemplate; //生产者 public class Producter { private RabbitTemplate rabbitTemplate; private String routekey; public Producter(RabbitTemplate rabbitTemplate, String routekey) { this.rabbitTemplate = rabbitTemplate; this.routekey = routekey; } public void sendMessage(String message) { rabbitTemplate.convertAndSend(routekey, message); } }
6.消费者
package com.rabbitmq.consumer; import java.io.UnsupportedEncodingException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; /** * 消费者 * 实现 MessageListener或ChannelAwareMessageListener(需手动确认的实现此接口) */ public class Consumer implements MessageListener{ public void onMessage(Message message) { String msg = null; try { msg = new String(message.getBody(),"UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.out.println("消费者消费:"+msg); } }
7.路由模式测试
package com.rabbitmq.controller; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.rabbitmq.producter.Producter; //路由模式测试 public class RoutingTest { public static void main(String[] args) { AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("spring-rabbitmq-routing.xml"); Producter routingProducter = ctx.getBean(Producter.class); String message = "spring rabit routing hello!"; routingProducter.sendMessage(message); ctx.close(); } }
8.其他模式
其他模式就不一一实现,这里简单的把发布订阅模式和通配符模式和Spring整合配置文件介绍下。发布定义模式没有去掉路由键,通配符模式则是讲路由缓存类似正则表达式去匹配即可
8.1发布订阅模式配置文件
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd"> <description>Spring RabbitMQ 发布订阅模式(publish_subscrible)配置</description> <!--引入配置属性文件 --> <context:property-placeholder location="classpath:config/*.properties" /> <!-- 配置RabbitMQ连接 --> <!-- channel-cache-size,channel的缓存数量,默认值为25 --> <!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) --> <rabbit:connection-factory id="connectionFactory" host="${rmq.host}" port="${rmq.port}" username="${rmq.user}" password="${rmq.password}" virtual-host="${rmq.virtual}" channel-cache-size="${rmq.channelCacheSize}" cache-mode="CHANNEL"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- 定义消息队列 durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不失去queue和消息,需要同时标志队列(queue) 和交换机(exchange)持久化,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message) 默认是持久化的,可以看类org.springframework.amqp.core.MessageProperties中的属性 public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 exclusive: 仅创建者可以使用的私有队列,断开后自动删除; --> <rabbit:queue id="queue" name="SpringRabbit-PublishScrible-Queue" durable="true" auto-delete="false" exclusive="false"/> <!-- 绑定队列 rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,此处为direct模式,即rabbit:direct-exchange标签, Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等, 则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。 但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心, auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false --> <rabbit:fanout-exchange name="SpringRabbit-Fanout-Exchange" auto-declare="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queue"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange> <!-- spring template声明 --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="SpringRabbit-Fanout-Exchange"/> <!-- 生产者 --> <bean id="producter" class="com.rabbitmq.producter.Producter"> <constructor-arg name="rabbitTemplate" ref="rabbitTemplate"/> <constructor-arg name="routekey" value=""/> </bean> <!-- 消费者 --> <bean id="consumer" class="com.rabbitmq.consumer.Consumer"/> <!-- 队列监听--> <!-- acknowledge:auto 自动确认(默认), manual手动确认 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="queue" ref="consumer"/> </rabbit:listener-container> </beans>
8.2通配符模式配置文件
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd"> <description>Spring RabbitMQ 通配符模式(Topics)配置</description> <!--引入配置属性文件 --> <context:property-placeholder location="classpath:config/*.properties" /> <!-- 配置RabbitMQ连接 --> <!-- channel-cache-size,channel的缓存数量,默认值为25 --> <!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) --> <rabbit:connection-factory id="connectionFactory" host="${rmq.host}" port="${rmq.port}" username="${rmq.user}" password="${rmq.password}" virtual-host="${rmq.virtual}" channel-cache-size="${rmq.channelCacheSize}" cache-mode="CHANNEL"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- 定义消息队列 durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不失去queue和消息,需要同时标志队列(queue) 和交换机(exchange)持久化,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message) 默认是持久化的,可以看类org.springframework.amqp.core.MessageProperties中的属性 public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 exclusive: 仅创建者可以使用的私有队列,断开后自动删除; --> <rabbit:queue id="queue" name="SpringRabbit-Topics-Queue" durable="true" auto-delete="false" exclusive="false"/> <!-- 绑定队列 rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,此处为direct模式,即rabbit:direct-exchange标签, Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等, 则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。 但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心, auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false --> <rabbit:topic-exchange name="SpringRabbit-Topic-Exchange" auto-declare="true" auto-delete="false"> <rabbit:bindings> <!-- 符号“#”匹配一个或多个词,符号“*”仅匹配一个词 如:因此“china.#”能够匹配到“china.news.info”,但是“china.*”只会匹配到“china.news” --> <rabbit:binding queue="queue" pattern="china.#"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- spring template声明 --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="SpringRabbit-Topic-Exchange"/> <!-- 生产者 --> <bean id="producter" class="com.rabbitmq.producter.Producter"> <constructor-arg name="rabbitTemplate" ref="rabbitTemplate"/> <constructor-arg name="routekey" value="china.news"/> </bean> <!-- 消费者 --> <bean id="consumer" class="com.rabbitmq.consumer.Consumer"/> <!-- 队列监听--> <!-- acknowledge:auto 自动确认(默认), manual手动确认 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="queue" ref="consumer"/> </rabbit:listener-container> </beans>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。