赞
踩
支付服务要调用别的服务,调用了订单服务,在调用仓储服务,在以此调用别的,时间长。
服务流程如下:
缺点:
1.耦合度高。
2.性能下降。
3.资源浪费。
4.级联失败
服务流程如下:
优点:1.服务解耦(比如支付之后,不发短信提醒了,不用修改代码,直接取消短信服务的订阅即可)
2.性能提高,吞吐量提高。
3.故障隔离,不担心级联失败。
4.流量削峰。
缺点:
1.对Broker的依赖性太强了。
2.架构复杂,业务没有明显的流程,不好管理
MQ产品主要有一下四种:
1.RabbitMQ
2.ActiveMQ
3.RocketMQ
4.Kafka
主要的区别如下:
1.拉去容器
docker pull rabbitmq:management
2.创建镜像,并配置用户名和密码。(管理界面端口防火墙记得开发)
docker run -d --name my-rabbitmq \ 2 -p 5672:5672 \ # AMQP协议端口 3 -p 15672:15672 \ # 管理界面端口 4 -e RABBITMQ_DEFAULT_USER=admin \ 5 -e RABBITMQ_DEFAULT_PASS=admin \ 6 rabbitmq:management3.进入管理页面(页面如下)
输入网址:[IP地址:管理界面端口]
成功进入之后,我们看一下上面这6列:
1.Overview:总览。主要展示的是MQ
的概要信息 。
2.Connections:连接。无论消费者还是生产者都要建立连接。
3.Channels:通道。 消费者,生产者都要创建通道来执行事务。
4.Exchanges:交换机。消息的路由器,把消息路由到队列。
5.Queues:队列。做消息的存储。
6.Admin:管理。创建用户,创建虚拟主机。
MQ的大致架构如下:
(Advanced Message Queuing Protocol)是高级消息队列协议的缩写,它是一个开放标准的应用层协议,主要设计用于分布式系统之间高效、可靠地传输消息。
AMQP不是某个具体的软件产品或服务,而是一种通用的标准接口,任何遵循AMQP协议的软件系统都可以实现相互之间的互联互通,无论它们是由何种编程语言编写,运行在什么操作系统之上。
Spring AMQP(Spring Advanced Messaging Queuing Platform)是Spring框架的一部分,它是为简化在Java应用程序中使用高级消息队列协议(AMQP)而设计的一个抽象和便捷工具集。
简而言之,Spring AMQP的目标是让开发者更易于在Spring应用程序中使用消息队列服务,降低消息驱动架构的复杂性,提高开发效率。
生成者模块:
1.导入依赖:
- <!--AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
2.配置application.yml:(服务器防火墙端口记得开发嗷)
- spring:
- rabbitmq:
- host: 43.138.67.251 # rabbitMQ的ip地址
- port: 5672 # 端口
- username: root
- password: 123456
- virtual-host: /
3.用RabbitTemplate发送消息:
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendMessage2SimpleQueue(){
- String routingKey = "simple.queues";
- String message = "hello, spring amqp!";
- rabbitTemplate.convertAndSend(routingKey, message);
- }
消费者模块:
也是导入依赖,配置yml。配置一个Bean监听队列,spring项目启动时,加载Bean对队列进行监听,消息阅后即焚。
- @Component
- public class SpringRabbitListener {
-
- @RabbitListener(queues = "simple")
- public void listenSimplequeue(String msg) {
- System.out.println("接收到消息:"+msg);
- }
- }
和上面的简单队列使用相比,添加多个消费者。
了解一下交换机Exchange:
交换机的作用是把消息按照一定的规则路由给队列,交换机不能缓存消息,路由失败,消息丢失。
下面我们介绍几种常用的交换机:
Fanout Exchange的使用:
先把交换机和队列绑定起来
- @Configuration
- public class FanoutConfig {
-
- //交换机 fanout
- @Bean
- public FanoutExchange fanoutExchange() {
- return new FanoutExchange("fanout.exchange");
- }
- //队列1 fanout1.queue
- @Bean
- public Queue fanoutqueue1(){
- return new Queue("fanout1.queue");
- }
- //队列2 fanout2.queue
- @Bean
- public Queue fanoutqueue2(){
- return new Queue("fanout2.queue");
- }
- //队列1绑定到交换机
- @Bean
- public Binding bind1(Queue fanoutqueue1, FanoutExchange fanoutExchange) {
- return BindingBuilder.bind(fanoutqueue1).to(fanoutExchange);
- }
- //队列2绑定到交换机
- @Bean
- public Binding bind2(Queue fanoutqueue2, FanoutExchange fanoutExchange) {
- return BindingBuilder.bind(fanoutqueue2).to(fanoutExchange);
- }
-
- }
消费者端监听消息:
- //Fanout1
- @RabbitListener(queues = "fanout1.queue")
- public void FanoutexchengeQueue1(String msg){
- System.err.println("fanout1.queue接收到消息:["+msg+"]");
- }
- //Fanout2
- @RabbitListener(queues = "fanout2.queue")
- public void FanoutexchengeQueue2(String msg){
- System.err.println("fanout1.queue接收到消息:["+msg+"]");
- }
生产者:
- //Fanout交换机
- @Test
- public void senFanoutExchange() {
- String exchangename = "fanout.exchange";
- String msg = "Hello FanoutExchange";
- rabbitTemplate.convertAndSend(exchangename,"",msg);
- }
Direct Exchange基本使用:
@RabbitListener里面指定参数会生成并绑定交换机和queue。每一个队列和交换机绑定时有一个BindingKye,生成者发送消息时带一个RoutingKey,交换机只把消息发到BindingKey和RoutingKey一致的queue。
- //Direct1
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct1.queue"),
- exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
- key = {"red","blue"}
- ))
- public void listenerDirectExchange1(String msg) {
- System.err.println("direct1.queue接收到消息:["+msg+"]");
- }
- //Direct2
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct2.queue"),
- exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
- key = {"red","yellow"}
- ))
- public void listenerDirectExchange2(String msg) {
- System.err.println("direct2.queue接收到消息:["+msg+"]");
- }
生成者:
- //Direct交换机
- @Test
- public void senDirectExchange() {
- String exchangename = "direct.exchange";
- String msg = "Hello DirectExchange";
- rabbitTemplate.convertAndSend(exchangename,"blue",msg);
- }
Topic Exchange的使用:
消费者:
- //Topic1
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic1.queue"),
- exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
- key = {"china.#"}
- ))
- public void listenTopicExchange1(String msg) {
- System.err.println("topic1.queue接收到消息:["+msg+"]");
- }
- //Topic2
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic2.queue"),
- exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
- key = {"#.news"}
- ))
- public void listenTopicExchange2(String msg) {
- System.err.println("topic2.queue接收到消息:["+msg+"]");
- }
生产者:
- //Topic交换机
- @Test
- public void senTopicExchange() {
- String exchangename = "topic.exchange";
- String msg = "Hello TopicExchange";
- rabbitTemplate.convertAndSend(exchangename,"aa.news",msg);
- }
SpringAMQP发消息到RabbitMQ,会将消息序列化,用的是JDK序列化,这种方式性能比较低下。
并且为了将Java对象转换为AMQP消息,以及将AMQP消息的内容反序列化回Java对象,通常我们会使用
所以我们要设置一个消息转换器,换一种序列化方式。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。