当前位置:   article > 正文

RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案二_rabbittemplate 发送消息持久化

rabbittemplate 发送消息持久化

   RabbitMQ的大约的介绍,上一篇已经有介绍了,这篇不介绍,直接描述RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)。

 使用了Spring-rabbit 发送消息和接收消息,我们使用的Maven来管理Jar包,在Maven的pom.xml文件中引入jar包

   

  1. <span style="font-size:18px;"> <dependency>
  2. <groupId>org.springframework.amqp</groupId>
  3. <artifactId>spring-rabbit</artifactId>
  4. <version>1.3.6.RELEASE</version>
  5. </dependency></span>
  1.实现生产者

     第一步:是要设置调用安装RabbitMQ的IP、端口等

                配置一个global.properties文件

           


   第二步:通过SpringMVCglobal.properties文件读进来

  1. <span style="font-size:18px;"><!-- 注入属性文件 -->
  2. <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  3. <property name="locations">
  4. <list>
  5. <value>classpath:global.properties</value>
  6. </list>
  7. </property>
  8. </bean> </span>
  

第三步:配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类等,在SpringMVC的配置文件加入下面这些

<bean id="rmqProducer2" class="cn.test.spring.rabbitmq.RmqProducer"></bean>

  1. <span style="font-size:18px;"> <!-- 创建连接类 -->
  2. <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
  3. <constructor-arg value="localhost" />
  4. <property name="username" value="${rmq.manager.user}" />
  5. <property name="password" value="${rmq.manager.password}" />
  6. <property name="host" value="${rmq.ip}" />
  7. <property name="port" value="${rmq.port}" />
  8. </bean>
  9. <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
  10. <constructor-arg ref="connectionFactory" />
  11. </bean>
  12. <!-- 创建rabbitTemplate 消息模板类 -->
  13. <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
  14. <constructor-arg ref="connectionFactory"></constructor-arg>
  15. </bean> </span>
