赞
踩
RabbitMq安装说明(Win)
1.1消息队列中间件简介
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有 ActiveMQ(安全),RabbitMQ,ZeroMQ,Kafka(大数据),MetaMQ,RocketMQ
以下介绍消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景
1.2什么是RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
1).可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
2).灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
3).消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
4).高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
5).多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
6).多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
7).管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
8).跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
9).插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
1).安装依赖Erlang
打开RabbitMQ官网:RabbitMQ: One broker to queue them all | RabbitMQ
下载Eralng(otp_win64_25.2.exe)链接: https://erlang.org/download/otp_versions_tree.html
2).rabbitmq安装
下载rabbitmq(rabbitmq-server-3.11.5.exe)链接: Installing on Windows | RabbitMQ
第一步:
安装 otp_win64_25.2.exe
右键以管理员身份运行
接着一直点击下一步傻瓜式安装
配置Erlang环境变量 :ERLANG_HOME
path添加:%ERLANG_HOME%\bin
命令行运行:erl
Erlang安装完成。
第二 步:
安装rabbitmq-server-3.11.5.exe
双击文件rabbitmq-server-3.11.5.exe,傻瓜式安装,(注意不要安装在包含中文和空格的目录下!安装
window服务中就存在rabbitMQ了,并且是启动状态。 )接着安装管理界面(插件)
进入rabbitMQ安装目录的sbin目录
点击上方的路径框输入cmd,按下回车键
输入命令 rabbitmq-plugins enable rabbitmq_management 回车
第三步:
1.重启服务,双击rabbitmq-server.bat(双击后可能需要等待一会)
或命令启动 rabbitmq-server.bat start
2.打开浏览器,地址栏输入http://localhost:15672 ,即可看到管理界面的登陆页
输入用户名和密码,都为guest, 进入主界面:
最上侧的导航以此是:概览、连接、信道、交换器、队列、用户管理
以上RabbitMQ 服务安装完成。
首先先介绍一个简单的一个消息推送到接收的流程,提供一个简单的图:
黄色的圈圈就是我们的消息推送服务,将消息推送到 中间方框里面也就是 rabbitMq的服务器,然后经过服务器里面的交换机、队列等各种关系(后面会详细讲)将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。
常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种:
Direct Exchange
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
Fanout Exchange
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:
* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 *.TT.* 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
主题交换机是非常强大的
当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。
另外还有 Header Exchange 头交换机 ,Default Exchange 默认交换机,Dead Letter Exchange 死信交换机。
需要创建2个springboot项目,一个 rabbitmq-provider (生产者),一个rabbitmq-consumer(消费者)。
首先创建 rabbitmq-provider,
pom.xml里用到的jar依赖:
- <dependency>
-
- <groupId>org.springframework.boot</groupId>
-
- <artifactId>spring-boot-starter-amqp</artifactId>
-
- </dependency>
-
- <dependency>
-
- <groupId>org.springframework.boot</groupId>
-
- <artifactId>spring-boot-starter-web</artifactId>
-
- </dependency>
然后application.yml:
- # rabbitmq配置
-
- #给项目来个名字
-
- application:
-
- name: rabbitmq-provider
-
- #配置rabbitMq 服务器
-
- rabbitmq:
-
- host: 127.0.0.1
-
- port: 5672
-
- username: root
-
- password: root
-
- #虚拟host 可以不设置,使用server默认host
-
- #virtual-host: MyHost
接着我们先使用下direct exchange(直连型交换机),创建DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,在注释里有说明,后面的不同交换机的配置就不做同样说明了):
- @Configuration
-
- public class DirectRabbitConfig {
-
- //队列 起名:TestDirectQueue
-
- @Bean
-
- public Queue TestDirectQueue() {
-
- // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
-
- // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
-
- // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
-
- // return new Queue("TestDirectQueue",true,true,false);
-
-
-
- //一般设置一下队列的持久化就好,其余两个就是默认false
-
- return new Queue("TestDirectQueue",true);
-
- }
-
-
-
- //Direct交换机 起名:TestDirectExchange
-
- @Bean
-
- DirectExchange TestDirectExchange() {
-
- // return new DirectExchange("TestDirectExchange",true,true);
-
- return new DirectExchange("TestDirectExchange",true,false);
-
- }
-
-
-
- //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
-
- @Bean
-
- Binding bindingDirect() {
-
- return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
-
- }
-
-
-
-
-
-
-
- @Bean
-
- DirectExchange lonelyDirectExchange() {
-
- return new DirectExchange("lonelyDirectExchange");
-
- }
-
-
-
-
-
- }
然后写个简单的接口进行消息推送(根据需求也可以改为定时任务等等,具体看需求),SendMessageController.java:
- @RestController
- @RequestMapping("/bussi/rabbitmq")
- public class SendMessageController extends BaseController {
-
-
-
- @Autowired
- RabbitTemplate rabbitTemplate; // 使用RabbitTemplate,这提供了接收/发送等等方法
-
-
-
- @GetMapping("/sendDirectMessage")
- public AjaxResult sendDirectMessage() {
-
- String messageId = String.valueOf(UUID.randomUUID());
-
- String messageData = "test message, hello!";
-
- String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
-
- Map<String, Object> map = new HashMap<>();
-
- map.put("messageId", messageId);
-
- map.put("messageData", messageData);
-
- map.put("createTime", createTime);
-
- // 将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
-
- rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
-
- return AjaxResult.success(map);
-
- }
-
- }
把rabbitmq-provider项目运行,调用下接口:
因为我们目前还没弄消费者 rabbitmq-consumer,消息没有被消费的,我们去rabbitMq管理页面看看,是否推送成功:
消息已经推送到rabbitMq服务器上面了。
接下来,创建rabbitmq-consumer项目:
pom.xml里的jar依赖:
- <dependency>
-
- <groupId>org.springframework.boot</groupId>
-
- <artifactId>spring-boot-starter-amqp</artifactId>
-
- </dependency>
-
- <dependency>
-
- <groupId>org.springframework.boot</groupId>
-
- <artifactId>spring-boot-starter-web</artifactId>
-
- </dependency>
然后是 application.yml:
- # rabbitmq配置
-
- #给项目来个名字
-
- application:
-
- name: rabbitmq-provider
-
- #配置rabbitMq 服务器
-
- rabbitmq:
-
- host: 127.0.0.1
-
- port: 5672
-
- username: root
-
- password: root
-
- #虚拟host 可以不设置,使用server默认host
-
- #virtual-host: MyHost
然后一样,创建DirectRabbitConfig.java(消费者单纯的使用,其实可以不用添加这个配置,直接建后面的监听就好,使用注解来让监听器监听对应的队列即可。配置上了的话,其实消费者也是生成者的身份,也能推送该消息。):
- @Configuration
- public class DirectRabbitConfig {
-
- // 队列 起名:TestDirectQueue
-
- @Bean
- public Queue TestDirectQueue() {
-
- // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
-
- // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
-
- // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
-
- // return new Queue("TestDirectQueue",true,true,false);
-
-
-
- // 一般设置一下队列的持久化就好,其余两个就是默认false
-
- return new Queue("TestDirectQueue", true);
-
- }
-
-
-
- // Direct交换机 起名:TestDirectExchange
-
- @Bean
- DirectExchange TestDirectExchange() {
-
- // return new DirectExchange("TestDirectExchange",true,true);
-
- return new DirectExchange("TestDirectExchange", true, false);
-
- }
-
-
-
- // 绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
-
- @Bean
- Binding bindingDirect() {
-
- return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
-
- }
-
-
-
- @Bean
- DirectExchange lonelyDirectExchange() {
-
- return new DirectExchange("lonelyDirectExchange");
-
- }
-
-
-
- }
然后是创建消息接收监听类,DirectReceiver.java:
- @Component
- @RabbitListener(queues = "TestDirectQueue") // 监听的队列名称 TestDirectQueue
- public class DirectReceiver {
-
-
-
- @RabbitHandler
- public void process(Map testMessage) {
-
- System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());
-
- }
-
-
-
- }
然后将rabbitmq-consumer项目运行起来,可以看到把之前推送的那条消息消费下来了:
然后可以再继续调用rabbitmq-provider项目的推送消息接口,可以看到消费者即时消费消息:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。