当前位置:   article > 正文

【RabbitMQ系列】 Spring mvc整合RabbitMQ

springmvc rmq

一、linux下安装rabbitmq

1、安装erlang环境

  1. wget http://erlang.org/download/otp_src_18.2.1.tar.gz
  2. tar xvfz otp_src_18.2.1.tar.gz
  3. cd otp_src_18.2.1
  4. ./configure
  5. make install

2、安装RabbitMQ

  1. wget http://www.rabbitmq.com/releases/rabbitmq-server/vx.x.x/rabbitmq-server-generic-unix-x.x.x.tar.xz
  2. //安装xz文件压缩工具
  3. yum install xz
  4. //解压
  5. xz -d rabbitmq-server-generic-unix-x.x.x.tar.xz
  6. tar -xvf rabbitmq-server-generic-unix-x.x.x.tar
  7. //将其移动至/usr/local/下 按自己习惯
  8. cp -r rabbitmq_server-x.x.x /usr/local/rabbitmq
  9. //改变环境变量
  10. vi /etc/profile
  11. export PATH=/usr/local/rabbitmq/sbin:$PATH
  12. source /etc/profile
  13. //启用MQ管理方式
  14. rabbitmq-plugins enable rabbitmq_management #启动后台管理
  15. rabbitmq-server -detached #后台运行rabbitmq
  16. //开放管理界面端口15672 供外部访问(客户端远程连接还需开放5672端口)
  17. iptables -I INPUT -p tcp --dport 15672 -j ACCEPT

3、添加用户和权限

  1. //添加用户
  2. rabbitmqctl add_user admin admin
  3. //添加权限
  4. rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
  5. //添加用户角色
  6. rabbitmqctl set_user_tags admin administrator  

 

二、Spring mvc整合RabbitMQ

1、添加pom.xml依赖jar包

  1. <!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
  2. <dependency>
  3. <groupId>org.springframework.amqp</groupId>
  4. <artifactId>spring-rabbit</artifactId>
  5. <version>1.7.5.RELEASE</version>
  6. </dependency>

2、添加配置applicationContext.xml

  <!-- RabbitMQ configuration: start -->

  <!-- Connection factory. The constructor argument is the broker host name. -->
  <bean id="connectionFactoryMq" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
      <constructor-arg value="192.168.181.201"/>
      <property name="username" value="admin"/>
      <property name="password" value="admin"/>
      <property name="port" value="5672"/>
  </bean>

  <!-- Admin bean built on the connection factory above. -->
  <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
      <constructor-arg ref="connectionFactoryMq"/>
  </bean>

  <!-- Message template used by the producer to publish messages. -->
  <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
      <constructor-arg ref="connectionFactoryMq"/>
  </bean>

  <!-- Message converter: SimpleMessageConverter. -->
  <bean id="serializerMessageConverter"
        class="org.springframework.amqp.support.converter.SimpleMessageConverter"/>

  <!--
      Queue "testQueue". Constructor arguments: name, durable, exclusive, autoDelete.
      NOTE(review): durable=true combined with autoDelete=true is unusual for a queue
      described as persistent; confirm autoDelete is intended.
  -->
  <bean id="queue" class="org.springframework.amqp.core.Queue">
      <constructor-arg index="0" value="testQueue"/>
      <constructor-arg index="1" value="true"/>
      <constructor-arg index="2" value="false"/>
      <constructor-arg index="3" value="true"/>
  </bean>

  <!-- Topic exchange "testExchange". Constructor arguments: name, durable, autoDelete. -->
  <bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange">
      <constructor-arg index="0" value="testExchange"/>
      <constructor-arg index="1" value="true"/>
      <constructor-arg index="2" value="false"/>
  </bean>

  <!-- Empty argument map passed to the binding below.
       Assumes the "util" namespace is declared on the enclosing <beans> element. -->
  <util:map id="arguments"/>

  <!-- Binds queue "testQueue" to exchange "testExchange" with routing key "testQueue". -->
  <bean id="binding" class="org.springframework.amqp.core.Binding">
      <constructor-arg index="0" value="testQueue"/>
      <constructor-arg index="1" value="QUEUE"/>
      <constructor-arg index="2" value="testExchange"/>
      <constructor-arg index="3" value="testQueue"/>
      <constructor-arg index="4" value="#{arguments}"/>
  </bean>

  <!-- Handler class that receives the messages. -->
  <bean id="rqmConsumer" class="com.slp.mq.RmqConsumer"/>

  <!-- Adapter that delegates each message to rqmConsumer.rmqProducerMessage(...). -->
  <bean id="messageListenerAdapter"
        class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
      <constructor-arg ref="rqmConsumer"/>
      <property name="defaultListenerMethod" value="rmqProducerMessage"/>
      <property name="messageConverter" ref="serializerMessageConverter"/>
  </bean>

  <!-- Listener container that watches the queue; "queues" accepts more than one queue. -->
  <bean id="listenerContainer"
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
      <property name="queues" ref="queue"/>
      <property name="connectionFactory" ref="connectionFactoryMq"/>
      <property name="messageListener" ref="messageListenerAdapter"/>
  </bean>

  <!-- Producer bean. -->
  <bean id="rmqProducer" class="com.slp.mq.RmqProducer"/>

  <!-- RabbitMQ configuration: end -->

