赞
踩
Apache Kafka是一个开源流处理平台,由Scala和Java编写。它最初由LinkedIn公司开发,并于2011年初开源。Kafka是一个高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,Kafka是一个可行的解决方案。
Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。在这个平台上可以发布和订阅数据流,并将它们保存起来进行处理。Kafka可以存储和持续处理大型的数据流,并且具有流处理能力,可以高效地处理数据。
在Kafka中,消息用主题进行分类,主题下有若干个分区,有新消息会以追加的形式写入分区。由于主题会有多个分区,所以在整个主题范围内无法保证消息顺序。分区可以分布在不同的服务器上,实现数据冗余和伸缩。消费者可以订阅一个或多个主题,并且每次读取一条数据时,偏移量会增加1。此外,Kafka集群由多个Kafka实例组成,每个实例称为broker。无论是Kafka集群、producer还是consumer都依赖于Zookeeper集群来保存一些元信息,以保持系统的可用性。
Kafka是一个分布式的发布/订阅消息系统,使用Scala语言编写,最初由LinkedIn公司发布。它主要用于处理活跃的数据,如登录、浏览、点击、分享等用户行为产生的数据。
Kafka集群由多个Kafka服务节点组成,每个节点称为一个Broker。在Kafka集群中,没有“中心主节点”的概念,集群中的所有节点都是对等的。每个Broker就是一个Kafka服务实例,多个Broker构成一个Kafka集群。生产者发布的消息将保存在Broker中,消费者将从Broker中拉取消息进行消费。
在Kafka中,消息用主题进行分类,每个主题包含一个或多个分区。在创建主题时,需要指定包含的分区数据。分区可以提高负载,因为每个分区是不同的磁盘。此外,每个分区可以有多个副本,分布在不同的Broker上。Kafka会选出一个副本作为Leader,所有的读写请求都会通过Leader完成。当Leader宕机后,会从副本中选出一个新的Leader继续提供服务,实现故障自动转移。
Kafka集群是一个具有高吞吐量、高水平扩展性的分布式消息系统。通过将数据分区并分布在多个节点上,Kafka集群可以处理大规模的数据流并保证系统的可用性和可靠性。
Kafka集群具有以下特点:
高吞吐量、低延迟
:Kafka每秒可以处理几十万条消息,延迟最低只有几毫秒。横向扩展
:Kafka集群支持热扩展,可以方便地增加或减少节点。分区与副本
:每个topic可以分多个partition,且每个partition都有多个副本,副本用于实现数据冗余和故障转移。持久性、可靠性
:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。容错性
:允许集群中节点失败,若副本数量为n,则允许n-1个节点失败。高并发
:支持数千个客户端同时读写。此外,Kafka还广泛应用于日志收集、消息系统、运营指标等场景。
Kafka集群搭建的步骤如下:
在搭建过程中,需要注意以下几点:
在Kafka集群中,故障切换主要依赖于Zookeeper组件的协调。Zookeeper是一个分布式协调服务,它可以监控Kafka集群中各个Broker(服务器节点)的状态。当Leader节点宕机时,Zookeeper会触发新的Leader选举。
在选举新Leader的过程中,Zookeeper会考虑各个Follower的同步状态,优先选择数据最新、最完整的Follower作为新的Leader。这样可以尽量保证数据的一致性,避免数据丢失。一旦新的Leader被选举出来,所有的读写请求就会被自动转发到新的Leader,对客户端来说,这个过程是透明的。
此外,Kafka通过多副本机制实现故障自动转移。在Kafka中发生复制时确保partition的预写式日志有序地写到其他节点上。其中一个replica为leader,其他都为follower,leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。当leader发生故障或挂掉时,一个新leader被选举并接收客户端的消息成功写入。
Kafka还允许将分区复制到多个Broker进行故障转移,以提高系统的可用性和容错性。通过以上机制,Kafka集群可以在发生故障时进行自动切换,保证服务的可用性和数据的可靠性。
Kafka集群中的Leader选举是维护集群稳定性和数据一致性的重要机制。当某个分区的Leader节点出现故障时,集群会从其他副本中选择一个新的Leader,以保证服务的连续性。
Kafka使用Zookeeper作为其分布式协调服务,每个主题的分区都有一个对应的Zookeeper路径,用于存储分区的元数据和ISR(In-Sync Replicas)集合。ISR集合是当前与Leader同步的副本集合,只有当副本加入ISR集合后,才会被选为新的Leader。
在Leader选举过程中,Kafka会根据副本的同步状态和在Zookeeper中的选举结果来选择新的Leader。如果ISR集合中的副本数量不足一半,Kafka会等待ISR中的任意一个副本恢复,并重新进行选举。如果ISR集合中的副本数量超过一半,Kafka会从ISR集合中随机选择一个副本作为新的Leader。
此外,Kafka还提供了unclean.leader.election.enable配置项,允许在ISR集合中没有合适的副本时选择非同步副本作为新的Leader。不过,这可能会导致数据的不一致性,因此需要谨慎使用。
Kafka集群的Leader选举机制通过Zookeeper的协调和ISR集合的选择,保证了集群的稳定性和数据的一致性。在故障发生时,能够快速地进行故障切换,保证服务的可用性。
Kafka集群的横向拓展可以通过增加Broker节点来实现。每个Broker节点是Kafka服务的一个实例,通过增加Broker节点,可以增加Kafka集群的处理能力和存储能力。
在增加Broker节点时,需要确保新的节点与现有的节点具有相同的配置,包括端口号、日志存储路径等。同时,需要将新的节点加入到Zookeeper集群中,以便进行协调管理。
在Kafka中,数据分区和副本是分布在不同的Broker节点上。因此,通过增加Broker节点,可以将更多的分区和副本分布到新的节点上,提高集群的处理能力和容错性。
需要注意的是,在增加Broker节点后,需要重新平衡集群中的分区和副本分布。Kafka提供了工具和命令来重新分配分区和副本,以确保集群的负载均衡和数据一致性。
总之,通过增加Broker节点,Kafka集群可以实现快速横向拓展,提高处理能力和存储能力。在增加节点时,需要注意配置一致性和负载均衡问题,以保证集群的稳定性和可靠性。
Kafka集群搭建的最佳实践包括以下几个方面:
总之,Kafka集群搭建的最佳实践需要综合考虑硬件配置、版本选择、配置优化、数据备份、安全性和可靠性、监控和维护、扩展性和灵活性等方面。在实际操作中,可以根据具体情况进行调整和优化,以达到最佳的效果。
Kafka集群可以使用单节点Zookeeper,但一般不推荐。因为Zookeeper是分布式协调服务,它主要负责维护和协调Kafka集群中的各个节点。在生产环境下,为了确保系统的可用性和可靠性,建议使用多节点Zookeeper集群。
Kafka集群的消费者信息保存在Kafka内部的topic中,这个topic被命名为__consumer_offsets。每个消费者都会在Kafka中保存其消费的进度,也就是offset,这些信息被存储在__consumer_offsets的partition中。在默认情况下,Kafka会为每个消费者组在__consumer_offsets中创建一个分区,并使用消费者的group ID、主题名称和分区编号作为key来存储对应的offset值。
Kafka集群的消费者信息保存在Kafka集群中的broker节点上,而不是保存在Zookeeper或其他外部系统中。这种设计是为了提高系统的可用性和可靠性。因为即使Kafka集群中的Zookeeper出现故障,消费者的消费进度也不会受到影响,因为它们都存储在Kafka的broker节点上。
此外,消费者可以通过Kafka提供的API来提交其消费进度。每次提交进度时,Kafka都会将新的offset值写入__consumer_offsets的相应partition中。消费者还可以通过查询__consumer_offsets来获取其消费进度,或者通过Kafka提供的命令行工具来查看和验证其消费进度。
Kafka集群的消费者信息保存在Kafka内部的__consumer_offsets topic中,这些信息存储在broker节点上,并通过API进行提交和查询。这种设计提高了系统的可用性和可靠性,使得即使Zookeeper出现故障,消费者的消费进度也不会受到影响。
Kafka集群的Topic分区数的设置规则需要考虑多个因素,包括业务需求、数据量、性能和可用性等。
首先,Topic的分区数量应大于或等于Broker的数量,最好是broker的数量乘以每一台机器上可用的核数,以提高吞吐率。如果一个Topic的分区数量过少,可能会导致性能瓶颈,因为每个分区只能由一个生产者写入,而多个消费者可以并行地从该生产者读取数据。
其次,在生产环境中,为了获得更高的吞吐量,可以基于目标吞吐量来设置分区数量。具体来说,可以通过计算单个生产者和消费者所能实现的所有性能来设置分区数量,公式为max(t/p, t/c),其中t是目标吞吐量,p是生产者性能,c是消费者性能。
另外,分区数的设置还需要考虑应用程序的需求。例如,如果应用程序需要处理大量数据,并且要求数据按照某个键进行排序和聚合,那么可以使用更多的分区来提高处理效率。如果应用程序需要更快的读取速度,可以使用较少的分区来减少消费者之间的竞争。
最后,在设置分区数时还需要注意一些限制和约束。例如,每个分区只能由一个生产者写入,而多个消费者可以并行地从该生产者读取数据。因此,如果一个Topic的分区数量过多,可能会导致生产者性能瓶颈。另外,分区数的设置还需要考虑到Kafka集群的可用性和容错性。例如,如果Kafka集群中的Broker节点出现故障,需要保证剩余的Broker节点能够继续提供服务。
Kafka集群的Topic分区数的设置规则需要根据业务需求、数据量、性能和可用性等因素综合考虑。在实践中,建议根据实际需求进行测试和调整,以找到最优的设置。
Kafka集群可以通过以下几种方式提高吞吐量:
通过多种方式综合优化可以提高Kafka集群的吞吐量。在实际应用中,建议根据实际情况进行测试和调整,以找到最优的设置和配置。
Kafka支持多种数据压缩类型,包括:
Kafka数据压缩的原理基于特定的压缩算法,将消息本身进行压缩并存储,待消费时再解压。压缩可以减少数据在传输过程中的大小,从而减轻网络传输的压力。在大数据处理场景中,瓶颈往往体现在网络传输上,而非CPU资源,因此数据压缩可以在一定程度上提高系统的整体性能。
Kafka支持以集合(batch)为单位发送消息,并在此基础上支持对消息集合进行压缩。Producer端可以通过GZIP或Snappy格式对消息集合进行压缩。当Producer端进行压缩后,Consumer端需进行解压以还原原始数据。
在Kafka中,消息的头部会添加一个描述压缩属性的字节,该字节的后两位表示消息的压缩采用的编码。如果后两位为0,则表示消息未被压缩。
需要注意的是,虽然压缩可以减少传输的数据量,但压缩和解压过程会消耗CPU资源。因此,在选择是否启用压缩功能时,需要根据实际需求和性能要求进行权衡。
这些压缩类型都可以在Kafka的生产者和消费者中使用。生产者可以在配置中指定压缩类型,而消费者可以自动处理压缩数据。根据实际需求和性能要求选择合适的压缩类型可以提高Kafka的性能和吞吐量。
Kafka的数据压缩方式主要有两种:基于消息的压缩和基于日志的压缩。
基于消息的压缩方式,也称为端到端压缩,是在消息发送到Kafka时,对每条消息进行独立压缩,然后将压缩后的数据发送到Kafka。这种方式的优点在于,压缩和解压缩操作可以在消费者端进行,不需要Kafka支持。此外,由于每条消息都被独立压缩,因此可以充分利用压缩算法对不同消息进行差异化压缩,提高压缩率。但是,这种方式的缺点在于,如果每条消息大小差异很大,可能会导致压缩后的数据大小差异也很大,从而影响存储和网络传输效率。
基于日志的压缩方式,也称为日志压缩,是在整个Kafka日志层面进行压缩,而不是对每条消息进行压缩。这种方式的优点在于,由于是整个日志进行压缩,因此可以避免因消息大小差异大而导致的存储和传输效率问题。此外,由于Kafka本身支持压缩操作,因此可以减少消费者端的处理压力。但是,这种方式的缺点在于,如果日志量很大,可能会导致压缩和解压缩操作成为性能瓶颈。
Kafka的数据压缩方式各有优缺点,需要根据实际应用场景和需求进行选择。如果需要充分利用压缩算法对不同消息进行差异化压缩,且每条消息大小差异较大,可以选择基于消息的压缩方式;如果需要避免因消息大小差异导致的存储和传输效率问题,且日志量较大,可以选择基于日志的压缩方式。
要开启Kafka数据压缩,需要配置Kafka生产者或消费者来指定压缩算法。以下是一个简单的示例代码,展示如何在Kafka生产者中开启数据压缩:
import org.apache.kafka.clients.producer.*; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { // 设置Kafka生产者配置 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // 开启数据压缩 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 创建Kafka生产者实例 Producer<String, String> producer = new KafkaProducer<>(props); // 发送消息 String topic = "test-topic"; String key = "key"; String value = "value"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); producer.send(record); // 关闭生产者实例 producer.close(); } }
在上述示例中,通过设置ProducerConfig.COMPRESSION_TYPE_CONFIG
属性为gzip
,开启了数据压缩功能。你可以根据需要选择不同的压缩算法,例如snappy
或lz4
等。同时,确保Kafka生产者和消费者版本兼容,并使用正确的压缩算法和依赖库。
在Kafka消费者中开启数据压缩,需要在消费者配置中指定压缩类型。以下是一个简单的示例代码,展示如何在Kafka消费者中开启数据压缩:
import org.apache.kafka.clients.consumer.*; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { // 设置Kafka消费者配置 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的消息开始消费 props.put(ConsumerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // 开启数据压缩 // 创建Kafka消费者实例 Consumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题 consumer.subscribe(Arrays.asList("test-topic")); // 消费消息 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value()); } } } }
在上述示例中,通过设置ConsumerConfig.COMPRESSION_TYPE_CONFIG
属性为gzip
,开启了数据压缩功能。你可以根据需要选择不同的压缩算法,例如snappy
或lz4
等。同时,确保Kafka生产者和消费者版本兼容,并使用正确的压缩算法和依赖库。
Kafka是一种高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据。其核心组件包含Producer、Broker、Consumer,以及依赖的Zookeeper集群。其中Zookeeper集群是Kafka用来负责集群元数据的管理、控制器的选举等。
然而,由于重度依赖Zookeeper集群,当Zookeeper集群性能发生抖动时,Kafka的性能也会收到很大的影响。因此,为了解决这个问题,Kafka引入了KRaft新内部功能,取消对Zookeeper的依赖。
在Kafka引入KRaft新内部功能后,对Zookeeper的依赖将会被取消。在KRaft中,一部分broker被指定为控制器,这些控制器提供过去由ZooKeeper提供的共识服务。这样做的好处有以下几点:
总的来说,通过KRaft和Zookeeper启动Kafka的区别在于,Zookeeper是Kafka用来负责集群元数据的管理和控制器选举的外部框架,而KRaft则是Kafka引入的新内部功能,旨在取消对Zookeeper的依赖,提高集群性能和可扩展性。
在Kafka 2.8之前,Kafka重度依赖于Zookeeper集群做元数据管理和集群的高可用(即所谓的共识服务)。在Kafka 2.8之后,引入了基于Raft协议的KRaft模式,支持取消对Zookeeper的依赖。在此模式下,一部分Kafka Broker被指定为Controller,另一部分则为Broker。这些Controller的作用就是以前由Zookeeper提供的共识服务,并且所有的元数据都将存储在Kafka主题中并在内部进行管理。
总体而言,使用KRaft的好处如下:Kafka不用再依赖外部框架,能够做到独立运行。类似于Redis的Sentinel,它的本质仍然是一个Kafka实例。Controller管理集群时,不再需要从Zookeeper中先读取数据,因此集群的性能得到一定的提升。由于不再依赖Zookeeper,Kafka集群扩展时不用再受到Zookeeper读写能力的限制。Controller不再动态选举,而是由配置文件规定。这样可以有针对性的加强Controller节点的配置,而不是像以前一样对随机Controller节点的高负载束手无策。
因此,Kafka集群可以不依赖Zookeeper。
在Kafka集群中,Zookeeper扮演了关键的角色,主要体现在以下几个方面:
总的来说,Zookeeper在Kafka集群中扮演了元数据存储、Broker管理、Leader选举以及集群配置管理等关键角色,是确保Kafka集群正常运行和高可用性的重要组成部分。
Kafka中的Controller是Kafka集群的核心组件,用于管理和协调整个Kafka集群。以下是Controller的主要作用:
Kafka中的Controller作用主要是管理和协调整个Kafka集群,包括选举Leader和ISR、监控Broker和Topic的变化、维护主题以及分区重分配等。
KafkaTemplate是Apache Kafka提供的一种用于简化生产者(Producer)和消费者(Consumer)开发过程的配置抽象。它是一个在Spring Kafka中使用的模板类,封装了KafkaProducer,使得发送消息到Kafka集群的操作更为简便。
KafkaTemplate的主要特点包括:
在使用KafkaTemplate时,你可以将其视为一个执行高级操作的模板。当与DefaultKafkaProducerFactory一起使用时,这个模板是线程安全的。KafkaTemplate将消息发送到Kafka集群中的一个主题,并负责处理发送时可能出现的异常。
总的来说,KafkaTemplate是Kafka在Spring Boot中的一个重要组件,它使得开发者能够更方便、更高效地利用Kafka进行消息的生产和消费。
KafkaTemplate作为Spring Kafka中的一个核心组件,用于简化生产者(Producer)和消费者(Consumer)的开发过程,具有许多优点和一些潜在的缺点。
KafkaTemplate具有许多优点,使得开发者能够更方便、更高效地利用Kafka进行消息的生产和消费。然而,它也有一些潜在的缺点,需要在使用时注意和考虑。
在日常开发中,我们最常使用的是它的用于消息发送的send方法,这是一个异步的非阻塞的方法,我们可以选择通过 ListenableFuture<SendResult<K, V>> 来获取它的发送结果,如下图所示:
这个消息发送的过程本身是异步来进行的,所以是一个非阻塞的过程,但是在这个过程中,实际上是存在着一个阻塞的行为,这个就是在发送过程中调用的 doSend 方法中的获取元数据的方法 waitOnMetadata ,如下图所示:
当程序无法正常获取到元数据时,会不断的循环进行尝试获取,直到超时为止,如果出现Kafka节点挂掉或者网络出现异常的情况,就可能出现消息发送阻塞的情况,我们在使用的时候需要特别注意该问题的发生并提前进行相应的处理和预防
具体的元数据更新机制可通过下面的博文来进行深入了解: https://blog.csdn.net/u013277209/article/details/123411790
将消息发送行为放在线程池中进行,使上述的阻塞行为发生在线程池中,而不是主线流程中,使用该方法需要对线程池队列的消息和淘汰策略进行提前设置,防止消息堆积过多造成的内存溢出或者消息淘汰的问题
通过配置参数缩短阻塞的超时时间,获取原信息的最大阻塞时间是由 max.block.ms
这个配置来决定的,该配置默认60s,是在 ProducerConfig 中配置的,可以通过修改改配置来缩短阻塞的时间,但是缩短该配置可能会导致无法正常获取到元数据信息导致的消息发送失败,酌情考虑配置
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。