赞
踩
生产者 本身kafka有自己的分区策略的,如果未指定,就会使用默认的分区策略:
Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。如果Key相同的话,那么就会分配到统一分区。
生产者发送消息时整个分区路由的步骤如下:
判断消息中的partition字段是否有值,有值的话即指定了分区,直接将该消息发送到指定的分区就行。
如果没有指定分区,则使用分区器进行分区路由,首先判断消息中是否指定了key。
如果指定了key,则使用该key进行hash操作,并转为正数,然后对topic对应的分区数量进行取模操作并返回一个分区。
如果没有指定key,则通过先产生随机数,之后在该数上自增的方式产生一个数,并转为正数之后进行取余操作。
消费者
Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略
demo见
https://www.cnblogs.com/superfj/p/9440835.html
1.若没有分区,一个topic对应的消息集在分布式集群服务组中,就会分布不均匀,即可能导致某台服务器A记录当前topic的消息集很多,若此topic的消息压力很大的情况下,服务器A就可能导致压力很大,吞吐也容易导致瓶颈。
有了分区后,假设一个topic可能分为10个分区,kafka内部会根据一定的算法把10分区尽可能均匀分布到不同的服务器上,比如:A服务器负责topic的分区1,B服务器负责topic的分区2,在此情况下,Producer发消息时若没指定发送到哪个分区的时候,kafka就会根据一定算法上个消息可能分区1,下个消息可能在分区2。当然高级API也能自己实现其分发算法。
1.kafka为什么要在topic里加入分区的概念?
topic是逻辑的概念,partition是物理的概念,对用户来说是透明的。producer只需要关心消息发往哪个topic,而consumer只关心自己订阅哪个topic,并不关心每条消息存于整个集群的哪个broker。
为了性能考虑,如果topic内的消息只存于一个broker,那这个broker会成为瓶颈,无法做到水平扩展。所以把topic内的数据分布到整个集群就是一个自然而然的设计方式。Partition的引入就是解决水平扩展问题的一个方案。
如同我在Kafka设计解析(一)里所讲,每个partition可以被认为是一个无限长度的数组,新数据顺序追加进这个数组。物理上,每个partition对应于一个文件夹。一个broker上可以存放多个partition。这样,producer可以将数据发送给多个broker上的多个partition,consumer也可以并行从多个broker上的不同paritition上读数据,实现了水平扩展
2.如果没有分区,topic中的segment消息写满后,直接给订阅者不是也可以吗
“segment消息写满后”,consume消费数据并不需要等到segment写满,只要有一条数据被commit,就可以立马被消费。
segment对应一个文件(实现上对应2个文件,一个数据文件,一个索引文件),一个partition对应一个文件夹,一个partition里理论上可以包含任意多个segment。所以partition可以认为是在segment上做了一层包装。这个问题换个角度问可能更好,
2.1 “为什么有了partition还需要segment”。
如果不引入segment,一个partition直接对应一个文件(应该说两个文件,一个数据文件,一个索引文件),那这个文件会一直增大。同时,在做data purge时,需要把文件的前面部分给删除,不符合kafka对文件的顺序写优化设计方案。引入segment后,每次做data purge,只需要把旧的segment整个文件删除即可,保证了每个segment的顺序写,
Consumer Group:每个 consumer 属于一个特定的 consumer group(若不指定 group name 则属于默认的 group)。一个 topic可以有多个CG,topic的消息会分发到所有的CG,但每个CG只会把消息发给该CG中的一个 consumer。如果所有的consumer都具有相同的group, 即单播,消息将会在consumers之间负载均衡;如果所有的consumer都具有不同的group,那这就是"发布-订阅",每条消息将会广播给所有的consumer。
https://blog.csdn.net/qq_40378034/article/details/90549488
七、深入客户端
1、分区分配策略
Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略
1)、RangeAssignor分配策略
RangeAssignor分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个主题,RangeAssignor策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区
假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区
假设消费组内有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有4个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3。最终的分配结果为:
消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t0p3、t1p2、t1p3
假设2个主题都只有3个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:
消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t1p2
1)、消息传输保障
消息中间件的消息传输保障有3个层级:
at most once:至多一次。消息可能会丢失,但绝对不会重复传输
at least once:最少一次。消息绝不会丢失,但可能会重复传输
exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次
2)、幂等
幂等是指对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用Kafka的幂等性功能之后就可以避免这种情况
开启幂等性功能需要显示地将生产者客户端参数enable.idempotence设置为true即可(这个参数默认值为false)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
1
如果要确保幂等性功能正常,需要确保生产者客户端的retries、acks、max.in.flight.requests.per.connection这几个参数不被配置错。在使用幂等性功能的时候,用户完全不需要配置这几个参数
如果用户显示指定了retries参数,那么这个参数的值必须大于0,如果没有显示指定retries参数,那么KafkaProducer会将它置为Integer.MAX_VALUE。同时还要保证max.in.flight.requests.per.connection(限制每个连接最多缓存的请求数)参数的值不能大于5,acks参数的值为-1
为了实现生产者的幂等性,Kafka为此引入了producer id(PID)和序列号这两个概念。每个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是完全透明的。对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将<PID,分区>对应的序列号的值加1
broker端会在内存中为每一对<PID,分区>维护一个序列号。对于收到的每一条消息,只有当它的序列号的值比broker端中维护的对应的序列号的值大1(即SN_new=SN_old+1)时,broker才会接收它。如果SN_new<SN_old+1,那么说明消息被重复写入,broker可以直接将其丢弃。如果SN_new>SN_old+1,那么说明中间有数据尚未写入,出现了乱序,可能有消息丢失,对应的生产者会抛出OutOfOrderSequenceException
消费者的角度分析,Kafka并不能保证已提交的事务中的所有消息都能够被消费:
对采用日志压缩策略的主题而言,事务中的某些消息有可能被清理(相同key的消息,后写入的消息会覆盖前面写入的消息)
事务中消息可能分布在同一个分区的多个日志分段中,当老的日志分段被删除时,对应的消息可能会丢失
消费者可以通过seek()方法访问任意offset的消息,从而可能遗漏事务中的部分消息
消费者在消费时可能没有分配到事务内的所有分区,如此它也就不能读取事务中的所有消息
KafkaProducer提供了5个与事务相关的方法
- //初始化事务
- void initTransactions()
- //开启事务
- void beginTransaction() throws ProducerFencedException
- //消费者在事务内的位移提交
- void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
- String consumerGroupId) throws ProducerFencedException
- //提交事务
- void commitTransaction() throws ProducerFencedException
- //中止事务
- void abortTransaction() throws ProducerFencedException
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
- producer.initTransactions();
- producer.beginTransaction();
- try {
- ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");
- producer.send(record1);
- ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");
- producer.send(record2);
- producer.commitTransaction();
- } catch (ProducerFencedException e) {
- producer.abortTransaction();
- } finally {
- producer.close();
- }