3、消息实体类

  1. package com.slp.mq;
  2. import java.io.*;
  /**
   * Serializable envelope for a message published through RabbitMQ.
   *
   * <p>Carries the target exchange, the routing key, the payload parameters and,
   * when built with the four-argument constructor, the runtime types of those
   * parameters.
   *
   * <p>NOTE(review): no explicit {@code serialVersionUID} is declared, so the
   * serialized identity is derived from the class shape; adding non-private
   * members changes it.
   *
   * @author sanglp
   */
  public class RabbitMessage implements Serializable {

      /** Runtime types of {@link #params}; set by the four-argument constructor or the setter. */
      private Class<?>[] paramTypes;

      /** Name of the exchange the message is published to. */
      private String exchange;

      /** Payload parameters carried by the message. */
      private Object[] params;

      /** Routing key used when publishing. */
      private String routekey;

      /** Creates an empty message; all fields are {@code null} until set. */
      public RabbitMessage() {
      }

      /**
       * Creates a message without recording parameter types.
       *
       * @param exchange target exchange name
       * @param routekey routing key
       * @param params   payload parameters, stored as given
       */
      public RabbitMessage(String exchange, String routekey, Object... params) {
          this.exchange = exchange;
          this.params = params;
          this.routekey = routekey;
      }

      /**
       * Creates a message and records the runtime type of every parameter.
       *
       * @param exchange   target exchange name
       * @param routeKey   routing key
       * @param methodName accepted for API compatibility; currently not stored
       * @param params     payload parameters; a {@code null} array is treated as empty,
       *                   and a {@code null} element gets a {@code null} type entry
       */
      public RabbitMessage(String exchange, String routeKey, String methodName, Object... params) {
          this.exchange = exchange;
          this.routekey = routeKey;
          this.params = params == null ? new Object[0] : params;
          this.paramTypes = typesOf(this.params);
      }

      /** Returns the runtime class of each value, using {@code null} for {@code null} values. */
      private static Class<?>[] typesOf(Object[] values) {
          Class<?>[] types = new Class<?>[values.length];
          for (int i = 0; i < values.length; i++) {
              types[i] = values[i] == null ? null : values[i].getClass();
          }
          return types;
      }

      /**
       * Serializes this message with Java object serialization.
       *
       * @return the serialized bytes, never empty
       * @throws IllegalStateException if serialization fails, for example because a
       *                               parameter is not {@link Serializable}
       */
      public byte[] getSerialBytes() {
          ByteArrayOutputStream buffer = new ByteArrayOutputStream();
          try {
              ObjectOutputStream out = new ObjectOutputStream(buffer);
              try {
                  out.writeObject(this);
              } finally {
                  out.close();
              }
          } catch (IOException e) {
              // Fail loudly: returning an empty array would let an empty payload be published.
              throw new IllegalStateException("Failed to serialize RabbitMessage", e);
          }
          return buffer.toByteArray();
      }

      public Class<?>[] getParamTypes() {
          return paramTypes;
      }

      public void setParamTypes(Class<?>[] paramTypes) {
          this.paramTypes = paramTypes;
      }

      public String getExchange() {
          return exchange;
      }

      public void setExchange(String exchange) {
          this.exchange = exchange;
      }

      public Object[] getParams() {
          return params;
      }

      public void setParams(Object[] params) {
          this.params = params;
      }

      public String getRoutekey() {
          return routekey;
      }

      public void setRoutekey(String routekey) {
          this.routekey = routekey;
      }
  }

 