第四步:实现消息类实体和发送消息

   类实体

  1. <span style="font-size:18px;">/**
  2. * 消息
  3. *
  4. */
  5. public class RabbitMessage implements Serializable
  6. {
  7. private static final long serialVersionUID = -6487839157908352120L;
  8. private Class<?>[] paramTypes;//参数类型
  9. private String exchange;//交换器
  10. private Object[] params;
  11. private String routeKey;//路由key
  12. public RabbitMessage(){}
  13. public RabbitMessage(String exchange,String routeKey,Object...params)
  14. {
  15. this.params=params;
  16. this.exchange=exchange;
  17. this.routeKey=routeKey;
  18. }
  19. @SuppressWarnings("rawtypes")
  20. public RabbitMessage(String exchange,String routeKey,String methodName,Object...params)
  21. {
  22. this.params=params;
  23. this.exchange=exchange;
  24. this.routeKey=routeKey;
  25. int len=params.length;
  26. Class[] clazzArray=new Class[len];
  27. for(int i=0;i<len;i++)
  28. clazzArray[i]=params[i].getClass();
  29. this.paramTypes=clazzArray;
  30. }
  31. public byte[] getSerialBytes()
  32. {
  33. byte[] res=new byte[0];
  34. ByteArrayOutputStream baos=new ByteArrayOutputStream();
  35. ObjectOutputStream oos;
  36. try {
  37. oos = new ObjectOutputStream(baos);
  38. oos.writeObject(this);
  39. oos.close();
  40. res=baos.toByteArray();
  41. } catch (IOException e) {
  42. e.printStackTrace();
  43. }
  44. return res;
  45. }
  46. public String getRouteKey() {
  47. return routeKey;
  48. }
  49. public String getExchange() {
  50. return exchange;
  51. }
  52. public void setExchange(String exchange) {
  53. this.exchange = exchange;
  54. }
  55. public void setRouteKey(String routeKey) {
  56. this.routeKey = routeKey;
  57. }
  58. public Class<?>[] getParamTypes() {
  59. return paramTypes;
  60. }
  61. public Object[] getParams() {
  62. return params;
  63. }
  64. }
  65. </span>
   发送消息

   

  1. <span style="font-size:18px;">/**
  2. * 生产着
  3. *
  4. */
  5. public class RmqProducer
  6. {
  7. @Resource
  8. private RabbitTemplate rabbitTemplate;
  9. /**
  10. * 发送信息
  11. * @param msg
  12. */
  13. public void sendMessage(RabbitMessage msg)
  14. {
  15. try {
  16. System.out.println(rabbitTemplate.getConnectionFactory().getHost());
  17. System.out.println(rabbitTemplate.getConnectionFactory().getPort());
  18. //发送信息
  19. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);
  20. } catch (Exception e) {
  21. }
  22. }
  23. }</span>
 说明:

   1. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);

      源代码中的send调用的方法,一些发送消息帮我们实现好了。

         
    2.上面的代码实现没申明交换器和队列,RabbitMQ不知交换器和队列他们的绑定关系,如果RabbitMQ管理器上没有对应的交换器和队列是不会新建的和关联的,需要手动关联。

          
     我们也可以用代码申明:

            rabbitAdmin要申明:eclareExchange方法  参数是交换器                                                       

                                BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange 

                                rabbitAdmin.declareBinding(binding);//声明绑定关系

                  源代码有这些方法:

                     

                         

          这样就可以实现交换器和队列的绑定关系

           交换器我们可以申明为持久化,还有使用完不会自动删除

             TopicExchange 参数的说明:name是交换器名称,durable:true 是持久化  autoDelete:false使用完不删除

               源代码:

                   


              

           队列也可以申明为持久化

            

            


  第五步:实现测试类

      

  1. <span style="font-size:18px;">@Resource
  2. private RmqProducer rmqProducer2;
  3. @Test
  4. public void test() throws IOException
  5. {
  6. String exchange="testExchange";交换器
  7. String routeKey="testQueue";//队列
  8. String methodName="test";//调用的方法
  9. //参数
  10. Map<String,Object> param=new HashMap<String, Object>();
  11. param.put("data","hello");
  12. RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param);
  13. //发送消息
  14. rmqProducer2.sendMessage(msg);
  15. }</span>
   结果:RabbitMQ有一条消息

          

    

  2.消费者

    第一步:RabbitMQ服务器连接这些在生产者那边已经介绍了,这边就不介绍了,我们要配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类、消息转换器、消息转换器监听器等,在SpringMVC的配置文件加入下面这些

   

  1. <span style="font-size:18px;"> <!-- 创建连接类 -->
  2. <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
  3. <constructor-arg value="localhost" />
  4. <property name="username" value="${rmq.manager.user}" />
  5. <property name="password" value="${rmq.manager.password}" />
  6. <property name="host" value="${rmq.ip}" />
  7. <property name="port" value="${rmq.port}" />
  8. </bean>
  9. <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
  10. <constructor-arg ref="connectionFactory" />
  11. </bean>
  12. <!-- 创建rabbitTemplate 消息模板类 -->
  13. <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
  14. <constructor-arg ref="connectionFactory"></constructor-arg>
  15. </bean>
  16. <!-- 创建消息转换器为SimpleMessageConverter -->
  17. <bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean>
  18. <!-- 设置持久化的队列 -->
  19. <bean id="queue" class="org.springframework.amqp.core.Queue">
  20. <constructor-arg index="0" value="testQueue"></constructor-arg>
  21. <constructor-arg index="1" value="true"></constructor-arg>
  22. <constructor-arg index="2" value="false"></constructor-arg>
  23. <constructor-arg index="3" value="false"></constructor-arg>
  24. </bean>
  25. <!--创建交换器的类型 并持久化-->
  26. <bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange">
  27. <constructor-arg index="0" value="testExchange"></constructor-arg>
  28. <constructor-arg index="1" value="true"></constructor-arg>
  29. <constructor-arg index="2" value="false"></constructor-arg>
  30. </bean>
  31. <util:map id="arguments">
  32. </util:map>
  33. <!-- 绑定交换器、队列 -->
  34. <bean id="binding" class="org.springframework.amqp.core.Binding">
  35. <constructor-arg index="0" value="testQueue"></constructor-arg>
  36. <constructor-arg index="1" value="QUEUE"></constructor-arg>
  37. <constructor-arg index="2" value="testExchange"></constructor-arg>
  38. <constructor-arg index="3" value="testQueue"></constructor-arg>
  39. <constructor-arg index="4" value="#{arguments}"></constructor-arg>
  40. </bean>
  41. <!-- 用于接收消息的处理类 -->
  42. <bean id="rmqConsumer" class="cn.test.spring.rabbitmq.RmqConsumer"></bean>
  43. <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
  44. <constructor-arg ref="rmqConsumer" />
  45. <property name="defaultListenerMethod" value="rmqProducerMessage"></property>
  46. <property name="messageConverter" ref="serializerMessageConverter"></property>
  47. </bean>
  48. <!-- 用于消息的监听的容器类SimpleMessageListenerContainer,监听队列 queues可以传多个-->
  49. <bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
  50. <property name="queues" ref="queue"></property>
  51. <property name="connectionFactory" ref="connectionFactory"></property>
  52. <property name="messageListener" ref="messageListenerAdapter"></property>
  53. </bean>
  54. </span>

说明:

   1.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中的queues可以传入多个队列

     

   2.org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter

     有哪个消费者适配器来处理 ,参数defaultListenerMethod是默认调用方法来处理消息。

   3.交换器和队列的持久化在生产者有介绍过了。

   4.org.springframework.amqp.core.Binding这个类的绑定,在SpringMVC配置文件中配置时,

    DestinationType这个参数要注意点

    源代码:

      

  第二步:处理消息

    

  1. <span style="font-size:18px;">/**
  2. * 消费者
  3. *
  4. */
  5. public class RmqConsumer
  6. {
  7. public void rmqProducerMessage(Object object){
  8. RabbitMessage rabbitMessage=(RabbitMessage) object;
  9. System.out.println(rabbitMessage.getExchange());
  10. System.out.println(rabbitMessage.getRouteKey());
  11. System.out.println(rabbitMessage.getParams().toString());
  12. }
  13. }</span>


   在启动过程中会报这样的错误,可能是你的交换器和队列没配置好

    

        




   




声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/139246
推荐阅读
相关标签
  

闽ICP备14008679号