赞
踩
声明:原文出处已在文末标出,本人出于学习,对其做了整理,收集干货,不作商业用途!
目录
8.2 如何保证消息不被重复消费(如何保证消息队列的幂等性)
正文
消息中间件(MOM),用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。本文主要讲消息队列,消息队列(Message queue 即MQ)是在消息的传输过程中保存消息的容器。MQ负责两个系统之间传递消息,这两个系统可以是异构的,处于不同硬件、不同操作系统、用不同语言编写,只需要简单配置、以及简单的调用几个MQ的API,就可以互相通讯,你不必考虑底层系统和网络的复杂性,MQ能够应付多种异常情况,例如网络阻塞、临时中断等等;消息队列主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。使用较多的消息队列有ActiveMQ,RocketMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ等,部分 数据库 如 Redis
、MySQL
以及 phxsql
也可实现消息队列的功能。
消息中间件的使用场景主要的有以下几点:解耦、异步、削峰、日志、消息通讯,下面逐个解释:
传统模式:
传统模式的缺点:系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统 A还需要修改代码,过于麻烦!
中间件模式::
中间件的优点:将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
传统模式:
传统模式的缺点:一些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式:
中间件模式优点: 将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度。
传统模式:
传统模式的缺点:并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常。
中间件模式:
中间件模式的优点:系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压 是允许的,假如消息队列的长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
流量削峰的典型应用:淘宝的双11秒杀,团购抢购活动,一般因为流量过大,导致流量暴增,应用挂掉。为解决此问题, 一般 需要在应用前端加入消息队列,一者可以控制活动的人数,二来也可以缓解短时间内高流量压垮应用。
日志处理是指将消息队列用在日志处理中,比如Kafka的应用。架构简化如下
日志采集:负责日志数据采集,定时写受写入Kafka队列;
Kafka消息队列:负责日志数据的接收,存储和转发;
日志处理应用:订阅并消费kafka队列中的日志数据 。
应用:解决大量日志传输的问题。
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。
客户端A和客户端B使用同一队列,进行消息通讯。
客户端A,客户端B,客户端C,客户端D 订阅同一主题(topic),进行消息发布和接收。实现类似聊天室效果。
消息通讯的典型应用:今日头条的私信功能,因为消息通信的数据不需要即时立即同步回来,不算核心数据,这样可以降低系统 的负荷。
AMQP即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
优点:可靠、通用
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统
STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。本博客之前写了个小例子,可参考 Spring boot+Maven+Websocket的升级版之stomp的小栗子(此为广播版,即一人发消息多人接收,点对点版本马上更新)_zxl技术博客的博客-CSDN博客
优点:命令模式(非topic\queue模式)
XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。
优点:通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大
有些特殊框架(如:redis、kafka、zeroMq等)根据自身需要未严格遵循MQ规范,而是基于TCP\IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。
过程:消息生产者(Product)生产消息发送到消息对列(Queue)中,然后消息消费者(Consumer)从queue中取出并且消费消息。
特点:消息被消费以后,Queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费;Queue实现了负载均衡,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。
过程:发布者(Publisher)将消息发布到Topic中,然后订阅者(Subscriber)从Topic中取出并且消费信息。
特点:消息被一个订阅者消费后,并没有消失,而是储存在Topic中,供所有的订阅者消费。
两者区别:这两种模式主要区别或解决的问题就是发送到队列的消息能否重复消费
对于消费者而言有两种方式从消息中间件获取消息,即push和pull:
①Push方式:由消息中间件主动地将消息推送给消费者,这种方式可以尽快的将消息发给消费者。
②Pull方式:由消费者主动向消息中间件拉取消息,这种方式相比push方式会稍微增加消息的到达消费者的时间。
问:如果消费者的处理消息的能力很弱(一条消息需要很长的时间处理),而消息中间件不断地向消费者Push消息,消费者的缓 冲区可能会溢出,如何解决。
答:如ActiveMQ的决绝办法那就是 prefetch limit,当推送消息的数量到达了perfetch limit规定的数值时,消费者还没有向消 息中间件返回确认的ACK(响应),消息中间件将不再继续向消费者推送消息。Prefech limit 的值取多少、prefetchACK的值 怎么设置,可以参考这个,也可自行百度 https://blog.csdn.net/qq_35508033/article/details/80255138
交互系统之间没有直接的调用关系,只是通过消息传输,故系统侵入性不强,耦合度低。
例如原来的一套逻辑,完成支付可能涉及先修改订单状态、计算会员积分、通知物流配送几个逻辑才能完成;通过MQ架构设计,就可将紧急重要(需要立刻响应)的业务放到该调用方法中,响应要求不高的使用消息队列,放到MQ队列中,供消费者处理。
通过消息作为整合,大数据的背景下,消息队列还与实时处理架构整合,为数据处理提供性能支持。
Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
JMS中的P2P和Pub/Sub消息模式:点对点(point to point, queue)与发布订阅(publish/subscribe,topic)最初是由JMS定义的。这两种模式主要区别或解决的问题就是发送到队列的消息能否重复消费(多订阅)。
试想,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低。
要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现,AMQP的主要特征是面向消息,队列,路由(包括点对点和发布/订阅),可靠性,安全,AMQP协议更多用在企业系统内,对数据一致性,稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
6.1.1 RabbitMQ优点
①并发性能极好。
②支持长时间消费和消息事务。如果消费者没有断开连接,并且没有确认消费,那么rabbitmq会一直等待消费者消费完成。
③支持批量确认和异步确认(comfirm机制)。
④权限控制比较完善。
⑤有强大的WEB管理页面。
6.1.2 RabbitMQ缺点
①只支持简单集群,对高级集群模式支持不好。
②不支持负载均衡,要借助LVS或者HAPROXY等技术实现。
③只能支持少量堆积,各种队列和元数据的存储需要大量的磁盘空间。
④Erlang语言开发,研究源码有一定难度。
6.1.3 RabbitMQ个人评价
RabbitMQ的特性适合长时间消费的应用场景,比如邮箱通知系统,短信通知系统。性能比ActiveMQ高,安全,可靠性强。但是不适合大数据的应用场合。
RocketMQ是阿里开源的消息中间件,目前也已经孵化为Apache顶级项目,它是纯Java开发,具有高吞吐量,高可用性,适合大规模分布式系统应用的特点,RocketMQ思路起源于kafka,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
6.2.1 RocketMQ 优点
①支持消息失败重试、消息事务、消息回溯,高可靠的持久化策略,消息几乎不会丢失。
②实时性好。RocketMQ使用长轮询,同Push方式实时性一致,消息的投递延时通常在几个毫秒。
③支持大量堆积,持久化效率高,批量刷盘。
④对分布式支持很好,使用nameserver管理集群成员,支持动态扩展。
6.2.2 RocketMQ 缺点
①存储方式只支持写到磁盘。
②使用消息事务,确保了消息的可靠投递,但是必然会导致并发性能下降,这个把双刃剑。
6.2.3 RocketMQ个人评价
高性能,满足可靠性,满足分布式,支持水平扩展,主从之间的自由切换,它虽然非常完善,但是收费。
ActiveMQ是Apache出品(下一代产品为Apollo),最流行的,能力强劲的开源消息总线,并且它是一个完全支持JMS规范的消息中间件其丰富的API,多种集群构建模式使得他称为业界老牌消息中间件,在中小型企业中应用广泛。
6.3.1 ActiveMQ优点
①产品成熟,在中等小型规模的场景有很多应用。
②功能齐全,支持多种恢复机制,支持事务,支持多种协议。
③Activemq高版本负载均衡做得比较好,高可用,支持动态扩展。
④AcitiveMQ很容易嵌入到spring的项目中。
6.3.2 ActiveMQ缺点
①存储方式只能支持少量堆积。
②ActiveMQ默认的配置性能偏低,需要优化配置,但是配置文件复杂,ActiveMQ本身不提供管理工具,示例代码少。
③只适合简单集群。
6.2.3 ActiveMQ个人评价
ActiveMQ适合一般的事务性业务场合,不适合大数据应用的场合。在面对高并发的环境下,性能不是很好。
ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演了这个服务角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。其中,Twitter的Storm中使用ZeroMQ作为数据流的传输。
6.4.1 ZeroMQ优点
①性能极好,吞吐量极大,并发性能极好,实时性极好。
②引用ZeroMQ程序库即可在应用之间发送消息,拿来即用,产品能够快速迭代迅速成型。
③可以构建复杂的队列,处理逻辑可以定制。
④非持久化消息,对系统的存储空间要求不大,但是可以处理成千上万的数据,服务器成本低。
6.4.2 ZeroMQ缺点
①只有消息重传机制,消息容易丢失。
②功能不够完备,很多功能需要我们去实现,不适合复杂的业务。
6.4.3 ZeroMQ个人评价
ZeroMQ适合大数据、实时处理、高频计算、定制性强的应用场合,比如股票市场统计。由于对服务器的要求低,性 能极其优越,有利于小型企业的快速应用、软件的快速迭代。
Kafka是Apache下的一个子项目,开发语言为scala,是一个高性能跨语言分布式发布/订阅消息队列系统,只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。
6.5.1 Kafka优点
①支持同步到数据库,能与hadoop、hbase等建立数据管道;
②吞吐量极大,在一台普通的服务器上既可以达到10W/s的吞吐速率;
③完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;
④支持Hadoop数据并行加载,满足像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制;
⑤通过Hadoop的并行加载机制统一了在线和离线的消息处理。
6.5.2 Kafka缺点
长时间的消费,容易失败。
6.5.3 Kafka个人评价
kafka非常适合流式处理、大数据、单次消费时间较短的场合(日志采集、处理)
Redis 做为一个基于内存的K-V数据库,其提供了消息订阅的服务,所以完全可以当做一个轻量级的MQ来使用,目前应用案例较少,且不方便扩展。
综合对比,建议如下:
之前,ActiveMQ很火,官方一直对其维护,但是如今国内应用都越来越少,官方社区现在对ActiveMQ 5.x维护越来越少,几个月才发布一个版本,我个人不推荐用这个了;
对于RabbitMQ,是erlang语言开发,性能极好,但 erlang 语言却阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,社区活跃度也高;
对于RocketMQ,如今越来越多的公司会去用 RocketMQ,接口简单易用,而且毕竟在阿里大规模应用过,有阿里双十一处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,还可以支撑大规模的topic数量,支持复杂MQ业务场景而且一个很大的优势在于,阿里出品都是java系的,我们可以自己阅读源码,定制自己公司的MQ,但社区可能有突然黄掉的风险,对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
对于kafka, 非常适合流式处理、大数据、单次消费时间较短的场合(日志采集、处理)。
综上,个人认为中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择;如果是大数据领域的实时计算、日志采集等场景,推荐 Kafka 。
由于系统使用了MQ,必然会导致系统可用性降低,那么如何解决呐?以RabbitMQ为例,谈谈如何保证系统的高可用。
造成重复消费的原因?
无论是哪种消息队列(MQ),造成重复消费的原因都类似,分两种情况,
一种是生产者和消息队列之间产生的,生产者把消息发送到消息队列,消息队列收到后,会返回给生产者一个确认信息,只是不同的MQ用的不同的反馈机制(RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念)。当网络传输故障等原因,这个确认信息没有返回到生产者,生产者认为消息未发送成功,就会重新发送刚才的信息,这样就导致了MQ重复接收消息。
另一种是消息队列和消费者之间产生的,消息通过MQ到达消费者,消费者消费完毕之后,会返回一个确认信息,MQ就知道该消息被消费了。不同的MQ也是用的不同的反馈机制(RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念)。当网络传输故障等原因,这个确认信息没有返回到MQ,导致MQ不知道该消息已经被消费过了,从而分给其他的消费者消费,这样就导致了消息被重复消费。
综上,无论是上图中的步骤“2”处的ACK丢失,还是步骤“4”处的ACK丢失,都可能导致消费者重复消费。
如何解决?
针对步骤“2”处的ACK丢失的解决办法:MQ系统内部生成一个inner-msg-id,该id全局唯一,由MQ生成,具备业务无关 性,对消息发送方和消息接收方屏蔽。
针对步骤“4”处的ACK丢失的解决办法:
由于步骤4处,经过了消费者,所以就和业务关系紧密,如下:
1)如果要拿数据插入数据库的话,可以设置个主键,避免重复。
2)如果要拿数据写入redis的话,就更没问题了,redis天然幂等。
3)如果不具备上面的条件,可以引入第三方介质来做消费记录,就是在生产者生产出来消息时,就加一个全局的唯一 的id,只要是消费过该消息,就将<id,message>以K-V的形式存入到redis中,等到以后的消费开始之前,先去redis 中查下有无消费记录即可。
每种MQ都要从三个角度来分析:生产者丢失数据、消息队列丢失数据、消费者丢失数据
以RabbitMQ为例:
1)生产者数据丢失的解决办法
①事务机制:生产者发送数据之前开启rabbitmq事务(channel.txSelet),此时,如果消息到消息队列的途中出错,就可以回滚事务(channel.txRollback),然后再继续重发消息,如果消息队列接收成功,就提交事务(channel.txCommit),但是该机制太耗性能,吞吐量就会大幅下降。
②confirm机制(推荐): 在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id(从1开始),当消息到达消息队列后,会返回ACK确认消息,告诉生产者已经正确到达目的队列了,如果RabiitMQ没能处理该消息,则会发送一个Nack消息给你,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息rabbitmq接收了之后会异步回调你一个接口通知你这个消息接收到了。
2)消息队列数据丢失的解决办法
处理消息队列数据丢失的情况,可以采取开启持久化磁盘的配置,这个持久化配置可以和confirm机制配合使用,即在消息持久化磁盘后,给生产者发送个ACK信号,这样,即使在持久化磁盘之前RabbitMQ挂掉了,生产者没有收到ACK信号,也会给消息队列重发该条信息。
那么如何持久化呢,其实也很容易,就下面两步
①、将queue的持久化标识durable设置为true,则代表是一个持久的队列;
②、发送消息的时候将deliveryMode=2;
这样设置以后,RabbitMQ就算挂了,重启后也能恢复数据。
3)消费者数据丢失的解决办法
消费者数据丢失因为RabbitMQ采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时消息队列会立即将已发送的消息删除,这种情况下如果消费者出现异常而没能处理该消息,就会丢失该消息。解决办法就是关闭RabbitMQ的自动确认消息(自动ACK),改为手动的,每次代码里的确保消费者消费完消息后,手动发送ACK给消息队列,这样当消费者挂掉,消息队列一定时间内没有收到ACK,就会把该消息分配给别的消费者去消费,这样消息就不会丢失了。
不同的MQ的消息数据丢失的解决办法不同,可自行百度。
举个简单的例子,吃瓜子的过程(消息1:拆包装,消息2:剥皮,消息3::吃到瓜子仁),相当于生产者产生的三条信息,把他们发到消息队列中,正常的消费顺序应该是:消息1->消息2->消息3,但这时只有一个队列(queue),多个消费者(consumer),此时就有可能出现某一个消费者消费了信息3,却没有消费信息1和信息2(即吃到了瓜子仁,却没有拆包装和剥皮),这样显然不合理。所以我们就需要想办法,让这个顺序依次执行。
1.在 MQ 层面顺序处理
如RabbitMQ中,可以拆分多个queue,每个queue对应一个consumer,consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。
2.在业务逻辑处理顺序性
在 MQ 层面支持消息的顺序处理开销太大,为了极少量的需求,增加整体上的复杂度得不偿失。可以使用msg-id判断消息的顺序(需要全局存储,记录消息的执行状态)。
恢复consumer消费速度法: 正常情况下,一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条,1000多万条,所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来,但是假如这时生产者不断的推数据过来,很有可能导致MQ满了,然后数据溢出,此方法着实不推荐。
紧急扩容法(推荐):先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉。新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量。然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息。
例如RabbitMQ是可以设置过期时间的,就是TTL,如果消息在queue中积压超过一定的时间就会被RabbitMQ给清理掉,这个数据丢失了。
解决办法:批量重导法:可以等到夜深人静的时候,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把因为过期时间丢的数据给他补回来。
当MQ消息太多,而消费者消费的速度又太慢,导致MQ溢出了,此时就用上文中的8.5的紧急扩容法,然后加上8.6的批量重导法,把溢出的数据补回来就好了。
参考:
【原创】分布式之消息队列复习精讲 - 孤独烟 - 博客园(【原创】分布式之消息队列复习精讲)
消息中间件(一)MQ详解及四大MQ比较_jcpp9527的博客-CSDN博客_消息中间件(消息中间件(一)MQ详解及四大MQ比较)
RPC和MQ对比及其适用/不适用场合_gloryzyf的博客-CSDN博客_rpc和mq(RPC和MQ区别)
消息队列模式(点对点&发布订阅)_nsxqf的博客-CSDN博客_消息队列 点对点(点对点/发布订阅)
MQ常见问题及解决方案 - 简书(MQ常见问题及解决方案)
中间件对比----Kafka、ActiveMQ、RabbitMQ及RocketMQ性能对比_心诚则灵--艾的博客-CSDN博客(中间件对比----Kafka、ActiveMQ、RabbitMQ及RocketMQ性能对比)
消息总线能否实现消息必达?(MQ,如何做到消息必达)
Rocketmq原理&复杂分布式事务解法 - 简书(Rocketmq、kafka、Rabbitmq的详细对比 二、RocketMQ集群概述)
Kafka、rabbitmq、zeromq、rocketmq、activemq综合对比(二)_JohnyLin37291的博客-CSDN博客(Kafka、rabbitmq、zeromq、rocketmq、activemq综合对比(二))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。