赞
踩
消息中间件基于队列模型在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统。
1.Http请求基于请求与响应的模型,在高并发的情况下,客户端发送大量的请求达到服务器端有可能会导致我们服务器端处理请求堆积。
2.Tomcat服务器处理每个请求都有自己独立的线程,如果超过最大线程数会将该请求缓存到队列中,如果请求堆积过多的情况下,有可能会导致tomcat服务器崩溃的问题。
所以一般都会在nginx入口实现限流,整合服务保护框架。
3.http请求处理业务逻辑如果比较耗时的情况下,容易造成客户端一直等待,阻塞等待过程中会导致客户端超时发生重试策略,有可能会引发幂等性问题。
注意事项:接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理。
可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
在高并发环境下,如果消息的处理速度跟不上消息的生成速度,就会导致消息队列堆积,进而影响系统的稳定性和可用性。为了解决这个问题,引入消息限流策略是非常必要的。
消息限流是一种通过控制消息的生成速率和处理速率来平衡生产者和消费者之间的关系。通过设置合理的限流参数,可以控制系统的负载,避免资源耗尽和系统崩溃的风险。
RabbitMQ与消息限流策略的结合
1、预取计数(prefetch count):RabbitMQ中的预取计数可以控制消费者从队列中获取消息的数量。通过合理设置预取计数,可以平衡生产者和消费者之间的速率差异。当消费者处理完预取的消息后,才会继续从队列中获取新的消息,这样可以避免消息的堆积。
2、限制连接数和通道数:在RabbitMQ中,可以通过限制连接数和通道数来控制消息的生成速度和处理速度。通过限制连接数,可以限制生产者的连接数,控制消息的生成速度;通过限制通道数,可以提高消费者的处理速度,避免系统负载过高。
3、延迟队列(dead-letter queue):延迟队列是一种特殊的队列,用于存放无法立即处理的消息。当消息到达延迟队列后,可以设置一个延迟时间,在延迟时间过后再将消息重新发送给消费者进行处理。通过延迟队列,可以有效控制消息的处理速率,尤其适用于对实时性要求不高的场景。
应用场景:一般用于用户注册后,需要发送注册邮件和注册短信
传统做法:
串行的方式就是将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。
- public class RabbitMQSingle {
- public String registerMember(){
- System.out.println("<01>注册数据插入数据库");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("注册成功");
- sendEmail();
- sendMseesage();
- System.out.println("<04>通知用户注册成功");
- return "通知用户注册成功";
- }
-
- public String sendMseesage() {
- System.out.println("<02>发送注册成功的短信");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("发送成功");
-
- return "发送短信业务完成";
- }
- public String sendEmail() {
- System.out.println("<03>发送邮件");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("发送邮件成功");
- return "发送邮件业务完成";
- }
- }
执行顺序如上
执行完以上代码需要9s+的时间
将注册信息写入数据库后,同时执行发送邮件,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回。
一个类模拟是实现数据库操作
- @RestController
- public class RabbitMQSingle {
- @Autowired
- private SendMessageService sendMessageService;
-
- @GetMapping("/register")
- public String registerMember(){
- System.out.println("<01>注册数据插入数据库");
-
- System.out.println("注册成功");
- sendMessageService.sms();
- System.out.println("<04>通知用户注册成功");
- return "通知用户注册成功";
- }
- }
一个类模拟处理发送短信和邮件业务
- @Component
- public class SendMessageService {
-
- @Async
- public String sms(){
- System.out.println("<02>发送邮件");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("发送邮件完成");
- System.out.println("<03>发送短信");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("发送短信完成");
- return "发送信息和邮件完成完成";
- }
- }
直接3s就能收到注册成功的通知,发送短信和邮件的业务通知会由另一个线程去执行
简答说一下,Spring整合使用多线程的方式
首先再启动类上加上@EnableAsync的注解
然后在想要新开一个线程去执行的业务模块上加上@Async注解
引入消息队列之后就可以异步处理发送注册邮件和注册短信的业务,在注册信息写入数据库之后,这些不妨碍用户使用的业务,先加入到消息队列当中等待处理。
由此可见引入消息队列之后,响应时间=数据库写入时间+写入消息队列的时间,消息队列中的消息可以等待前面的消息处理完再处理。
如果想要看MQ的简单实现可以参靠:
应用场景:商城活动时,下单高峰时期,用户下单,订单业务模块调用库存业务模块的接口。
这种情况下就会存在订单业务和库存业务高度耦合的问题,一旦我们的库存业务使用不了了,那我i们的订单业务也就无法使用了。引入消息队列就可以解决这一问题:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,获取下单消息,进行库操作。
就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
应用场景:流量削峰一般用于秒杀活动中。秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1、可以控制活动人数,超过限制就会直接丢弃掉后来的订单
2、可以缓解短时间的高流量(超出服务器承受范围)压垮服务器
1、用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。
2、秒杀业务根据消息队列中的请求信息,再做后续处理。
关于消息队列的优点也就是上面列举的,就是在特殊场景下有其对应的好处,解耦、异步、削峰。
缺点有以下几个:
1.系统可用性降低:系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了?如何保证消息队列的高可用,可以点击这里查看。
2.系统复杂度提高:硬生生加个 MQ 进来,你怎么[保证消息没有重复消费]?怎么[处理消息丢失的情况]?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。
3.一致性问题:A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。
在介绍常见的消息中间件的方式的时候先介绍一下实现MQ的两种方式:AMQP,JMS
AMQP全称Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,不受客户端/中间件不同产品、不同开发语言等条件的限制。该协议是一种二进制协议,提供客户端应用于消息中间件之间异步、安全、高效的交互。相对于我们常见的REST API,AMQP更容易实现,可以降低开销,同时灵活性高,可以轻松的添加负载平衡和高可用性的功能,并保证消息传递,在性能上AMQP协议也相对更好一些。(官方解释)
通俗来说,在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接收的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。
JMS即Java消息服务(Java Message Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的API,用于在两个应用程序间或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。简单的理解:两个应用程序之间需要进行通信,使用一个JMS服务,进行中间的转发,通过JMS 的使用,可以解除两个程序之间的耦合。
两者间的区别和联系:
现在主流的消息中间件就4种:kafka、ActiveMQ、RocketMQ、RabbitMQ
ActiveMQ:基于JMS
RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
Kafka:分布式消息系统,高吞吐量
接下来看一下这四种中间件之间有什么区别?
ActiveMQ现在来说已经算是一个比较过时的产品了,以前早期的项目使用的都是这个MQ,但是现在用的不多了。官网的更新频率已经变慢了,好久才会更新一次。
它的单机吞吐量是万级的,一些小的项目已经够用了,但是对于高并发的情况下已经不行了
在可用性上看,它使用的是主从架构实现的,具有高可用性;
在消息可靠性上看,有较低的概率是会丢掉数据的
综合来看,后续介绍的RabbitMQ完全可以代替掉这个
RabbitMQ出现后,大多数国内的公司选择抛弃ActiveMQ选择RabbitMQ,基本代替了ActiveMQ的位置,而且社区比较活跃。
它的单机吞吐量也是万级的,对于高并发的情况,它也是难以胜任的。
在可用性上,它使用的是镜像集群的模式,可以保证高可用性
在消息的可靠性上,它是可以保证不丢数据的
同时它还支持一些消息中间件的高级功能,如:消息重试、死信队列等
但是RabbitMQ的开发语言是erlang,国内很少有人精通erlang,所以经常无法阅读源码,对于大多数,中小型公司,不需要面对数据上的挑战,使用它还是非常合适的,但是对于BAT大公司来说,它就不太适合了。
MQ-RocketMQ,它是阿里开源的消息中间件,久经沙场,非常靠谱。
它支持高吞吐量,单机可达到10万级,能承受互联网的高并发的挑战。
在可用性上,使用的是分布式架构,可以搭建大规模的集群,性能很高。
在可靠性上,通过一定的配置可以达到数据的绝对不丢失。
同时,Rocket-MQ支撑大量的高级功能,如:延迟消息,事物消息、消息回溯、死信队列等。
它非常适用于Java体系的架构中,因为使用Java语言进行开发的,所以可以通过阅读源码的方式去了解更深的底层原理。
目前来看,它没有什么特别大的缺点,可以支持高并发下的技术挑战,可以基于它实现分布式事务,大型互联网公司和中小型公司都可以选择使用它来作为消息中间件使用。
kafka的吞吐量被公认为中间件中的翘楚,单机可以支持十几万的并发,相当强悍。
在高可用上同样支持分布式集群部署。
在消息可靠性上,如果保证异步的性能,可能会出现消息丢失的情况,因为它保存消息时是先存到磁盘缓冲区的,如果机器出现故障,缓冲区的数据是可能丢失的。
它的功能非常的单一,就是消息的接收与发送,因此不适合应用于许多场景。
它在行业内主要应用于大数据领域,使用它进行用户行为日志的采集和计算,来实现比如“猜你喜欢”的功能。
所以,如果没有大数据的需求,一般不会选择它。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。