kafka使用分区将topic的消息打散到多个分区分布保存在不同的broker上,实现了producer和consumer消息处理的高吞吐量。Kafka的producer和consumer都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费。
所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。
一、客户端/服务器端需要使用的内存就越多 Kafka0.8.2之后,在客户端producer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。
服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本就越大。
二、文件句柄的开销 每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。
三、降低高可用性 Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。期中的一个副本充当leader 副本,负责处理producer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。
可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)
说明:Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。
默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions:
- def partition(key: Any, numPartitions: Int): Int = {
- Utils.abs(key.hashCode) % numPartitions
- }
这保证了相同key的消息一定会被路由到相同的分区。
如果你没有指定key,那么Kafka是如何确定这条消息去往哪个分区的呢?
- if(key == null) { // 如果没有指定key
- val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有没有缓存的现成的分区Id
- id match {
- case Some(partitionId) =>
- partitionId // 如果有的话直接使用这个分区Id就好了
- case None => // 如果没有的话,
- val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分区的leader所在的broker
- if (availablePartitions.isEmpty)
- throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
- val index = Utils.abs(Random.nextInt) % availablePartitions.size // 从中随机挑一个
- val partitionId = availablePartitions(index).partitionId
- sendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用
- partitionId
- }
- }
不指定key时,Kafka几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然了,Kafka本身也会清空该缓存(默认每10分钟或每次请求topic元数据时)。
topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。
所以,如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。
Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。
当以下事件发生时,Kafka 将会进行一次分区分配:
将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance),如何rebalance就涉及到本文提到的分区分配策略。
下面我们将详细介绍 Kafka 内置的两种分区分配策略。本文假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)
来消费这10个分区里面的数据,而且 C1 的 num.streams = 1,C2 的 num.streams = 2。
Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。在我们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C2-1。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:
假如我们有11个分区,那么最后分区分配的结果看起来是这样的:
假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:
可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端。
使用RoundRobin策略有两个前提条件必须满足:
所以这里假设前面提到的2个消费者的num.streams = 2。RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,看下面的代码应该会明白:
- val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>
- info("Consumer %s rebalancing the following partitions for topic %s: %s"
- .format(ctx.consumerId, topic, partitions))
- partitions.map(partition => {
- TopicAndPartition(topic, partition)
- })
- }.toSeq.sortWith((topicPartition1, topicPartition2) => {
- /*
- * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
- * up on one consumer (if it has a high enough stream count).
- */
- topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
- })
最后按照round-robin风格将分区分别分配给不同的消费者线程。
在这个的例子里面,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:
多个主题的分区分配和单个主题类似。遗憾的是,目前我们还不能自定义分区分配策略,只能通过partition.assignment.strategy参数选择 range 或 roundrobin。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。