赞
踩
RabbitMQ的大约的介绍,上一篇已经有介绍了,这篇不介绍,直接描述RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)。
使用了Spring-rabbit 发送消息和接收消息,我们使用的Maven来管理Jar包,在Maven的pom.xml文件中引入jar包
- <span style="font-size:18px;"> <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>1.3.6.RELEASE</version>
- </dependency></span>
1.实现生产者
第一步:是要设置调用安装RabbitMQ的IP、端口等
配置一个global.properties文件
第二步:通过SpringMVC把global.properties文件读进来
- <span style="font-size:18px;"><!-- 注入属性文件 -->
- <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
- <property name="locations">
- <list>
- <value>classpath:global.properties</value>
- </list>
- </property>
- </bean> </span>
第三步:配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类等,在SpringMVC的配置文件加入下面这些
<bean id="rmqProducer2" class="cn.test.spring.rabbitmq.RmqProducer"></bean>
- <span style="font-size:18px;"> <!-- 创建连接类 -->
- <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
- <constructor-arg value="localhost" />
- <property name="username" value="${rmq.manager.user}" />
- <property name="password" value="${rmq.manager.password}" />
- <property name="host" value="${rmq.ip}" />
- <property name="port" value="${rmq.port}" />
- </bean>
-
- <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
- <constructor-arg ref="connectionFactory" />
- </bean>
- <!-- 创建rabbitTemplate 消息模板类 -->
- <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
- <constructor-arg ref="connectionFactory"></constructor-arg>
- </bean> </span>
第四步:实现消息类实体和发送消息
类实体
- <span style="font-size:18px;">/**
- * 消息
- *
- */
- public class RabbitMessage implements Serializable
- {
- private static final long serialVersionUID = -6487839157908352120L;
-
- private Class<?>[] paramTypes;//参数类型
- private String exchange;//交换器
-
- private Object[] params;
-
- private String routeKey;//路由key
-
- public RabbitMessage(){}
-
- public RabbitMessage(String exchange,String routeKey,Object...params)
- {
- this.params=params;
- this.exchange=exchange;
- this.routeKey=routeKey;
- }
-
- @SuppressWarnings("rawtypes")
- public RabbitMessage(String exchange,String routeKey,String methodName,Object...params)
- {
- this.params=params;
- this.exchange=exchange;
- this.routeKey=routeKey;
- int len=params.length;
- Class[] clazzArray=new Class[len];
- for(int i=0;i<len;i++)
- clazzArray[i]=params[i].getClass();
- this.paramTypes=clazzArray;
- }
-
- public byte[] getSerialBytes()
- {
- byte[] res=new byte[0];
- ByteArrayOutputStream baos=new ByteArrayOutputStream();
- ObjectOutputStream oos;
- try {
- oos = new ObjectOutputStream(baos);
- oos.writeObject(this);
- oos.close();
- res=baos.toByteArray();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return res;
- }
-
-
-
-
- public String getRouteKey() {
- return routeKey;
- }
-
-
-
- public String getExchange() {
- return exchange;
- }
-
- public void setExchange(String exchange) {
- this.exchange = exchange;
- }
-
- public void setRouteKey(String routeKey) {
- this.routeKey = routeKey;
- }
-
-
-
- public Class<?>[] getParamTypes() {
- return paramTypes;
- }
-
- public Object[] getParams() {
- return params;
- }
-
-
-
-
- }
- </span>
发送消息
- <span style="font-size:18px;">/**
- * 生产着
- *
- */
-
- public class RmqProducer
- {
-
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- /**
- * 发送信息
- * @param msg
- */
- public void sendMessage(RabbitMessage msg)
- {
- try {
- System.out.println(rabbitTemplate.getConnectionFactory().getHost());
- System.out.println(rabbitTemplate.getConnectionFactory().getPort());
- //发送信息
- rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);
-
- } catch (Exception e) {
- }
-
-
- }
-
- }</span>
说明:
1. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);
源代码中的send调用的方法,一些发送消息帮我们实现好了。
2.上面的代码实现没申明交换器和队列,RabbitMQ不知交换器和队列他们的绑定关系,如果RabbitMQ管理器上没有对应的交换器和队列是不会新建的和关联的,需要手动关联。
我们也可以用代码申明:
rabbitAdmin要申明:eclareExchange方法 参数是交换器
BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange
rabbitAdmin.declareBinding(binding);//声明绑定关系
源代码有这些方法:
这样就可以实现交换器和队列的绑定关系
交换器我们可以申明为持久化,还有使用完不会自动删除
TopicExchange 参数的说明:name是交换器名称,durable:true 是持久化 autoDelete:false使用完不删除
源代码:
队列也可以申明为持久化
第五步:实现测试类
- <span style="font-size:18px;">@Resource
- private RmqProducer rmqProducer2;
-
- @Test
- public void test() throws IOException
- {
-
-
- String exchange="testExchange";交换器
- String routeKey="testQueue";//队列
- String methodName="test";//调用的方法
- //参数
- Map<String,Object> param=new HashMap<String, Object>();
- param.put("data","hello");
-
- RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param);
- //发送消息
- rmqProducer2.sendMessage(msg);
-
- }</span>
结果:RabbitMQ有一条消息
2.消费者
第一步:RabbitMQ服务器连接这些在生产者那边已经介绍了,这边就不介绍了,我们要配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类、消息转换器、消息转换器监听器等,在SpringMVC的配置文件加入下面这些
- <span style="font-size:18px;"> <!-- 创建连接类 -->
- <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
- <constructor-arg value="localhost" />
- <property name="username" value="${rmq.manager.user}" />
- <property name="password" value="${rmq.manager.password}" />
- <property name="host" value="${rmq.ip}" />
- <property name="port" value="${rmq.port}" />
- </bean>
-
- <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
- <constructor-arg ref="connectionFactory" />
- </bean>
- <!-- 创建rabbitTemplate 消息模板类 -->
- <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
- <constructor-arg ref="connectionFactory"></constructor-arg>
- </bean>
-
-
- <!-- 创建消息转换器为SimpleMessageConverter -->
- <bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean>
-
-
- <!-- 设置持久化的队列 -->
- <bean id="queue" class="org.springframework.amqp.core.Queue">
- <constructor-arg index="0" value="testQueue"></constructor-arg>
- <constructor-arg index="1" value="true"></constructor-arg>
- <constructor-arg index="2" value="false"></constructor-arg>
- <constructor-arg index="3" value="false"></constructor-arg>
- </bean>
-
-
- <!--创建交换器的类型 并持久化-->
- <bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange">
- <constructor-arg index="0" value="testExchange"></constructor-arg>
- <constructor-arg index="1" value="true"></constructor-arg>
- <constructor-arg index="2" value="false"></constructor-arg>
- </bean>
-
- <util:map id="arguments">
- </util:map>
-
-
- <!-- 绑定交换器、队列 -->
- <bean id="binding" class="org.springframework.amqp.core.Binding">
- <constructor-arg index="0" value="testQueue"></constructor-arg>
- <constructor-arg index="1" value="QUEUE"></constructor-arg>
- <constructor-arg index="2" value="testExchange"></constructor-arg>
- <constructor-arg index="3" value="testQueue"></constructor-arg>
- <constructor-arg index="4" value="#{arguments}"></constructor-arg>
- </bean>
-
-
- <!-- 用于接收消息的处理类 -->
- <bean id="rmqConsumer" class="cn.test.spring.rabbitmq.RmqConsumer"></bean>
-
- <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
- <constructor-arg ref="rmqConsumer" />
- <property name="defaultListenerMethod" value="rmqProducerMessage"></property>
- <property name="messageConverter" ref="serializerMessageConverter"></property>
- </bean>
-
- <!-- 用于消息的监听的容器类SimpleMessageListenerContainer,监听队列 queues可以传多个-->
- <bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
- <property name="queues" ref="queue"></property>
- <property name="connectionFactory" ref="connectionFactory"></property>
- <property name="messageListener" ref="messageListenerAdapter"></property>
- </bean>
- </span>
说明:
1.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中的queues可以传入多个队列
2.org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
有哪个消费者适配器来处理 ,参数defaultListenerMethod是默认调用方法来处理消息。
3.交换器和队列的持久化在生产者有介绍过了。
4.org.springframework.amqp.core.Binding这个类的绑定,在SpringMVC配置文件中配置时,
DestinationType这个参数要注意点
源代码:
第二步:处理消息
- <span style="font-size:18px;">/**
- * 消费者
- *
- */
- public class RmqConsumer
- {
- public void rmqProducerMessage(Object object){
-
- RabbitMessage rabbitMessage=(RabbitMessage) object;
-
- System.out.println(rabbitMessage.getExchange());
- System.out.println(rabbitMessage.getRouteKey());
- System.out.println(rabbitMessage.getParams().toString());
-
-
- }
-
-
-
-
-
-
- }</span>
在启动过程中会报这样的错误,可能是你的交换器和队列没配置好
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。