赞
踩
消息队列Message Queue,简称MQ。
是一种应用间的通信方式,主要由三个部分组成。
消息队列的应用场景较多,常用的可以分为三种:
在架构技术选型的时候一般根据业务的需求选择合适的中间件:
比如中小型公司,低吞吐量的一般用 ActiveMQ、RabbitMQ 较为合适,大数据高吞吐量的大型公司一般选用 Kafka 和RocketMQ。
在实际应用中,如果我们需要把磁盘中的某个文件内容发送到远程服务器上,那么它必须要经过几个拷贝的过程,如图(贴图)。
在这个过程中我们可以发现,数据从磁盘到最终发送出去,要经历 4 次拷贝,而在这四次拷贝过程中,有两次拷贝是浪费的,分别是:
除此之外,由于用户空间和内核空间的切换会带来 CPU 的上线文切换,对于 CPU 性能也会造成性能影响。
而零拷贝,就是把这两次多于的拷贝省略掉,应用程序可以直接把磁盘中的数据从内核中直接传输给Socket,而不需要再经过应用程序所在的用户空间,如下图所示
零拷贝通过DMA(Direct Memory Access)技术把文件内容复制到内核空间中的Read Buffer,
接着把包含数据位置和长度信息的文件描述符加载到Socket Buffer 中,DMA 引擎直接可以把数据从内核空间中传递给网卡设备。
在这个流程中,数据只经历了两次拷贝就发送到了网卡中,并且减少了 2 次cpu 的上下文切换,对于效率有非常大的提高。
所以,所谓零拷贝,并不是完全没有数据赋值,只是相对于用户空间来说,不再需要进行数据拷贝。对于前面说的整个流程来说,零拷贝只是减少了不必要的拷贝次数而已。
在程序中如何实现零拷贝呢?
除此之外,还有一个 mmap 的文件映射机制;
它的原理是:将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。使用这种方式可以获取很大的 I/O 提升,省去了用户空间到内核空间复制的开销。
kafka 是一个用来实现异步消息通信的中间件,它的整个架构由Producer、 Consumer、Broker 组成。
所以,对于kafka 如何保证消息不丢失这个问题,可以从三个方面来考虑和实现。
首先是Producer 端,需要确保消息能够到达 Broker 并实现消息存储,在这个层面,有可能出现网络问题,导致消息发送失败,所以,针对Producer 端,可以通过 2 种方式来避免消息丢失
然后是Broker 端,Broker 需要确保Producer 发送过来的消息不会丢失,也就是只需要把消息持久化到磁盘就可以了。
(如图)但是,Kafka 为了提升性能,采用了异步批量刷盘的实现机制,也就是说按照一定的消息量和时间间隔来刷盘,而最终刷新到磁盘的这个动作,是由操作系统来调度的,所以如果在刷盘之前系统崩溃,就会导致数据丢失。
Kafka 并没有提供同步刷盘的实现,所以针对这个问题,需要通过Partition的副本机制和acks 机制来一起解决。
最后,就是Consumer 必须要能消费到这个消息,实际上,我认为,只要producer和broker 的消息可靠的到了保障,那么消费端是不太可能出现消息无法消费的问题,除非是Consumer 没有消费完这个消息就直接提交了,但是即便是这个情况,也可以通过调整offset 的值来重新消费。
首先,(如图)Kafka Broker 上存储的消息,都有一个Offset 标记。然后kafka 的消费者是通过 offSet 标记来维护当前已经消费的数据,
每消费一批数据,Kafka Broker 就会更新OffSet 的值,避免重复消费。
默认情况下,消息消费完以后,会自动提交 Offset 的值,避免重复消费。
Kafka 消费端的自动提交逻辑有一个默认的 5 秒间隔,也就是说在 5 秒之后的下一次向 Broker 拉取消息的时候提交。
所以在Consumer 消费的过程中,应用程序被强制 kill 掉或者宕机,可能会导致 Offset没提交,从而产生重复提交的问题。
除此之外,还有另外一种情况也会出现重复消费。
(如图)在Kafka 里面有一个Partition Balance 机制,就是把多个Partition 均衡的分配给多个消费者。
Consumer 端会从分配的Partition 里面去消费消息,如果 Consumer 在默认的 5 分钟内没办法处理完这一批消息。
就会触发Kafka 的Rebalance 机制,从而导致Offset 自动提交失败。
而在重新Rebalance 之后,Consumer 还是会从之前没提交的 Offset 位置开始消费,也会导致消息重复消费的问题。
基于这样的背景下,我认为解决重复消费消息问题的方法有几个:
首先,发送到 Kafka Broker 上的消息,最终是以 Partition 的物理形态来存储到磁盘上的。
(如图)而Kafka 为了保证Parititon 的可靠性,提供了 Paritition 的副本机制,然后在这些Partition 副本集里面。存在Leader Partition 和Flollower Partition。
生产者发送过来的消息,会先存到 Leader Partition 里面,然后再把消息复制到 Follower Partition,
这样设计的好处就是一旦Leader Partition 所在的节点挂了,可以重新从剩余的 Partition 副本里面选举出新的 Leader。
然后消费者可以继续从新的 Leader Partition 里面获取未消费的数据。
在Partition 多副本设计的方案里面,有两个很关键的需求。
这两个需求都需要涉及到网络通信,Kafka 为了避免网络通信延迟带来的性能问题,以及尽可能的保证新选举出来的Leader Partition 里面的数据是最新的,所以设计了 ISR 这样一个方案。
ISR 全称是 in-sync replica,它是一个集合列表,里面保存的是和 Leader Parition 节点数据最接近的 Follower Partition
如果某个Follower Partition 里面的数据落后 Leader 太多,就会被剔除 ISR 列表。
简单来说,ISR 列表里面的节点,同步的数据一定是最新的,所以后续的Leader 选举,只需要从ISR 列表里面筛选就行了。
所以,我认为引入ISR 这个方案的原因有两个:
首先,在 kafka 的架构里面,用到了 Partition 分区机制来实现消息的物理存储(如图),在同一个topic 下面,可以维护多个partition 来实现消息的分片。
生产者在发送消息的时候,会根据消息的 key 进行取模(如图),来决定把当前消息存储到哪个partition 里面。
并且消息是按照先后顺序有序存储到 partition 里面的。
在这种情况下,(如图),假设有一个topic 存在三个partition,而消息正好被路由到三个独立的partition 里面。
然后消费端有三个消费者通过 balance 机制分别指派了对应消费分区。因为消费者是完全独立的网络节点,
所有可能会出现,消息的消费顺序不是按照发送顺序来实现的,从而导致乱序的问题。
针对这个问题,一般的解决办法就是自定义消息分区路由的算法,然后把指定的key都发送到同一个 Partition 里面。(如图)
接着指定一个消费者专门来消费某个分区的数据,这样就能保证消息的顺序消费了。
另外,有些设计方案里面,在消费端会采用异步线程的方式来消费数据来提高消息的处理效率,那这种情况下,因为每个线程的消息处理效率是不同的,所以即便是采用单个分区的存储和消费也可能会出现无序问题,针对这个问题的解决办法就是在消费者这边使用一个阻塞队列,把获取到的消息先保存到阻塞队列里面,然后异步线程从阻塞队列里面去获取消息来消费。
在Java中,可以使用Kafka的消费者API来实现消息的顺序消费。以下是几种可以考虑的方法:
单个分区消费:创建一个单独的消费者实例来消费一个分区的消息。这样可以确保在单个分区内的消息按顺序消费。但是需要注意,如果有多个分区,不同分区的消息仍可能以并发方式进行消费。
指定分区消费:通过指定消费者订阅的特定分区,可以确保只消费指定分区的消息。这样,可以通过将相关消息发送到同一个分区来保证消息的顺序消费。
按键分区:Kafka允许根据消息的键(key)来决定将消息发送到哪个分区。如果消息的键是相同的,Kafka会将它们发送到同一个分区。因此,可以根据消息的键来保证消息的顺序消费。
无论选择哪种方法,都应该注意以下几点:
当我们向某个Topic 发送消息的时候,在 Kafka 的Broker 上,会通过Partition 分区的机制来实现消息的物理存储。
一个Topic 可以有多个Partition,相当于把一个 Topic 里面的N 个消息数据进行分片存储。
消费端去消费消息的时候,会从指定的Partition 中去获取。
在同一个消费组中,一个消费者可以消费多个Partition 中的数据。但是消费者的数量只能小于或者等于Partition 分区数量。
理解了Kafka 的工作机制以后,再来理解一下exactlyOnce 的意思,在MQ 的消息投递的语义有三种:
准确来说,目前市面上的MQ 产品,基本上都没有提供Exactly Once 语义的实现。我们只能通过一些其他手段来达到 Exactly Once 的效果。也就是确保生产者只发送一次,消费端只接受一次
Kafka是-种分布式流数据平台…被广泛应用王太规模的实时数据处理和消息传递系统中。作为一种高吞吐量、低延迟的消息系统,Kafka的消息拉取机制是其核心原理之一。
Kafka的消息拉取机制是指消费者从Kafka集群中主动拉取消息的过程。相比于传统的发布-订阅模式中,由消息中间件主动推送消息给消费者,Kafka的消息拉取机制具有更高的灵活性和可控性。消费者可以根据自身的处理能力和需求主动拉取消息,从而实现更加高效的消息处理。
在使用Kafka的消息拉取机制之前,消费者需要先订阅一个或多个主题。主题是Kafka中消息的分类单位,可以看作是消息的容器。消费者通过指定主题来获取相应的消息。
在拉取消息之前,消费者需要指定拉取消息的偏移量。偏移量可以理解为消息在主题中的位置信息,通过指定偏移量,消费者可以准确地获取指定位置的消息。Kafka提供了两种偏移量的管理方式:手动管理和自动管理。消费者可以根据需要选择适合的偏移量管理方式。
一旦订阅了主题并指定了偏移量,消费者就可以开始拉取消息了。消费者向Kafka集群发送拉取消息请求,Kafka集群根据请求返回相应的消息。Kafka支持按照时间戳、偏移量范围等方式进行消息拉取,消费者可以根据自身的需求选择合适的拉取方式。
消费者获取到消息后,可以进行相应的处理。处理方式可以根据实际业务需求而定,例如存储到数据库、进行实时计算等。
在消息处理完成后,消费者需要提交偏移量。偏移量的提交是为了记录消费者已经处理过的消息位置,以便下次拉取消息时能够继续从上次的位置开始。消费者可以选择手动提交偏移量或自动提交偏移量,具体方式根据实际情况而定。
Kafka的消息拉取机制相比于消息推送机制具有以下优势:
Kafka的消息拉取机制适用于以下场景:
Kafka的消息拉取机制是其核心原理之一,通过消费者主动拉取消息的方式,实现了高吞吐量、低延迟的消息处理。消息拉取机制具有灵活性高、节约资源、异步处理等优势,并适用于实时数据处理和分布式系统集成等场景。通过深入理解和灵活运用Kafka的消息拉取原理,可以更好地实现大规模实时数据处理和消息传递的需求。
在单机同步发送的场景下,Kafka>RocketMQ,Kafka的吞吐量高达17.3w/s,RocketMQ吞吐量在11.6w/s。
Kafka会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度Kafak采用了两个技术,顺序写入和MMFile。
因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最“讨厌”随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。
收到消息后Kafka会把数据插入到文件末尾。这种方法有一个缺陷——没有办法删除数据,所以Kafka是不会删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个Topic都有一个offset用来表示读取到了第几条数据。
Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。
Memory Mapped Files也被翻译成内存映射文件,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
这种方法也有一个很明显的缺陷——不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到mmap之后就立即flush然后再返回Producer叫同步(sync);写入mmap之后立即返回Producer不调用flush叫异步(async)。
传统read/write方式进行网络文件传输的方式,文件数据实际上是经过了四次copy操作:
硬盘—>内核buf—>用户buf—>socket相关缓冲区—>协议引擎
kafka基于sendfile实现Zero Copy,直接从内核空间(DMA的)到内核空间(Socket的),然后发送网卡。
在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。
Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩。
Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩。
Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议。
每次读消息时先读逻辑队列consumQue中的元数据,再从commitlog中找到消息体。但是入口处rocketmq采用package机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。
随机读具体流程
常见消费模型有以下几种:
RocketMQ默认是采用pushConsumer方式消费的,从概念上来说是推送给消费者,它的本质是pull+长轮询。
这样既通过长轮询达到了push的实时性,又有了pull的可控性。系统收到消息后会自动处理消息和offset(消息偏移量),如果期间有新的consumer加入会自动做负载均衡(集群模式下offset存在broker中; 广播模式下offset存在consumer里)。当然我们也可以设置为pullConsumer模式,这样灵活性会提高,但是代码却会很复杂,需要手动维护offset,消息存储和状态。
零拷贝技术有mmap及sendfile,sendfile大文件传输快,mmap小文件传输快。MMQ发送的消息通常都很小,rocketmq就是以mmap+write方式实现的。
kafka性吞吐量更高主要是由于Producer端将多个小消息合并,批量发向Broker
。kafka采用异步发送
的机制,当发送一条消息时,消息并没有发送到broker而是缓存起来,然后直接向业务返回成功,当缓存的消息达到一定数量时再批量发送。同时kafka采用异步刷盘
的机制,异步刷盘肯定是比同步刷盘更快的;
此时减少了网络io,从而提高了消息发送的性能,但是如果消息发送者宕机,会导致消息丢失,业务出错,所以理论上kafka利用此机制提高了io性能却降低了可靠性。
RocketMQ通常使用的Java语言,缓存过多消息会导致频繁GC。
Producer调用发送消息接口,消息未发送到Broker,向业务返回成功,此时Producer宕机,会导致消息丢失,业务出错。
Producer通常为分布式系统,且每台机器都是多线程发送,我们认为线上的系统单个Producer每秒产生的数据量有限,不可能上万。
缓存的功能完全可以由上层业务完成。
当broker里面的topic的partition数量过多时,kafka的性能却不如rocketMq。
kafka和rocketMq都使用文件存储,但是kafka是一个分区一个文件,当topic过多,分区的总量也会增加,kafka中存在过多的文件,当对消息刷盘时,就会出现文件竞争磁盘,出现性能的下降。一个partition(分区)一个文件,顺序读写。一个分区只能被一个消费组中的一个 消费线程进行消费,因此可以同时消费的消费端也比较少。
rocketMq所有的队列都存储在一个文件中,每个队列的存储的消息量也比较小,因此topic的增加对rocketMq的性能的影响较小。rocketMq可以存在的topic比较多,可以适应比较复杂的业务。
文件的组织方式是“ topic + 分区”,每一个 topic 可以创建多个分区,每一个分区包含单独的文件夹。
分区支持副本机制,即一个分区可以在多台机器上复制数据。topic 中每一个分区会有 Leader 与 Follow。Kafka 的内部机制可以保证 topic 某一个分区的 Leader 与 Follow 不在同一台机器上,并且每一台 Broker 会尽量均衡地承担各个分区的 Leade。当然,在运行过程中如果 Leader 不均衡,也可以执行命令进行手动平衡。
Leader 节点承担一个分区的读写,Follow 节点只负责数据备份。
Kafka 的负载均衡主要取决于分区 Leader 节点的分布情况。分区的 Leader 节点负责读写,而从节点负责数据同步,如果 Leader 分区所在的 Broker 节点宕机,会触发主从节点的切换,在剩下的 Follow 节点中选举一个新的 Leader 节点。这时数据的流入流程如下图所示:
分区 Leader 收到客户端的消息发送请求后,可以有两种数据返回策略。一种是将数据写入到 Leader 节点后就返回,还有一种是等到它的从节点全部写入完成后再返回。这个策略选择非常关键,会直接影响消息发送端的时延,所以 Kafka 提供了 ack 这个参数来进行策略选择:
当 ack = 0 时,不等 Broker 端确认就直接返回,即客户端将消息发送到网络中就返回“发送成功”;
当 ack = 1 时,Leader 节点接受并存储消息后立即向客户端返回“成功”;
当 ack = -1 时,Leader 节点和所有的 Follow 节点接受并成功存储消息,再向客户端返回“成功”。
RocketMQ 所有主题的消息都会写入到 commitlog 文件中,然后基于 commitlog 文件构建消息消费队列文件(Consumequeue),消息消费队列的组织结构按照 /topic/{queue} 来组织。从集群的视角来看如下图所示:
RocketMQ 默认采取的是主从同步架构,即 Master-Slave 方式,其中 Master 节点负责读写,Slave 节点负责数据同步与消费。
值得注意的是,RocketMQ4.5 引入了多副本机制,RocketMQ 的副本机制与 kafka 的多副本两者之间的不同点是 RocketMQ 的副本维度是 Commitlog 文件,而 kafka 是主题分区级别。
聊完数据文件布局,我们再来看一下 Kafka、和 RocketMQ 的服务端是如何处理数据写入的。
Kafka 服务端处理消息写入的代码定义在 MemoryRecords 的 writeTo 方法中,具体代码截图如下(具体是调用入口 LogSegment 的 append 方法):
Kafka 服务端写入消息时,主要是调用 FileChannel 的 transferTo 方法,该方法底层使用了操作系统的 sendfile 系统调用。
而 RocketMQ 的消息写入支持内存映射与 FileChannel 两种写入方式,如下图所示
也就是说,如果将参数 tranisentStorePoolEnable 设置为 false,那就先将消息写入到页缓存,然后根据刷盘机制持久化到磁盘中。如果将参数设置为 true,数据会先写入到堆外内存,然后批量提交到 FileChannel,并最终根据刷盘策略将数据持久化到磁盘中。
值得注意的是,RocketMQ 与 Kafka 都支持通过 FileChannel 方式写入,但 RocketMQ 基于 FileChannel 写入时,调用的 API 并不是 transferTo,而是先调用 writer,然后定时 flush 刷写到磁盘,具体调用入口为 MappedFile。代码截图如下:
直接调用 FileChannel 的 transferTo 方法比 write 方法性能更优,因为 transferTo 底层使用了操作系统的 sendfile 系统调用,能充分发挥块设备的优势。
根据我的实践经验,sendfile 系统调用相比内存映射多了一个从用户缓存区拷贝到内核缓存区的步骤,但当内存写入超过 64K 时, sendfile 的性能往往更高,故 Kafka 在服务端的写入比 RocketMQ 会有更好的表现。
Kafka 消息发送客户端采用的是双端队列,还引入了批处理思想,它的消息发送机制如下图所示:
当客户端想要调用 Kafka 的消息发送者发送消息时,消息会首先存入到一个双端队列中,双端队列中单个元素为 ProducerBatch,表示一个发送批次,其最大值受参数 batch.size 控制,默认为 16K。然后,Kafka 客户端会单独开一个 Send 线程,从双端队列中获取发送批次,将消息按批发送到 Kafka 集群中。Kafka 还引入了 linger.ms 参数来控制 Send 线程的发送行为,代表批次要在双端队列中等待的最小时长。
如果将 linger.ms 设置为 0,表示立即发送消息;如果将参数设置为大于 0,那么发送线程在发送消息时只会从双端队列中获取等待时长大于该值的批次。 注意,linger.ms 参数会延长响应时间,但有利于增加吞吐量。有点类似于 TCP 领域的 Nagle 算法。
Kafka 的消息发送,在写入 ProducerBatch 时会按照消息存储协议组织数据,在服务端可以直接写入到文件中。
RocketMQ 的消息发送在客户端主要是根据路由选择算法选择一个队列,然后将消息发送到服务端。消息会在服务端按照消息的存储格式进行组织,然后进行持久化等操作。
刚才,我们从文件布局、服务端数据写入方式、客户端消息发送方式三个维度,对比了 Kafka 和 RocketMQ 各自在追求高性能时所采用的技术。综合对比来看,在同等硬件配置一下,Kafka 的综合性能要比 RocketMQ 更为强劲。
RocketMQ 和 Kafka 都使用了顺序写机制,但相比 Kafka,RocketMQ 在消息写入时追求极致的顺序写,会在同一时刻将消息全部写入一个文件,这显然无法压榨磁盘的性能。而 Kafka 是分区级别顺序写,在分区数量不多的情况下,从所有分区的视角来看是随机写,但这能重复发挥 CPU 的多核优势。因此,在磁盘没有遇到瓶颈时,Kafka 的性能要优于 RocketMQ。
同时,Kafka 在服务端写入时使用了 FileChannel 的 transferTo 方法,底层使用 sendfile 系统调用,比普通的 FileChannel 的 write 方法更有优势。结合压测效果来看,如果待写入的消息体大小超过 64K,使用 sendfile 的块写入方式甚至比内存映射拥有更好的性能。
在消息发送方面,Kafka 的客户端则充分利用了批处理思想,比 RocketMQ 拥有更高的吞吐率。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。