赞
踩
异步调用常见实现就是事件驱动模式。
所谓事件驱动,就是和之前的订阅消息模式是一样的。首先我们要有一个消息代理者,当我们的前置业务完成之后,会通知给消息代理者,告诉他我的业务完成了,并携带一些业务信息,然后所有的依赖于前置业务完成之后才能执行的后置业务,全部都去订阅消息的管理者,当消息管理者接收到前置业务完成的消息时,他会通知给所有订阅消息的后置业务去执行自己的业务,而前置业务完成之后,则不会去管后置业务有没有完成,而是直接返回回去继续去接受其他人的请求。
当我们需要添加一个新的业务的时候,我们并不需要去像之前的同步调用的时候修改前置业务,而是直接让新加入的后置业务去订阅消息代理者,然后在接收到消息之后完成自己的逻辑即可。同样的,当我们想要移除一个业务的时候,只需要让他不在订阅消息即可。
在之前,我们的业务时长是所有服务完成的总和,如果一个业务需要150毫秒,则全部的业务走完需要550毫秒,当并发量高的时候,这种毫秒级的处理事件也会变得很大。但是在异步通信中,我们并不会计算所有服务的总和,而是只需要计算前置服务的执行时间,以及前置服务发送消息到消息管理者的时间,毕竟只有这段时间是用户可以感知到的,当前置服务完成了自己的服务,会直接给用户展示一个业务完成的信息,之后的所有的业务时间再长,用户也是不可知的,同样的,前置业务也无法感知后续业务的执行时间和执行结果。
在之前的同步调用的时候,当这个调用链上的某一个服务出现了问题,则这条链上的服务早晚都会全部都出问题,这就是级联失败。但是当我们使用异步调用的时候,由于前置业务只需要将业务完成的消息发送给消息管理者,然后后置任务去订阅即可,也就是说前置业务并不是直接调用后置业务,也就没有级联的现象,而后置业务出错也就只会有这一个单独的业务失败而已。
流量削峰是异步通信的一个独有的优势,也就是当我们的并发量非常大的时候,此时如果消息一直来一直来,那么这些服务会承受不住压力而崩溃。但是在异步通信中,我们这么多的消息,可以暂时缓存在消息订阅者这里,然后后置服务依然用自己的稳定速度去消费这些消息,直到消息全部消费完成,而这中间所有的负载全部都是由消息管理者在扛着。
异步通信的使用场景比较有限,在大部分的情况下,我们最长使用的还是同步通信。
MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ
在CentOS7中使用Docker来部署,首先,我们要打开Docker来搜索一下相关的镜像:
在docker中搜索RabbitMQ,就会出现相关的镜像,首先我们把镜像拉取到本地:
docker pull rabbitmq:3-management
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name rabbitmq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
其中有一些参数,比如环境变量,也就是-e参数后面我们配置了用户名和密码,然后--hostname是之后在集群配置中会用到的,以及我们暴露了两个端口,15672是我们的管理界面端口,5672是消息通信端口。
进到这里之后,用我们刚才在docker run命令中配置的用户名和密码进行登录:
首先我们的队列两边分别是我们的消息的发布者和订阅者,同时在我们的RMQ(RabbitMQ的简写)中,会存在多个虚拟主机,因为我们的RMQ可以存在多个用户,每个用户最好分配一个独立的虚拟主机,用来分离不同用户的消息队列。而在虚拟主机中,exchange是路由器,负责将消息分发到不同的队列中,从而让不同的消费者去消费。而queue就是队列,是负责缓存消息的地方。
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
那么,在这里,他会告诉你如何使用一个Java的项目去实现最基础的HelloWorld的项目,然后我们就可以先不看他,先来创建我们的项目,首先我们先创建一个最基本的SpringBoot的项目:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
这是一个父工程,主要用来管理依赖,然后我们再次创建两个子模块,分别用来模拟消息的发布者和订阅者,注意在创建子模块的时候就不要创建SpringBoot模块了,否则无法引用父工程的依赖:
package org.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class);
}
这样,一个项目的最基本的结构就搭建完成了,之后就是最关键的,我们要开始编写最基本的消息发布者和订阅者的代码了,不过好在,RMQ已经帮我们写好了一套最基本的消息发布和订阅的代码,现在我们打开刚才的浏览器的页面:
你会跳转到GetHub的一个页面,在这里的代码,就是我们所需要的最基本的代码,然后我们直接复制过来:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// MQ服务地址
factory.setHost("192.168.80.4");
// MQ服务的用户名和密码
factory.setUsername("admin");
factory.setPassword("123456");
// 用户所对应的允许访问的虚拟主机
factory.setVirtualHost("/");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
由于GetHub的访问可能会失败,这里我直接粘贴出来了,有些地方需要修改,比如你的MQ服务所在的主机的IP地址,以及用户名和密码,还有用户对应的虚拟主机,这里我们可以来到之前的UI管理界面来查看一下用户对应的虚拟主机:
在最后一个Admin中,可以看到这里有一个【可以连接的虚拟主机】,这里的值就是我们当前用户可以访问的虚拟主机,我们直接填写过来就可以,这样就构成了一个完整的代码,可以看到在这个界面中除了Admin,之前我们说过的MQ的其他组成部分都在这里。
然后我们回到刚才的浏览器界面,继续向下,我们会看到有关消费者的代码:
然后继续点击链接,是和刚才一样的GetHub地址,我们继续复制:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// MQ服务地址
factory.setHost("192.168.80.4");
// MQ服务的用户名和密码
factory.setUsername("admin");
factory.setPassword("123456");
// 用户所对应的允许访问的虚拟主机
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
这样就完成了,然后我们首先运行publisher中的Send代码:
运行成功之后,在控制台中输出了上面的内容,然后我们来到RMQ的UI管理页面:
在Queue中,就已经存在了一个名为hello的消息,然后我们再来运行Consumer中的Recv方法:
我们注意到,明明定义在上面的输出语句却是在最后输出的,定义在下面的输出语句先输出,这里体现了我们的MQ技术的一个异步通信的时候一个回调机制,也就是说,我们的这个部分:
其实全部都是一个回调函数,用来接受存储在RMQ中的信息并处理,但是你处理归你处理,我的程序还在走,这也就是为什么我们定义在最后的输出语句却可以先执行,也就是上面的方法并不在我们的执行流程中,然后我们再来看到我们的UI控制页面中:
这个消息队列还在,但是里面存放的消息却没有了,这也是RMQ的一个特性,就是这个消息一旦被消费,则会立刻在消息队列中被删除。
同时,再次回到我们的IDEA中,我们可以看到,我们的消费者的程序并没有结束,而是一直在运行:
这就表示,他会一直等待消息队列中出现新的消息的时候再次消费,直到我们手动停止这个程序,我们可以小小的修改一下我们的Send代码:
这个message变量就是用来接受传递的数据的变量,我们把这个变量的值修改一下,然后再次运行程序:
在这里,就输出了我们刚才发送的消息,这就是一个最基本的消息队列应该做的事情。
等到了实际的开发中,这个单一程序就可以部署成一个微服务,并使用SpringBoot运行,扩展其中的业务,然后根据收到的消息,就是一个完整的基于消息的后置业务。
这里之所以消费者也要生命队列,是因为在启动的时候,我们可以先启动消费者再启动生产者,而消费者可以在队列没有数据的时候运行,但是不能在没有队列的时候运行,所以这里在消费者中也写一份生命队列的代码是防止到时候如果消费者找不到队列就出错。
之所以之前我们的入门案例不看代码,直接从官网上复制,是因为原生的RMQ的API实在是太过繁琐,没有实用性,所以也就没有学习的必要。我们可以直接使用最方便也是使用最多的SpringAMQP去代替我们之前使用的原生的API。
SpringAMQP的官方地址:Spring AMQP
在这里,他就告诉我们说我们存在一个侦听器容器,用来监听消息的东西。并且还有一个模板用来接受和发送,以及自动化的队列声明和绑定即可。
其实这个依赖在我们之前已经引入过了,只是当时我们没有注意到而已:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
在引入依赖之后,接下来就是来配置我们的RabbitMQ的相关配置
spring:
rabbitmq:
addresses: 192.168.80.4 # 主机名
port: 5672 # 端口号
username: admin # 用户名
password: 123456 # 密码
其实这个配置和之前我们在代码中写入的配置信息是一样的,无非就是IP地址,端口号,用户名,密码,以及对应的虚拟主机
import org.example.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
@SpringBootTest
@ContextConfiguration(classes = PublisherApplication.class)
public class SpringRMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void Send(){
String queueName = "hello";
String message = "Hello SpringAMQP";
rabbitTemplate.convertAndSend(queueName,message);
}
其实关键代码就是自动注入一个rabbitTemplate的模板类,然后这个模板类调用一个convertAndSend方法,第一个参数是队列的名称,第二个参数是消息。
一个小尴尬的地方,就是之前我在运行的时候发现测试过了,但是我的UI界面中一直没有显示有消息被缓存,我就觉得很奇怪,明明测试都过了,但是没有消息也太神奇了, 后来才意识到是我之前写的消费者的代码没有关,我一传进去消息他就给我消费了,而且RabbitMQ还是消费就删的那种,大家在做的时候一定不要忘记关闭之前的消费者啊。
首先我们要在consumer中也编写配置文件,和之前的配置文件中的内容是一样的,都是为了让程序能连接到RabbitMQ的服务:
spring:
rabbitmq:
addresses: 192.168.80.4 # 主机名
port: 5672 # 端口号
username: admin # 用户名
password: 123456 # 密码
他的逻辑也非常的简单,就是创建一个JavaBean,然后注册到Spring的容器中,然后通过一个注解来选择监听的消息队列的名称,然后在注解的方法中来编写对消息的处理逻辑:
package org.example;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringAMQPConsumer {
@RabbitListener(queues = "hello")
public void Recv(String message){
System.out.println("接收到了消息,消息的内容是【"+message+"】");
}
启动消费者,就不是启动单个类了,而是需要将整个的Spring给启动起来:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。