一、Linux下安装RabbitMQ
1、安装erlang环境
- wget http://erlang.org/download/otp_src_18.2.1.tar.gz
- tar xvfz otp_src_18.2.1.tar.gz
- cd otp_src_18.2.1
- ./configure
- make
- make install
2、安装RabbitMQ
- wget http://www.rabbitmq.com/releases/rabbitmq-server/vx.x.x/rabbitmq-server-generic-unix-x.x.x.tar.xz
- //安装xz压缩/解压工具
- yum install xz
- //解压
- xz -d rabbitmq-server-generic-unix-x.x.x.tar.xz
- tar -xvf rabbitmq-server-generic-unix-x.x.x.tar
- //将其复制到/usr/local/下(目标路径可按自己习惯调整)
- cp -r rabbitmq_server-x.x.x /usr/local/rabbitmq
- //改变环境变量
- vi /etc/profile
- export PATH=/usr/local/rabbitmq/sbin:$PATH
- source /etc/profile
- //启用MQ管理方式
- rabbitmq-plugins enable rabbitmq_management #启动后台管理
- rabbitmq-server -detached #后台运行rabbitmq
- //开放15672端口(后台管理界面),供外部访问
- iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
3、添加用户和权限
- //添加用户
- rabbitmqctl add_user admin admin
- //添加权限
- rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
- //添加用户角色
- rabbitmqctl set_user_tags admin administrator
二、Spring MVC整合RabbitMQ
1、添加pom.xml依赖jar包
- <!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>1.7.5.RELEASE</version>
- </dependency>
2、添加配置applicationContext.xml
- <!-- RabbitMQ configuration: begin. Declares the connection factory, admin, template, converter, queue, exchange, binding, listener and producer beans. -->
- <bean id="connectionFactoryMq" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
- <constructor-arg value="192.168.181.201"/>
- <property name="username" value="admin"/>
- <property name="password" value="admin"/>
- <property name="host" value="192.168.181.201"/>
- <property name="port" value="5672"/>
- </bean>
- <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
- <constructor-arg ref="connectionFactoryMq"/>
- </bean>
- <!-- RabbitTemplate bean used for publishing; constructed with the connectionFactoryMq bean defined above -->
- <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
- <constructor-arg ref="connectionFactoryMq"/>
- </bean>
- <!-- Message converter bean (SimpleMessageConverter); referenced by the messageListenerAdapter bean below -->
- <bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter">
- </bean>
- <!-- Queue "testQueue"; constructor arguments are (name, durable=true, exclusive=false, autoDelete=true) -->
- <bean id="queue" class="org.springframework.amqp.core.Queue">
- <constructor-arg index="0" value="testQueue"></constructor-arg>
- <constructor-arg index="1" value="true"></constructor-arg>
- <constructor-arg index="2" value="false"></constructor-arg>
- <constructor-arg index="3" value="true"></constructor-arg>
- </bean>
- <!-- Topic exchange "testExchange"; constructor arguments are (name, durable=true, autoDelete=false) -->
- <bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange">
- <constructor-arg index="0" value="testExchange"></constructor-arg>
- <constructor-arg index="1" value="true"></constructor-arg>
- <constructor-arg index="2" value="false"></constructor-arg>
- </bean>
- <util:map id="arguments">
-
- </util:map>
- <!-- Binds queue "testQueue" to exchange "testExchange" with routing key "testQueue"; constructor arguments are (destination, destinationType, exchange, routingKey, arguments), where arguments is the empty map declared above -->
- <bean id="binding" class="org.springframework.amqp.core.Binding">
- <constructor-arg index="0" value="testQueue"></constructor-arg>
- <constructor-arg index="1" value="QUEUE"></constructor-arg>
- <constructor-arg index="2" value="testExchange"></constructor-arg>
- <constructor-arg index="3" value="testQueue"></constructor-arg>
- <constructor-arg index="4" value="#{arguments}"></constructor-arg>
- </bean>
- <!-- Consumer bean that handles received messages; the adapter below delegates to its rmqProducerMessage method using serializerMessageConverter -->
- <bean id="rqmConsumer" class="com.slp.mq.RmqConsumer"></bean>
-
- <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
- <constructor-arg ref="rqmConsumer" />
- <property name="defaultListenerMethod" value="rmqProducerMessage"></property>
- <property name="messageConverter" ref="serializerMessageConverter"></property>
- </bean>
- <!-- SimpleMessageListenerContainer that listens on the queue bean and dispatches to messageListenerAdapter; the "queues" property can take several queues -->
- <bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
- <property name="queues" ref="queue"></property>
- <property name="connectionFactory" ref="connectionFactoryMq"></property>
- <property name="messageListener" ref="messageListenerAdapter"></property>
- </bean>
- <bean id="rmqProducer" class="com.slp.mq.RmqProducer"></bean>
- <!-- RabbitMQ configuration: end -->
3、消息实体类
- package com.slp.mq;
-
- import java.io.*;
-
- /**
- * Serializable envelope for a message published through RabbitMQ. It carries
- * the target exchange, the routing key, the payload parameters and, when built
- * with the four-argument constructor, the runtime types of those parameters.
- *
- * @author sanglp
- * @create 2018-02-06 14:00
- **/
- public class RabbitMessage implements Serializable {
- /** Runtime classes of {@link #params}; only populated by the four-argument constructor or the setter. */
- private Class<?>[] paramTypes;
- /** Name of the exchange the message is published to. */
- private String exchange;
- /** Payload parameters carried by the message. */
- private Object[] params;
- /** Routing key used when publishing. */
- private String routekey;
-
- /** Creates an empty message; populate it through the setters. */
- public RabbitMessage() {
- }
-
- /**
- * Creates a message without parameter type information.
- *
- * @param exchange exchange to publish to
- * @param routekey routing key to publish with
- * @param params payload parameters
- */
- public RabbitMessage(String exchange, String routekey, Object... params) {
- this.exchange = exchange;
- this.params = params;
- this.routekey = routekey;
- }
-
- /**
- * Creates a message and records the runtime class of every parameter.
- *
- * @param exchange exchange to publish to
- * @param routeKey routing key to publish with
- * @param methodName accepted for call-site compatibility; it is not stored in the message
- * @param params payload parameters; a {@code null} element is recorded as
- * {@code Object.class}, and a {@code null} array yields an empty type array
- */
- public RabbitMessage(String exchange, String routeKey, String methodName, Object... params) {
- this.params = params;
- this.exchange = exchange;
- this.routekey = routeKey;
- // An explicit null varargs array has no elements to inspect.
- int len = (params == null) ? 0 : params.length;
- Class<?>[] types = new Class<?>[len];
- for (int i = 0; i < len; i++) {
- // A null argument has no runtime class; fall back to Object.class instead of throwing an NPE.
- types[i] = (params[i] == null) ? Object.class : params[i].getClass();
- }
- this.paramTypes = types;
- }
-
- /**
- * Serializes this message with standard Java serialization.
- *
- * @return the serialized bytes, or an empty array when serialization fails
- * (the failure's stack trace is printed to standard error)
- */
- public byte[] getSerialBytes() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- // try-with-resources flushes and closes the stream even when writeObject fails.
- try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
- oos.writeObject(this);
- } catch (IOException e) {
- // Best effort, as before: report the failure and hand back an empty array.
- e.printStackTrace();
- return new byte[0];
- }
- return baos.toByteArray();
- }
-
- /** @return the recorded parameter types, possibly {@code null} */
- public Class<?>[] getParamTypes() {
- return paramTypes;
- }
-
- /** @param paramTypes parameter types to record */
- public void setParamTypes(Class<?>[] paramTypes) {
- this.paramTypes = paramTypes;
- }
-
- /** @return the exchange name */
- public String getExchange() {
- return exchange;
- }
-
- /** @param exchange the exchange name */
- public void setExchange(String exchange) {
- this.exchange = exchange;
- }
-
- /** @return the payload parameters, possibly {@code null} */
- public Object[] getParams() {
- return params;
- }
-
- /** @param params the payload parameters */
- public void setParams(Object[] params) {
- this.params = params;
- }
-
- /** @return the routing key */
- public String getRoutekey() {
- return routekey;
- }
-
- /** @param routekey the routing key */
- public void setRoutekey(String routekey) {
- this.routekey = routekey;
- }
- }
4、生产者
- package com.slp.mq;
-
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
-
- import javax.annotation.Resource;
-
- /**
- * Publishes {@link RabbitMessage} instances through the injected {@link RabbitTemplate}.
- *
- * @author sanglp
- * @create 2018-02-06 14:19
- **/
- public class RmqProducer {
-
- /** Template used for publishing; injected via {@code @Resource}. */
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- /**
- * Prints the broker host and port and the message, publishes the message to
- * its own exchange under its own routing key, then prints a completion line.
- *
- * @param msg message to publish; it also supplies the exchange and routing key
- */
- public void sendMessage(RabbitMessage msg) {
- printBrokerEndpoint();
- System.out.println("msg" + msg);
- final String targetExchange = msg.getExchange();
- final String routingKey = msg.getRoutekey();
- rabbitTemplate.convertAndSend(targetExchange, routingKey, msg);
- System.out.println("发送完成");
- }
-
- /** Prints the host and then the port of the template's connection factory. */
- private void printBrokerEndpoint() {
- System.out.println(rabbitTemplate.getConnectionFactory().getHost());
- System.out.println(rabbitTemplate.getConnectionFactory().getPort());
- }
- }
5、消费者
- package com.slp.mq;
-
- /**
- * Message handler whose {@code rmqProducerMessage} method is configured as the
- * default listener method of the {@code messageListenerAdapter} bean.
- *
- * @author sanglp
- * @create 2018-02-06 14:23
- **/
- public class RmqConsumer {
-
- /**
- * Prints the exchange, routing key and parameters of a received message.
- *
- * @param object the received payload; it is cast to {@link RabbitMessage}
- * @throws ClassCastException if {@code object} is not a {@link RabbitMessage}
- */
- public void rmqProducerMessage(Object object) {
- System.out.println("消费前");
- RabbitMessage rabbitMessage = (RabbitMessage) object;
- System.out.println(rabbitMessage.getExchange());
- System.out.println(rabbitMessage.getRoutekey());
- // Object[].toString() only yields a type/hash token such as "[Ljava.lang.Object;@1b2c3d";
- // deepToString renders the actual elements and prints "null" for a null array instead of throwing.
- System.out.println(java.util.Arrays.deepToString(rabbitMessage.getParams()));
- }
- }
6、测试类
- package com.slp;
-
- import com.slp.mq.RabbitMessage;
- import com.slp.mq.RmqConsumer;
- import com.slp.mq.RmqProducer;
- import org.junit.Before;
- import org.junit.Test;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.support.FileSystemXmlApplicationContext;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @author sanglp
- * @create 2018-02-06 14:36
- * @desc mq测试类
- **/
- public class MqTest {
-
-
- private RmqProducer rmqProducer ;
- private RmqConsumer rqmConsumer ;
- @Before
- public void setUp() throws Exception {
- //ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("D:/web-back/web-back/myweb/web/WEB-INF/applicationContext.xml");
- //context.start();
-
- String path="web/WEB-INF/applicationContext.xml";
- ApplicationContext context = new FileSystemXmlApplicationContext(path);
- rmqProducer = (RmqProducer) context.getBean("rmqProducer");
- rqmConsumer = (RmqConsumer)context.getBean("rqmConsumer");
- }
- @Test
- public void test(){
- String exchange = "testExchange";
- String routeKey ="testQueue";
- String methodName = "test";
- //参数
- for (int i=0;i<10;i++){
- Map<String,Object> param=new HashMap<String, Object>();
- param.put("data","hello");
-
- RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param);
- //发送消息
- rmqProducer.sendMessage(msg);
- }
-
- // rqmConsumer.rmqProducerMessage(msg);
-
- }
- }
运行结果:
没有开启消费者之前: