当前位置:   article > 正文

消息中间件-RabbitMq学习

消息中间件-RabbitMq学习

目录

 

前言

RabbitMQ简介

RabbitMQ优缺点

 RabbitMQ安装(windows)

配置Erlang环境

RabbitMQ下载

RabbitMQ环境变量配置

RabbitMQ中的重要概念

交换器类型

Spring+RabbitMQ集成


前言

根据项目的需要,需要搭建一个消息中间件RabbitMQ服务器,并使用RabbitMQ监听器去同步其他系统的数据,在网上搜索浏览了大量的博客文档学习,吸取自己认为较好的文章的部分,并自己动手操作了一番记录了下来,希望能帮助想学习的小伙伴们。

再此特别感谢那些总结博客的大佬们提供的文档资料。

RabbitMQ简介

RabbitMQ是由Erlang语言编写的实现了高级消息队列协议(AMQP)的开源消息代理软件(也可称为 面向消息的中间件)。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受 客户端/中间件 不同产品,不同的开发语言等条件的限制。

RabbitMQ优缺点

优点:

(1)由Erlang语言开发,支持大量协议:AMQP、XMPP、SMTP、STOMP。

(2)支持消息的持久化、负载均衡和集群,且集群易扩展。

(3)具有一个Web监控界面,易于管理。

(4)安装部署简单,上手容易,功能丰富,强大的社区支持。

(5)支持消息确认机制、灵活的消息分发机制。

缺点:

(1)由于牺牲了部分性能来换取稳定性,比如消息的持久化功能,使得RabbitMQ在大吞吐量性能方面不及Kafka和ZeroMQ。

(2)由于支持多种协议,使RabbitMQ非常重量级,比较适合企业级开发。

因此当需要一个稳定的、高可靠性的、功能强大且易于管理的消息队列可以选择RabbitMQ。如果对消息吞吐量需求较大,且不在乎消息偶尔丢失的情况可以使用Kafka。

 RabbitMQ安装(windows)

配置Erlang环境

RabbitMQ是用Erlang语言编写的软件,因此在电脑上运行时需要配置Erlang的环境。

下载地址:http://www.erlang.org/downloads

下载完成后点击运行安装,默认会安装到C:\Program Files\erl9.0

为电脑配置Erlang系统环境变量,

(1)新建ERLANG_HOME系统变量 如图配置

(2)找到系统变量PATH点击编辑,新建,把刚配置的ERLANG_HOME加进去,如图:

(3)测试Erlang是环境是否安装成功,win+R  输入cmd打开命令提示窗然后输入 erl 如果安装成功则会显示Erlang的版本信息,如图:

如果不显示请检查环境变量时候配置正确。

RabbitMQ下载

下载地址:http://www.rabbitmq.com/install-windows.html

选择对应的版本,

下载过安装包之后点击安装,默认下一步一直点下去就ok了。

RabbitMQ环境变量配置

(1)同配置Erlang环境变量一样,首先找到RabbitMQ的安装位置,我的是C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4

(2)在环境变量中新建 系统变量:RABBITMQ_SERVER 值为RabbitMQ的安装位置C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4 如图:

然后找到Path变量,点击编辑,把%RABBITMQ_SERVER%\sbin;添加到Path变量中。如图:

(3)激活 RabbitMQ Web管理插件,RabbtMQ提供了一个Web界面的管理界面,使用户能简单方便的管理RabbitMQ服务器。

打开CMD命令提示窗口,进入到RabbitMQ的sbin目录,输入命令 rabbitmq-plugins.bat  enable  rabbitmq_management  回车

如图:

(4)打开RabbitMQ的安装目录的sbin文件夹,双击 rabbit-server.bat,启动RabiitMQ服务器

       启动成功打开网页输入 127.0.0.1:15672查看MQ服务器,如图:

输入RabbitMQ默认超级管理员 用户名 guest 密码guest 点击登陆进入到Web插件管理页。

RabbitMQ中的重要概念

消息生产者Producer

顾名思义,producer是指向mq推送消息的程序。

消息消费者Consumer

消息的消费者/接收者,一般是独立的程序。

消息Message

消息是不具名的,它是我们具体的传输数据对象,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

消息通道Channel

也称作信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

虚拟主机Vhost

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

Broker

表示消息队列服务器实体。

队列Queue

用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

交换器Exchange

用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Binding Key

在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;生产者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。binding key 并不是在所有情况下都生效,它依赖于Exchange Type。

Routing key

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。

​​​​​​​ 使用流程

消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式投递到相应的Queue上,Queue又将消息发送给已经在此Queue上注册的consumer。具体流程如下:

(1)客户端连接到消息队列服务器,打开一个channel。

(2)客户端声明一个exchange,并设置相关属性。

