赞
踩
针对同步通信来说,异步通信可以让上游快速成功,极大提高了系统的吞吐量。而且在分布式系统中,通过下游多个服务的分布式事务,也能确保业务执行后的最终一致性。
消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息(主动拉取),或者订阅相应的队列,由MQ服务端给其推送消息(自动推送)。
1)异步通信:消息队列可以在应用程序之间提供异步通信,使得应用程序可以在不阻塞其他操作的情况下进行通信。
2)应用程序解耦:消息队列通过将消息的发送和接收分离实现应用程序的异步和解耦,使得它们不再相互依赖。
3)增强系统可伸缩性:消息队列可以帮助应用程序应对高流量和大量请求,提高系统的可伸缩性。
4)消息持久化:消息队列可以提供消息持久化功能,使得消息在系统故障时不丢失。
5)实现分布式系统:消息队列可以帮助实现分布式系统,使得系统可以在多个节点上运行。
6)实现消息缓存:消息队列可以提供消息缓存功能,使得系统可以在瞬时高流量的情况下正常运行。
***注:分布式系统是一组计算机系统一起工作,在终端用户看来,就像一台计算机在工作一样。这组一起工作的计算机,拥有共享的状态,它们同时运行,独立机器的故障不会影响整个系统的正常运行。
Broker
消息的中转站,一台服务器作为Broker(缓存代理),所有消息都通过它中转,生产者把消息发送给它就结束自己的任务,broker则把消息主动推送给消费者,或者消费者主动轮询
有Broker的MQ
kafka、rocketMQ(Alibaba)、activeMQ、RabbitMQ都是有broker的MQ
kafka消息处理性能最快的一款MQ
无broker的MQ
在生产者和消费者之前没有使用broker,如zeroMQ,直接使用socket通信
***注:Broker 缓存代理,Kafka集群中的一台或多台服务器统称broker.
kafka是一个分布式、支持分区(partition)、多副本(replica)的,基于zookeeper协调的分布式消息系统
Producer:消息生产者,向broker发送消息的客户端
Consumer:消息消费者,从broker读取消息的客户端
Topic:主题,是一个逻辑概念,kafka根据topic对消息进行归类,发布到kafka集群的每条消息都需要指定一个topic
Broker:中间人,是物理资源,消息中间件处理节点,一个kafka节点就是一个broker,一个或多个broker可以组成一个kafka集群
***注:topic是通过kafka命令在ZK中创建的
为了实现集群的无状态,将broker节点的数据都保存在ZK里面
①broker的管理,broker 会向 zookeeper发送心跳请求来上报自己的状态。在zookeeper上会有一个专门用来记录Broker服务器列表的节点,节点路径为/brokers/ids
②zookeeper 保存了 topic 相关配置,例如 topic 列表、每个 topic 的 partition数量、副本的位置等等。
③kafka 集群中有一个或多个broker,其中有一个会通过zookeeper被选举为leader控制器。leader负责管理整个集群所有分区和副本的状态,例如某个分区的 leader 故障了,zookeeper会选举新的 leader。
④kafka提供服务的端口是9092,ZK提供服务的端口是2181
kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以命令行中直接输入内容,并将这些内容已消息的形式发送到kafka集群中,在默认情况下,每一行会被当做成一个独立的消息。使用kafka发送消息的客户端,需要指定发送到的kafka服务器地址和topic。
对于consumer,kafka同样也携带了一个命令行客户端,会将获取到的内容在命令中进行输出,默认是消费最新的消息,使用kafka的消费者消息的客户端,从指定kafka服务器的指定topic中消费消息
① latest:消费者从当前主题中的最后一条消息的offset(偏移量)+ 1开始消费
② earliest:消费者从当前主题中的第一条消息开始消费
① 生产者将消息发送给broker(服务器),broker会将消息保存在本地的日志文件中
② 消息的保存是有序的,通过offset来描述消息的有序性
③ 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置;
④ 单播消息:每个分区只能被同一个消费者组内的一个consumer来消费;
⑤多播消息:不同消费组中的消费者之前消费消息互不影响;
① topic在kafka中是一个逻辑的概念,kafka通过topic将消息进行分类;
② 不同的topic会被订阅该topic的消费者消费
③ topic中的消息会被保存在log日志文件中,数据量庞大,为了解决这个文件过大的问题,kafka提出了分区partition的概念
***注:每个broker中可以有多个topic
partition是一个逻辑概念,将一个topic中的消息分为多个分区来存储,解决了统一存储文件过大的问题;提高了IO吞吐量,即读写可以同时在多个分区中进行;
***注:partition是通过kafka命令在ZK中创建的
消息日志中保存的是消息内容和偏移量
① 00000.log:保存消息(消息内容)
② _consumer_offsets_49:kafka内部自己创建了_consumer_offsets主题包含50个分区。这个主题用来存放consumer消费某个topic的偏移量。因为每个consumer都会自己维护消费的topic偏移量,也就是说每个consumer会把消费的主题偏移量自主上报给kafka中的默认主题_consumer_offsets_。因此kafka为了提升这个topic的并发性,默认设置了50个分区,可以让多个consumer并行提交消费的偏移量,至于提交到哪个分区,通过hash函数确定,提交内容为:key-消费者组id+topic+分区号,value-当前offset值
③ _consumer_offsets_49文件中保存的消息默认保存7天,7天后会被删除,因此一个消费者如果超过7天没有消费消息,那么之后它再消费消息则找不到当初的offset(偏移量只保存7天)
***注:Hash,一般翻译做散列、杂凑,或音译为哈希,是把任意长度的输入通过散列算法变换成固定长度的输出,该输出就是散列值。Hash算法也被称为散列算法,Hash算法虽然被称为算法,但实际上它更像是一种思想。Hash算法没有一个固定的公式,只要符合散列思想的算法都可以被称为是Hash算法。
副本是Topic中的Partition创建的多个备份,多个副本在kafka集群的多个Broker中,其中一个副本是Leader,其他的都是Follower (副本是Partition的副本)
① Leader:kafka的读写操作都发生在Leader上,Leader负责把数据同步给Follower;当Leader宕机(挂了),经过主从选举,从多个Follower中选举一个新的Leader。
② Follower:接收Leader同步的数据
③ ISR:是一个集合,用来存放可以同步和已同步的节点;如果ISR中的节点性能较差(同步较慢),会被踢出ISR集合
kafka集群中有多个Broker,创建Topic时可以指明主题有多个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的Broker里面
一个Partition只能被一个ConsumerGroup里面的某一个Comsumer消费,因为单个Comsumer消费单个Partition中的消息是顺序消费的,因此可以保证消费的顺序消费;在同一个Topic中的多个Partition之间不能保证总的消费顺序,因为消息从Producer传递到Topic中会有数据延迟,每个Comsumer消费能力也不同,因此不能保证总体的先进的消息被先消费。即kafka只在Partition范围内保证消息消费的局部顺序性
***注:副本是Partition的副本
ConsumerGroup中Comsumer的数量不能比一个Topic中Partition数量多,否则多出来的Comsumer消费不到消息。如果某个Comsumer挂了,会触发rebalance机制,让其他Comsumer来消费该Partition。
同步发送
生产者向broker发送消息,如果生产者没有收到来自broker的ack,生产者会阻塞,阻塞到3秒的时间,如果还没有收到ack,生产者会进行重试,重复次数为3
在同步发送中,ack有3种参数配置
① ack = 0
kafka-cluster(kafka集群)不需要任何broker收到消息,立即返回ack给生产者
② ack = 1
多副本中的leader接收消息,消息写入log,返回ack
③ ack = -1/all
多副本中的leader接收消息,默认配置的同步副本数,副本中至少1个或所有follower接收到消息,消息写入log后,返回ack
异步发送
生产者发送完消息后就可以执行之后的业务,broker在收到消息后异步调用生产者提供的callback方法
① kafka的消息是批量发送的
② kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区的大小为32Mb
③ kafka本地线程会去缓冲区中一次拉16kb的数据,发送到broker
④ 如果线程拉取的数据不到16kb,在10ms时间内会多次拉取,如果时间到了还是没有拉取满16kb,也会将已拉取的数据发送到broker
自动提交:消息poll下来,直接提交offset
手动提交:在消费消息时/后再提交offset
自动提交会丢消息,如果消费者还没有消费完poll下来的消息就自动提交了offset,此时消费者挂了,下一个消费者就会从已提交的offset的下一个位置开始消费消息,之前未被消费的消息就丢失了
消费者无论手动提交还是自动提交都需要把:所属的消费组+消费的主题+消费的分区+消费的偏移量等信息提交到集群的_consumer_offsets主题里面
① 手动同步提交:在消费完消息后调用同步提交的方法,在集群返回ack之前一直阻塞,返回ack表示提交成功,执行之后的逻辑
② 手动异步提交:在消息消费完后提交,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
① 默认情况下,消费者一次会poll 500条消息
② 代码中设置了长轮询的时间是1s:如果一次poll到500条,则停止poll,开始消费;如果一次没有poll满500条,且时间在1秒内,那么长轮询继续poll,如果时间到了,还没有满500条,则停止poll,开始消费
③ 消费者如果两次poll的时间间隔超过30s,集群会认为该消费者的消费能力弱,会将其提出消费组,触发rebalance机制,该机制会造成性能开销
消费者每隔1s向kafka集群发送心跳,如果集群发现超过10s,没有收到消费者发送的消息,其会被提出消费组,触发消费组的rebalance机制,将该分区交给消费组里面的其他消费者进行消费。
如果有消费组中有新的消费者启动,默认会从当前分区的最后一条消息的offset+1开始消费
① kafka集群中的broker在zk中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群的controller,负责管理整个集群中的所有分区和副本的状态
② 当某个Partition中的Leader副本出现故障时,由Controller负责为该分区选举新的Leader副本
③ 当检测到某个分区的ISR集合发生变化时,由Controller负责通知所有Broker更新其元数据信息
④ 当使用kafka-topic.sh脚本为某个Topic增加分区数量时,同样还是由Controller负责让新分区被其他节点感知到
前提:消费者没有指明分区消费。当消费组里消费者和分区的关系发生变化时,Rebalance机制触发,重新调整消费者消费的分区
消费者消费分区的三种策略
① range:通过公式(分区数/消费者数 + 1 或 分区数/消费者数 )来计算某个消费者消费哪个分区
② 轮询
③ sticky(粘合):在触发rebalance机制以后,在消费者消费的原有分区不变的基础上进行调整,没有宕机的消费者原来消费哪些分区,之后依旧消费哪些分区,宕机的消费者空出来的分区被没有宕机的消费者重新分配
HW和LEO
LEO(log-end-offset)是某个副本最后消息的消费位置
HW(high-waterline)是已完成同步的位置,消息在写入broker时,且每个broker完成这条消息的同步后,HW才会变化;消息未同步好,即消息刚刚开始同步,HW没有变化时,消费者是消费不到当前消息的,在消息完成同步,HW更新之后,消费者才能消费到这条消息,这样的目的是为了防止消息丢失。
发送方:同步发送,ack是1或者-1/all可以放置消息丢失,如果要做到99.9999%,ack设成all,把min.insync.replicas配置成分区备份数
消费方:把自动提交改为手动提交
在防止消息丢失的方案中,如果生产者发送完消息后,因为网络抖动,没有收到ack,但实际上Broker已经收到了,此时生产者会进行重试,于是Broker就会收到多条相同的消息,而造成消费者的重复消费
生产者关闭重试:会造成消息丢失(不建议)
消费者:解决非幂等性问题
解决非幂等性问题的方案:①在数据库中创建联合主键,防止相同的主键创建出多条记录;②使用分布式锁,以业务id为锁,保证只有一条记录能创建成功
发送方:在发生消息时将ack不能设置为0,不能关闭重试。使用同步发送,等到发送成功再发送下一条,确保消息是顺序发送的
接收方:topic只能设置一个分区,消费组中只能有一个消费者
kafka的顺序消费使用场景不多,因为牺牲掉性能,如果要做顺序消费,可以使用rocketMQ
① 问题产生原因
消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致kafka中有大量的数据没有被消费,随着没有被消费的数据堆积越多,消费者寻址的性能越来越差,最后导致整个kafka对外提供的服务的性能越来越差,其他服务访问速度也变慢,造成服务雪崩
② 解决方案
(1)在这个消费者中,使用多线程,充分利用机器的性能进行消息消费
(2)创建多个消费组,多个消费者,部署到其他机器上,一起消费,提高消费者的消费速度
(3)创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配上多个消费者,该消费者将poll下来的消息不进行消费,直接转发到新建的主题上,此时,新的主题的多个分区的多个消费者就开始一起消费了(不常用)
(4)通过业务的架构设计,提升业务层消费的性能
① 应用场景:订单创建后,超过30分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现
② 具体方案
(1)kafka中创建相应的主题
(2)消费者消费该主题的消息(轮询)
(3)消费者消费消息时判断消息的创建时间和当前时间之间的差值是否超过30分钟(订单未支付):如果是,则去数据库中修改订单状态为已取消;如果不是,则记录当前消息的offset,并不再继续消费之后的消息,等待一分钟后,再次向kafka拉取该offset及之后的消息,继续进行判断,依次反复;
***注:幂等性是指多次访问的结果都是一样的,对于rest的请求,CRUD中只有C是非幂等的
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。