4、生产者

  1. package com.slp.mq;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import javax.annotation.Resource;
  4. /**
  5. * @author sanglp
  6. * @create 2018-02-06 14:19
  7. * @desc 生产者
  8. **/
  9. public class RmqProducer {
  10. @Resource
  11. private RabbitTemplate rabbitTemplate;
  12. /**
  13. * 发送信息
  14. * @param msg
  15. */
  16. public void sendMessage(RabbitMessage msg){
  17. System.out.println(rabbitTemplate.getConnectionFactory().getHost());
  18. System.out.println(rabbitTemplate.getConnectionFactory().getPort());
  19. System.out.println("msg"+msg);
  20. rabbitTemplate.convertAndSend(msg.getExchange(),msg.getRoutekey(),msg);
  21. System.out.println("发送完成");
  22. }
  23. }

  

5、消费者

  1. package com.slp.mq;
  2. /**
  3. * @author sanglp
  4. * @create 2018-02-06 14:23
  5. * @desc 消费者
  6. **/
  7. public class RmqConsumer {
  8. public void rmqProducerMessage(Object object){
  9. System.out.println("消费前");
  10. RabbitMessage rabbitMessage = (RabbitMessage) object;
  11. System.out.println(rabbitMessage.getExchange());
  12. System.out.println(rabbitMessage.getRoutekey());
  13. System.out.println(rabbitMessage.getParams().toString());
  14. }
  15. }

  

6、测试类

  1. package com.slp;
  2. import com.slp.mq.RabbitMessage;
  3. import com.slp.mq.RmqConsumer;
  4. import com.slp.mq.RmqProducer;
  5. import org.junit.Before;
  6. import org.junit.Test;
  7. import org.springframework.context.ApplicationContext;
  8. import org.springframework.context.support.FileSystemXmlApplicationContext;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. /**
  12. * @author sanglp
  13. * @create 2018-02-06 14:36
  14. * @desc mq测试类
  15. **/
  16. public class MqTest {
  17. private RmqProducer rmqProducer ;
  18. private RmqConsumer rqmConsumer ;
  19. @Before
  20. public void setUp() throws Exception {
  21. //ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("D:/web-back/web-back/myweb/web/WEB-INF/applicationContext.xml");
  22. //context.start();
  23. String path="web/WEB-INF/applicationContext.xml";
  24. ApplicationContext context = new FileSystemXmlApplicationContext(path);
  25. rmqProducer = (RmqProducer) context.getBean("rmqProducer");
  26. rqmConsumer = (RmqConsumer)context.getBean("rqmConsumer");
  27. }
  28. @Test
  29. public void test(){
  30. String exchange = "testExchange";
  31. String routeKey ="testQueue";
  32. String methodName = "test";
  33. //参数
  34. for (int i=0;i<10;i++){
  35. Map<String,Object> param=new HashMap<String, Object>();
  36. param.put("data","hello");
  37. RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param);
  38. //发送消息
  39. rmqProducer.sendMessage(msg);
  40. }
  41. // rqmConsumer.rmqProducerMessage(msg);
  42. }
  43. }

 

运行结果:

没有开启消费者之前:

 

  

  

  

转载于:https://www.cnblogs.com/dream-to-pku/p/8423350.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/139124
推荐阅读
相关标签
  

闽ICP备14008679号