(3)客户端声明一个queue,并设置相关属性。

(4)客户端使用routing key,在exchange和queue之间建立好Binding关系。

(5)生产者客户端投递消息到exchange。

(6)exchange接收到消息后,就根据消息的RoutingKey和已经设置的binding,进行消息路由(投递),将消息投递到一个或多个队列里。

(7)消费者客户端从对应的队列中获取并处理消息。

交换器类型

Direct Exchange(直接交换器类型)

direct类型的Exchange路由规则很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

如果有两个队列 和 Exchange的BindingKey 分别是 key.test 和 key.test1,此时生产者向此Exchange提交一条消息,此消息的routingKey 值为 key.test,则此消息只会被路由到 第一个队列。

Fanout Exchang (广播式交换器类型)

不管消息的routingKey是什么,同一个消息会被路由到与此交换器绑定的所有队列种。

如一个Fanout 模式的Exchange绑定了两个队列,queue1和queue2,此时生产者向此Exchange发送一条消息,则不管消息的routingKey的值是什么,消息都会被路由到 queue1和queue2。

Topic Exchange (主题交换器类型)

此类型和Direct直接交换器类型相似,区别在于当Direct类型的交换器匹配routingKey和bindingKey时是完全匹配,而Topic主题交换器匹配routingKey和BindingKey时是模糊匹配。

模式匹配时符号“#”匹配一个或多个词,符号“*”匹配正好一个词,而且单词与单词之间必须要用“.”符号进行分隔。

如一个Topic模式的Exchange绑定了3个队列,queue1,queue2,queue3,它们之间的BindingKey分别是 *.key.test、topic.*.test, topic.# 此时生产者向此交换机推送一条routingKey值为 topic.key.test的消息,则三个队列均能收到此条消息。如果此消息的routingKey的值为 11.key.test,则只有queue1能收到此消息。如果此消息的routingKey为topic.11.test,则queue2和queue3均能收到此消息。

Headers Exchange

该类型的Exchange几乎没被用到过,Direct类型的交换器比此交换机更有优势。headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

Spring+RabbitMQ集成

Jar包导入

  1. <dependency>
  2. <!-- spring包 -->
  3. <groupId>org.springframework</groupId>
  4. <artifactId>spring-context</artifactId>
  5. <version>4.3.7.RELEASE</version>
  6. </dependency>
  7. <!-- rabbit包 -->
  8. <dependency>
  9. <groupId>com.rabbitmq</groupId>
  10. <artifactId>amqp-client</artifactId>
  11. <version>3.4.1</version>
  12. </dependency>
  13. <!-- spring和mq集成包 -->
  14. <dependency>
  15. <groupId>org.springframework.amqp</groupId>
  16. <artifactId>spring-rabbit</artifactId>
  17. <version>1.7.1.RELEASE</version>
  18. </dependency>

生产者配置

  1. import java.util.List;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. public class MyProducer {
  4. @Autowired
  5. private AmqpTemplate amqpTemplate;
  6. public void sendDataToCrQueue(Object obj) {
  7. amqpTemplate.convertAndSend("queue_one_key", obj);
  8. }
  9. }
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
  6. <context:property-placeholder location="classpath:config.properties"/>
  7. <!--rabbitMq连接工厂-->
  8. <rabbit:connection-factory id="asConnectionFactory1" host="${rabbitmq.asHost}" username="${rabbitmq.asUsername}" password="${rabbitmq.asPassword}" port="${rabbitmq.asPort}" virtual-host="${rabbitmq.asVirtual}"/>
  9. <!--管理入口,用于创建交换机和队列-->
  10. <rabbit:admin id = "admin3" connection-factory="asConnectionFactory1" />
  11. <!--定义消息模板,用于消息的接收和发送-->
  12. <rabbit:template id="amqpTemplate" connection-factory="asConnectionFactory1" exchange="direct_exchange" encoding="utf-8" />
  13. <!--定义队列-->
  14. <rabbit:queue id ="queue1" name="queue1" durable="true" auto-delete="false"></rabbit:queue>
  15. <!--定义交换器-->
  16. <!--direct-->
  17. <rabbit:direct-exchange name="direct_exchange" durable="true" auto-delete="false" id="my-mq-exchange">
  18. <rabbit:bindings>
  19. <rabbit:binding queue="queue1" key="queue_one_key"/>
  20. <!-- 还可以绑定其他列队... -->
  21. </rabbit:bindings>
  22. </rabbit:direct-exchange>
  23. <!--topic-->
  24. <rabbit:topic-exchange name="topicExchange">
  25. <rabbit:bindings>
  26. <rabbit:binding queue="queue1" pattern="topic.*.#" />
  27. <!-- 还可以绑定其他列队... -->
  28. </rabbit:bindings>
  29. </rabbit:topic-exchange>
  30. <!--fanout-->
  31. <rabbit:fanout-exchange name="fanoutExchange" durable="true">
  32. <rabbit:bindings>
  33. <rabbit:binding queue="queue1"></rabbit:binding>
  34. <!-- 还可以绑定其他列队... -->
  35. </rabbit:bindings>
  36. </rabbit:fanout-exchange>
  37. </beans>

