赞
踩
消息队列(Message Queue,简称MQ)
MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。
推荐文章
https://www.zhihu.com/question/65502802
这篇文章很生动形象的描述了MQ
https://www.zhihu.com/question/54152397
解耦:将消息写入消息队列,需要消息的时候自己从消息队列中订阅,从而原系统不需要做任何修改。
异步:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
削峰:原系统慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
RabbitMQ官网
https://www.rabbitmq.com/
RabbitMQ下载地址
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.2/rabbitmq-server-3.8.2.exe
erlang语言安装包
先装这个
http://www.erlang.org/download/otp_win64_22.2.exe
http://127.0.0.1:15672/
是管理后台的插件、我们要开启这个插件才能通过浏览器访问登录页面
rabbitmq-plugins enable rabbitmq_management
停止
net stop RabbitMQ
开启
net start RabbitMQ
通过默认账户 guest/guest 登录
如果能够登录,说明安装成功。
用户的级别
1、超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?RabbitMQ也有
类似的权限管理。在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的
RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于
mysql的db。Virtual Name一般以/开头
可以看见权限已加:
这是一个最简单的生产者和消费者的队列,生产者把消息放入队列,消费者获得消息,这个模式只有一个消费者和一个生产者,当然一个队列就够了,这种模式只需要配置虚拟主机参数即可,其他参数默认就可以通信。
这种模式出现了两个消费者,为了保证消费者之间的负载均衡和同步,需要在消息队列之间加上同步功能,工作队列(又名任务队列)背后的主要思想是避免立即执行资源密集型任务,必须等待它完成。相反,我们计划稍后完成任务。我们将任务封装为消息并将其发送到队列中。后台运行的一个工作进程将弹出任务并最终执行该任务。当你运行许多工人(消费者)时,任务将在他们之间分担。
实际上前两种模式也使用了交换机,只是我们没有设置,使用了默认的参数,交换机参数是可以配置的,如果消息配置的交换机参数和MQserver队列绑定(bind)的交换机名称相同,则转发,否则丢弃。
如上图所示,交换机要配置为direct类型,转发的规则变为检查队列的routingkey的值,如果routingkey值相同则转发,否则丢弃。
这种模式下交换机要配置为topic类型,routingkey配置失效。发送到一个话题交换机(topic exchange)信息,不能是任意routing_key -它必须是一个单词的列表,用逗号分隔。这些词可以是任何东西,但通常它们指定连接到消息的某些特性。一些有效的路由键的例子:stock.usd.nyse、nyse.vmw、“quick.orange.rabbit”,它更有特点是是可以模糊匹配,匹配规则如下:*(星号)可以代替一个词。#(哈希)可以代替零个或更多的单词。
如上图所示,这种模式主要使用在远程调用的场景下。一个应用程序需要另外一个应用程序来最终返回运行结果,这个过程可能是比较耗时的操作,使用这种模式是最合适的。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
package com.zhiyouo100.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static Connection getConnection() throws Exception{ //创建连接工厂对象 ConnectionFactory factory = new ConnectionFactory(); //指定主机名 factory.setHost("localhost"); //指定端口号 factory.setPort(5672); //指定RabbitMQ服务器的虚拟主机 factory.setVirtualHost("localhost"); //账号 factory.setUsername("admin"); //密码 factory.setPassword("123"); return factory.newConnection(); } }
package com.zhiyouo100.RabbitMQDemo01; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.zhiyouo100.util.ConnectionUtil; public class App { public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); //从连接中创建通道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare("q_test_0_1", false, false, false, null); //发送消息 channel.basicPublish("", "q_test_0_1", null, "1234".getBytes()); channel.close(); connection.close(); } }
package com.zhiyouo100.RabbitMQDemo01; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zhiyouo100.util.ConnectionUtil; public class App { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare("q_test_0_1", false, false, false, null); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列 channel.basicConsume("q_test_0_1", consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: localhost
password: 123
port: 5672
username: admin
virtual-host: localhost
package com.zhiyou100.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.amqp.core.Queue; @Configuration public class RabbitConfig { @Bean public Queue queue(){ //创建队列 return new Queue("q_hello"); } }
package com.zhiyou100.controller; import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class SendrController { @Autowired private AmqpTemplate amqpTemplate; @RequestMapping("1.do") public String m1() { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());// 24小时制 String context = "hello " + date; amqpTemplate.convertAndSend("q_hello",context); return context; } }
package com.zhiyou100.service.impl; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service("rabbitService") @RabbitListener(queues="q_hello") public class RabbitServiceImpl { @RabbitHandler public void getMessage(String message){ System.out.println("1:"+message); } }
package com.zhiyou100.service.impl;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service("rabbit2Service")
@RabbitListener(queues="q_hello")
public class RabbitService2Impl {
@RabbitHandler
public void getMessage(String message){
System.out.println("2:"+message);
}
}
一个生产者、2个消费者。 测试结果: 1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。 2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。 2::2020-02-13 21:59:59 1::2020-02-13 22:00:01 2::2020-02-13 22:00:04 1::2020-02-13 22:00:04 2::2020-02-13 22:00:05 1::2020-02-13 22:00:05 2::2020-02-13 22:00:05 1::2020-02-13 22:00:05 2::2020-02-13 22:00:05 1::2020-02-13 22:00:05 2::2020-02-13 22:00:06 1::2020-02-13 22:00:06 2::2020-02-13 22:00:06 1::2020-02-13 22:00:06 2::2020-02-13 22:00:06
从图中我们可以看出交换机可以绑定队列,生产者发消息给交换机,交换机给队列,队列给消费者
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
其中headers交换器允许你匹配AMQP消息的header而非路由键,除此之外headers交换器和direct交换器完全一致,但性能却很差,几乎用不到,所以我们本文也不做讲解。
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:
·*· (星号) 用来表示一个单词 (必须出现的)
·#· (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 .TT. 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
上面我使用的其实就是Direct Exchange直连型交换机,只不过我们自己不知道而已,Direct Exchange直连型交换机完整写法应该下面这种,我们改下配置
package com.zhiyou100.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue queue() { return new Queue("hello"); } @Bean DirectExchange exchange() { return new DirectExchange("e1"); } @Bean Binding binding(){ return BindingBuilder.bind(queue()).to(exchange()).with("hello"); } }
在网站上也可以查看交换机模式
上面的就是主题模式,根据
* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 *.TT.* 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
主题交换机是非常强大的,为啥这么膨胀?
当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。
package com.zhiyou100.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public final static String man = "topic.man"; public final static String woman = "topic.woman"; @Bean public Queue queue() { return new Queue(man); } @Bean public Queue queue2() { return new Queue(woman); } @Bean TopicExchange exchange() { return new TopicExchange("e1"); } @Bean Binding binding() { return BindingBuilder.bind(queue()).to(exchange()).with(man); } @Bean Binding binding2() { return BindingBuilder.bind(queue2()).to(exchange()).with("topic.#"); } }
@RequestMapping("1.do") public String m1(String message){ if (message == null) { message = "1:用户未填写信息"; } String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); amqpTemplate.convertAndSend("e1","topic.man",date+message); return "1"; } @ResponseBody @RequestMapping("2.do") public String m2(String message){ if (message == null) { message = "2:用户未填写信息"; } String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); amqpTemplate.convertAndSend("e1","topic.woman",date+message); return "2"; }
package com.zhiyou100.service; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service @RabbitListener(queues="topic.man") public class RabbitService { @RabbitHandler public void getMessage(String message){ System.out.println("接受的service:"+message); } }
package com.zhiyou100.service; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service @RabbitListener(queues="topic.man") public class RabbitService { @RabbitHandler public void getMessage(String message){ System.out.println("接受的service:"+message); } }
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
package com.zhiyou100.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public final static String man = "topic.man"; public final static String woman = "topic.woman"; @Bean public Queue queue() { return new Queue(man); } @Bean public Queue queue2() { return new Queue(woman); } @Bean FanoutExchange exchange() { return new FanoutExchange ("e1"); } @Bean Binding binding() { return BindingBuilder.bind(queue()).to(exchange()); } @Bean Binding binding2() { return BindingBuilder.bind(queue2()).to(exchange()); } }
@RequestMapping("1.do") public String m1(String message){ if (message == null) { message = "1:用户未填写信息"; } String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); amqpTemplate.convertAndSend("e1","topic.man",date+message); return "1"; } @ResponseBody @RequestMapping("2.do") public String m2(String message){ if (message == null) { message = "2:用户未填写信息"; } String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); amqpTemplate.convertAndSend("e1",null,date+message); return "2"; }
package com.zhiyou100.service; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service @RabbitListener(queues="topic.man2") public class RabbitService { @RabbitHandler public void getMessage(String message){ System.out.println("接受的service:"+message); } }
package com.zhiyou100.service; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service @RabbitListener(queues="topic.man2") public class Rabbit2Service { @RabbitHandler() public void getMessage2(String message){ System.out.println("接受的service2:"+message); } }
也就是消息的回调,其实就是消息确认(生产者推送消息成功,消费这接收消息成功)。
推荐文章
https://blog.csdn.net/qq_35387940/article/details/100514134
RPC和MQ的区别
https://blog.csdn.net/qq_41345773/article/details/89157299
RabbitMQ的使用
https://blog.csdn.net/hellozpc/article/details/81436980
Springboot 整合RabbitMq
https://blog.csdn.net/qq_35387940/article/details/100514134
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。