赞
踩
目录
根据项目的需要,需要搭建一个消息中间件RabbitMQ服务器,并使用RabbitMQ监听器去同步其他系统的数据,在网上搜索浏览了大量的博客文档学习,吸取自己认为较好的文章的部分,并自己动手操作了一番记录了下来,希望能帮助想学习的小伙伴们。
再此特别感谢那些总结博客的大佬们提供的文档资料。
RabbitMQ是由Erlang语言编写的实现了高级消息队列协议(AMQP)的开源消息代理软件(也可称为 面向消息的中间件)。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受 客户端/中间件 不同产品,不同的开发语言等条件的限制。
(1)由Erlang语言开发,支持大量协议:AMQP、XMPP、SMTP、STOMP。
(2)支持消息的持久化、负载均衡和集群,且集群易扩展。
(3)具有一个Web监控界面,易于管理。
(4)安装部署简单,上手容易,功能丰富,强大的社区支持。
(5)支持消息确认机制、灵活的消息分发机制。
(1)由于牺牲了部分性能来换取稳定性,比如消息的持久化功能,使得RabbitMQ在大吞吐量性能方面不及Kafka和ZeroMQ。
(2)由于支持多种协议,使RabbitMQ非常重量级,比较适合企业级开发。
因此当需要一个稳定的、高可靠性的、功能强大且易于管理的消息队列可以选择RabbitMQ。如果对消息吞吐量需求较大,且不在乎消息偶尔丢失的情况可以使用Kafka。
RabbitMQ是用Erlang语言编写的软件,因此在电脑上运行时需要配置Erlang的环境。
下载地址:http://www.erlang.org/downloads
下载完成后点击运行安装,默认会安装到C:\Program Files\erl9.0
为电脑配置Erlang系统环境变量,
(1)新建ERLANG_HOME系统变量 如图配置
(2)找到系统变量PATH点击编辑,新建,把刚配置的ERLANG_HOME加进去,如图:
(3)测试Erlang是环境是否安装成功,win+R 输入cmd打开命令提示窗然后输入 erl 如果安装成功则会显示Erlang的版本信息,如图:
如果不显示请检查环境变量时候配置正确。
下载地址:http://www.rabbitmq.com/install-windows.html
选择对应的版本,
下载过安装包之后点击安装,默认下一步一直点下去就ok了。
(1)同配置Erlang环境变量一样,首先找到RabbitMQ的安装位置,我的是C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4
(2)在环境变量中新建 系统变量:RABBITMQ_SERVER 值为RabbitMQ的安装位置C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4 如图:
然后找到Path变量,点击编辑,把%RABBITMQ_SERVER%\sbin;添加到Path变量中。如图:
(3)激活 RabbitMQ Web管理插件,RabbtMQ提供了一个Web界面的管理界面,使用户能简单方便的管理RabbitMQ服务器。
打开CMD命令提示窗口,进入到RabbitMQ的sbin目录,输入命令 rabbitmq-plugins.bat enable rabbitmq_management 回车
如图:
(4)打开RabbitMQ的安装目录的sbin文件夹,双击 rabbit-server.bat,启动RabiitMQ服务器
启动成功打开网页输入 127.0.0.1:15672查看MQ服务器,如图:
输入RabbitMQ默认超级管理员 用户名 guest 密码guest 点击登陆进入到Web插件管理页。
顾名思义,producer是指向mq推送消息的程序。
消息的消费者/接收者,一般是独立的程序。
消息是不具名的,它是我们具体的传输数据对象,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
也称作信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
表示消息队列服务器实体。
用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;生产者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。binding key 并不是在所有情况下都生效,它依赖于Exchange Type。
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
使用流程
消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式投递到相应的Queue上,Queue又将消息发送给已经在此Queue上注册的consumer。具体流程如下:
(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好Binding关系。
(5)生产者客户端投递消息到exchange。
(6)exchange接收到消息后,就根据消息的RoutingKey和已经设置的binding,进行消息路由(投递),将消息投递到一个或多个队列里。
(7)消费者客户端从对应的队列中获取并处理消息。
Direct Exchange(直接交换器类型)
direct类型的Exchange路由规则很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
如果有两个队列 和 Exchange的BindingKey 分别是 key.test 和 key.test1,此时生产者向此Exchange提交一条消息,此消息的routingKey 值为 key.test,则此消息只会被路由到 第一个队列。
Fanout Exchang (广播式交换器类型)
不管消息的routingKey是什么,同一个消息会被路由到与此交换器绑定的所有队列种。
如一个Fanout 模式的Exchange绑定了两个队列,queue1和queue2,此时生产者向此Exchange发送一条消息,则不管消息的routingKey的值是什么,消息都会被路由到 queue1和queue2。
Topic Exchange (主题交换器类型)
此类型和Direct直接交换器类型相似,区别在于当Direct类型的交换器匹配routingKey和bindingKey时是完全匹配,而Topic主题交换器匹配routingKey和BindingKey时是模糊匹配。
模式匹配时符号“#”匹配一个或多个词,符号“*”匹配正好一个词,而且单词与单词之间必须要用“.”符号进行分隔。
如一个Topic模式的Exchange绑定了3个队列,queue1,queue2,queue3,它们之间的BindingKey分别是 *.key.test、topic.*.test, topic.# 此时生产者向此交换机推送一条routingKey值为 topic.key.test的消息,则三个队列均能收到此条消息。如果此消息的routingKey的值为 11.key.test,则只有queue1能收到此消息。如果此消息的routingKey为topic.11.test,则queue2和queue3均能收到此消息。
Headers Exchange
该类型的Exchange几乎没被用到过,Direct类型的交换器比此交换机更有优势。headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
Jar包导入
- <dependency>
- <!-- spring包 -->
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- <version>4.3.7.RELEASE</version>
- </dependency>
-
- <!-- rabbit包 -->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>3.4.1</version>
- </dependency>
-
- <!-- spring和mq集成包 -->
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>1.7.1.RELEASE</version>
- </dependency>

生产者配置
- import java.util.List;
-
- import org.springframework.amqp.core.AmqpTemplate;
-
-
- public class MyProducer {
-
- @Autowired
- private AmqpTemplate amqpTemplate;
-
- public void sendDataToCrQueue(Object obj) {
- amqpTemplate.convertAndSend("queue_one_key", obj);
- }
- }
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
-
- <context:property-placeholder location="classpath:config.properties"/>
- <!--rabbitMq连接工厂-->
- <rabbit:connection-factory id="asConnectionFactory1" host="${rabbitmq.asHost}" username="${rabbitmq.asUsername}" password="${rabbitmq.asPassword}" port="${rabbitmq.asPort}" virtual-host="${rabbitmq.asVirtual}"/>
- <!--管理入口,用于创建交换机和队列-->
- <rabbit:admin id = "admin3" connection-factory="asConnectionFactory1" />
- <!--定义消息模板,用于消息的接收和发送-->
- <rabbit:template id="amqpTemplate" connection-factory="asConnectionFactory1" exchange="direct_exchange" encoding="utf-8" />
- <!--定义队列-->
- <rabbit:queue id ="queue1" name="queue1" durable="true" auto-delete="false"></rabbit:queue>
- <!--定义交换器-->
- <!--direct-->
- <rabbit:direct-exchange name="direct_exchange" durable="true" auto-delete="false" id="my-mq-exchange">
- <rabbit:bindings>
- <rabbit:binding queue="queue1" key="queue_one_key"/>
- <!-- 还可以绑定其他列队... -->
- </rabbit:bindings>
- </rabbit:direct-exchange>
- <!--topic-->
- <rabbit:topic-exchange name="topicExchange">
- <rabbit:bindings>
- <rabbit:binding queue="queue1" pattern="topic.*.#" />
- <!-- 还可以绑定其他列队... -->
- </rabbit:bindings>
- </rabbit:topic-exchange>
- <!--fanout-->
- <rabbit:fanout-exchange name="fanoutExchange" durable="true">
- <rabbit:bindings>
- <rabbit:binding queue="queue1"></rabbit:binding>
- <!-- 还可以绑定其他列队... -->
- </rabbit:bindings>
- </rabbit:fanout-exchange>
- </beans>

消费者配置
生产者和消费者的配置基本是一样的
- import java.util.List;
-
- import org.springframework.amqp.core.AmqpTemplate;
-
-
- public class MyProducer {
-
- @Autowired
- private AmqpTemplate amqpTemplate;
-
- public void receiveMessage() {
- Message receive = amqpTemplate. receive("queque1") ;
- System.out.println("接收消息----------" + new String(receive.getBody(), "utf-8"));
- }
- }
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
-
- <context:property-placeholder location="classpath:config.properties"/>
- <!--rabbitMq连接工厂-->
- <rabbit:connection-factory id="asConnectionFactory1" host="${rabbitmq.asHost}" username="${rabbitmq.asUsername}" password="${rabbitmq.asPassword}" port="${rabbitmq.asPort}" virtual-host="${rabbitmq.asVirtual}"/>
- <!--管理入口,用于创建交换机和队列-->
- <rabbit:admin id = "admin3" connection-factory="asConnectionFactory1" />
- <!--定义消息模板,用于消息的接收和发送-->
- <rabbit:template id="amqpTemplate" connection-factory="asConnectionFactory1" exchange="direct_exchange" encoding="utf-8" />
- <!--定义队列-->
- <rabbit:queue id ="queue1" name="queue1" durable="true" auto-delete="false"></rabbit:queue>
- <!--定义交换器-->
- <!--direct-->
- <rabbit:direct-exchange name="direct_exchange" durable="true" auto-delete="false" id="my-mq-exchange">
- <rabbit:bindings>
- <rabbit:binding queue="queue1" key="queue_one_key"/>
- <!-- 还可以绑定其他列队... -->
- </rabbit:bindings>
- </rabbit:direct-exchange>
- </beans>

监听器配置
监听器会实时监听队列,能够实时接收生产者发送的消息。
- rabbitmq.host=127.0.0.1
- rabbitmq.username=admin
- rabbitmq.password=admin
- rabbitmq.port=5672
- rabbitmq.virtual=/
- public class MpRabbitMqListener implements ChannelAwareMessageListener {
- @Autowired
- private IItfImpInterfacesService iItfImpInterfacesService;
- private final Logger logger = LoggerFactory.getLogger(MpRabbitMqListener.class);
-
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- //获取对象Json字符串
- String jsonString = new String(message.getBody(), "utf-8");
- logger.info("MpRabbitMQ监听器接收json:"+jsonString);
- //校验并插入接口表
- try {
- ResponseData rd = iItfImpInterfacesService.importOuterData4JsonString(jsonString);
- //根据返回状态向RabbitMQ返回标识
- if (!rd.isSuccess()) {
- //失败向错误队列发布消息
- String errorMessage = jsonString + rd.getMessage();
- channel.basicPublish("EXCHANGE_4_ERP", "ERROR_QUEUE_4_ERP", MessageProperties.PERSISTENT_TEXT_PLAIN, errorMessage.getBytes());
- }
- }catch (Exception e){
- logger.error(e.getMessage());
- throw e;
- }finally {
- //通知服务器移除此条消息
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- }
-
- }
-
-
-
-
- }

- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
-
- <context:property-placeholder location="classpath:config.properties"/>
- <!--rabbitMq自动监听配置-->
- <!-- 连接配置 -->
- <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" port="${rabbitmq.port}" virtual-host="${rabbitmq.virtual}"/>
- <rabbit:admin id= "admin1" connection-factory="connectionFactory" />
- <!--队列-->
- <rabbit:queue name="SRM_VENDOR_INFO_4_ERP" durable="true" auto-delete="false"></rabbit:queue>
- <!--监听类-->
- <bean id="mpRabbitMqListener" class="hscs.rabbitmqlistener.MpRabbitMqListener"/>
- <!--监听队列-->
- <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
- <rabbit:listener queues="SRM_VENDOR_INFO_4_ERP" ref="mpRabbitMqListener" />
- </rabbit:listener-container>
-
- </beans>

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。