消费者配置

生产者和消费者的配置基本是一样的

  1. import java.util.List;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. public class MyProducer {
  4. @Autowired
  5. private AmqpTemplate amqpTemplate;
  6. public void receiveMessage() {
  7. Message receive = amqpTemplate. receive("queque1") ;
  8. System.out.println("接收消息----------" + new String(receive.getBody(), "utf-8"));
  9. }
  10. }
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
  6. <context:property-placeholder location="classpath:config.properties"/>
  7. <!--rabbitMq连接工厂-->
  8. <rabbit:connection-factory id="asConnectionFactory1" host="${rabbitmq.asHost}" username="${rabbitmq.asUsername}" password="${rabbitmq.asPassword}" port="${rabbitmq.asPort}" virtual-host="${rabbitmq.asVirtual}"/>
  9. <!--管理入口,用于创建交换机和队列-->
  10. <rabbit:admin id = "admin3" connection-factory="asConnectionFactory1" />
  11. <!--定义消息模板,用于消息的接收和发送-->
  12. <rabbit:template id="amqpTemplate" connection-factory="asConnectionFactory1" exchange="direct_exchange" encoding="utf-8" />
  13. <!--定义队列-->
  14. <rabbit:queue id ="queue1" name="queue1" durable="true" auto-delete="false"></rabbit:queue>
  15. <!--定义交换器-->
  16. <!--direct-->
  17. <rabbit:direct-exchange name="direct_exchange" durable="true" auto-delete="false" id="my-mq-exchange">
  18. <rabbit:bindings>
  19. <rabbit:binding queue="queue1" key="queue_one_key"/>
  20. <!-- 还可以绑定其他列队... -->
  21. </rabbit:bindings>
  22. </rabbit:direct-exchange>
  23. </beans>

监听器配置

监听器会实时监听队列,能够实时接收生产者发送的消息。

  1. rabbitmq.host=127.0.0.1
  2. rabbitmq.username=admin
  3. rabbitmq.password=admin
  4. rabbitmq.port=5672
  5. rabbitmq.virtual=/
  1. public class MpRabbitMqListener implements ChannelAwareMessageListener {
  2. @Autowired
  3. private IItfImpInterfacesService iItfImpInterfacesService;
  4. private final Logger logger = LoggerFactory.getLogger(MpRabbitMqListener.class);
  5. @Override
  6. public void onMessage(Message message, Channel channel) throws Exception {
  7. //获取对象Json字符串
  8. String jsonString = new String(message.getBody(), "utf-8");
  9. logger.info("MpRabbitMQ监听器接收json:"+jsonString);
  10. //校验并插入接口表
  11. try {
  12. ResponseData rd = iItfImpInterfacesService.importOuterData4JsonString(jsonString);
  13. //根据返回状态向RabbitMQ返回标识
  14. if (!rd.isSuccess()) {
  15. //失败向错误队列发布消息
  16. String errorMessage = jsonString + rd.getMessage();
  17. channel.basicPublish("EXCHANGE_4_ERP", "ERROR_QUEUE_4_ERP", MessageProperties.PERSISTENT_TEXT_PLAIN, errorMessage.getBytes());
  18. }
  19. }catch (Exception e){
  20. logger.error(e.getMessage());
  21. throw e;
  22. }finally {
  23. //通知服务器移除此条消息
  24. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  25. }
  26. }
  27. }
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
  6. <context:property-placeholder location="classpath:config.properties"/>
  7. <!--rabbitMq自动监听配置-->
  8. <!-- 连接配置 -->
  9. <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" port="${rabbitmq.port}" virtual-host="${rabbitmq.virtual}"/>
  10. <rabbit:admin id= "admin1" connection-factory="connectionFactory" />
  11. <!--队列-->
  12. <rabbit:queue name="SRM_VENDOR_INFO_4_ERP" durable="true" auto-delete="false"></rabbit:queue>
  13. <!--监听类-->
  14. <bean id="mpRabbitMqListener" class="hscs.rabbitmqlistener.MpRabbitMqListener"/>
  15. <!--监听队列-->
  16. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
  17. <rabbit:listener queues="SRM_VENDOR_INFO_4_ERP" ref="mpRabbitMqListener" />
  18. </rabbit:listener-container>
  19. </beans>
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/871421
推荐阅读
相关标签
  

闽ICP备14008679号