赞
踩
读完本文,你将了解到如下知识点:
消费者1
的消费速度,那么就会导致数据堆积,产生一些大家都知道的蛋疼事情了,那么我们只能加强 消费者
的消费能力,所以也就有了我们下面来说的 消费者组
消费者组
,其实就是一组 消费者
的集合,当我们看到下面这张图是不是就特别舒服了,我们采用了一个 消费组
来消费这个 topic
,众人拾柴火焰高,其消费能力那是按倍数递增的,所以这里我们一般来说都是采用 消费者组
来消费数据,而不会是 单消费者
来消费数据的。这里值得我们注意的是:
topic
可以被 多个 消费者组
消费,但是每个 消费者组
消费的数据是 互不干扰 的,也就是说,每个 消费组
消费的都是 完整的数据 。消费者
消费,而 不能拆给多个消费者 消费图3
我们就很好的可以回答这个问题了,我们可以看到 消费者4
是完全没有消费任何的数据的,所以如果你想要加强 消费者组
的能力,除了添加消费者,分区的数量也是需要跟着增加的,只有这样他们的并行度才能上的去,消费能力才会强。为了提高 消费组 的 消费能力,我是不是可以随便添加 分区 和 消费者 呢?
答案当然是否定的啦。。。嘿嘿
我们看到图2
,一般来说我们建议 消费者
数量 和 分区
数量是一致的,当我们的消费能力不够时,就必须通过调整分区的数量来提高并行度,但是,我们应该尽量来避免这种情况发生,比如:
现在我们需要在图2
的基础上增加一个 分区4
,那么这个 分区4
该由谁来消费呢?这个时候kafka会进行 分区再均衡
,来为这个分区分配消费者,分区再均衡
期间该 消费组
是不可用的,并且作为一个 被消费者
,分区数的改动将影响到每一个消费者组
,所以在创建 topic
的时候,我们就应该考虑好分区数,来尽量避免这种情况发生
分区分配过程
上面我们提到了为 分区分配消费者,那么我们现在就来看看分配过程是怎么样的。
咱们以 java api
为例,下面是一个简单的 kafka consumer
- public static void main(String[] args) {
- //consumer 的配置属性
- Properties props = new Properties();
-
- ///brokers 地址
- props.put("bootstrap.servers", "localhost:9092");
-
- //指定该 consumer 将加入的消费组
- props.put("group.id", "test");
- // 开启自动提交 offset,关于offset提交,我们后续再来详细说说
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "1000");
-
- //指定序列化类
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- //创建 consumer
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-
- //订阅消费主题,这里一个消费者可以同时消费 foo 和 bar 两个主题的数据
- consumer.subscribe(Arrays.asList("foo", "bar"));
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records)
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
- }
使用起来还是很简单的,不过如果想要用好 consumer,可能你还需要了解以下这些东西:
ok,那么我们接下来一个个来看吧。。。
- String topic = "foo";
- TopicPartition partition0 = new TopicPartition(topic, 0);
- TopicPartition partition1 = new TopicPartition(topic, 1);
- consumer.assign(Arrays.asList(partition0, partition1));
看起来只是把普通的订阅方式修改成了订阅知道 topic
的分区,其余的还是照常使用,不过这里也需要注意一下的是:
group id
是唯一的就可以了。topic
的分区变动不敏感,也就是说当 topic
新增了分区,分区的数据将会发生改变,但该消费组对此确是不感知的,依然照常运行,所以很多时候需要你手动consumer.partitionsFor()
去查看topic
的分区情况subscription
混合使用partition.assignment.strategy
进行分区策略配置Range
该方式最大的特点就是会将连续的分区分配给一个消费者,根据示例,我们可以得出如下结论:
ConsumerGroup 消费 TopicA 的时候:
ConsumerA 会分配到 A-1,A-2
ConsumerB 会分配到 A-3
ConsumerGroup 消费 TopicB 的时候:
ConsumerA 会分配到 B-1,B-2
ConsumerB 会分配到 B-3
所以:
ConsumerA 分配到了4个分区: A-1,A-2,B-1,B-2
ConsumerB 分配到了2个分区:A-3,B-3
RoundRobin
该方式最大的特点就是会以轮询的方式将分区分配给一个个消费者,根据示例,我们可以得出如下结论:
ConsumerGroup 消费 TopicA 的时候:
ConsumerA 分配到 A-1
ConsumerB 分配到 A-2
ConsumerA 分配到 A-3
ConsumerGroup 消费 TopicB 的时候,因为上次分配到了 ConsumerA,那么这次轮到 ConsumerB了 所以:
ConsumerB 分配到 B-1
ConsumerA 分配到 B-2
ConsumerB 分配到 B-3
所以:
ConsumerA 分配到了4个分区: A-1,A-3,B-2
ConsumerB 分配到了2个分区:A-2,B-1,B-3
从上面我们也是可以看出这两种策略的异同,RoundRobin 相比较 Range 会使得分区分配的更加的均衡,但如果是消费单个 topic ,那么其均衡是差不多的,而 Range 会比 RoundRobin 更具优势一点,至于这个优势,还得看你的具体业务了。
assign
的参数以及返回注释就 ok了- public class RangeAssignor extends AbstractPartitionAssignor{
- //省略部分代码。。。。
- /**
- * 根据订阅者 和 分区数量来进行分区
- * @param partitionsPerTopic: topic->分区数量
- * @param subscriptions: memberId 消费者id -> subscription 消费者信息
- * @return: memberId ->list<topic名称 和 分区序号(id)>
- */
- @Override
- public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
- //topic -> list<消费者>
- Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
- //初始化 返回结果
- Map<String, List<TopicPartition>> assignment = new HashMap<>();
- for (String memberId : subscriptions.keySet())
- assignment.put(memberId, new ArrayList<TopicPartition>());
-
- for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
- //topic
- String topic = topicEntry.getKey();
- // 消费该topic的 consumer-id
- List<String> consumersForTopic = topicEntry.getValue();
-
- //topic 的分区数量
- Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
- if (numPartitionsForTopic == null)
- continue;
-
- Collections.sort(consumersForTopic);
-
- //平均每个消费者分配的 分区数量
- int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
- //平均之后剩下的 分区数
- int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
-
- //这里就是将连续分区切开然后分配给每个消费者
- List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
- for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
- int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
- int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
- assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
- }
- }
- return assignment;
- }
- }
首先,我们都应该知道,最全最全的文档应该是来自官网(虽然有时候可能官网找不到):
http://kafka.apachecn.org/documentation.html#newconsumerconfigs
嗯,以下内容来自 kafka权威指南 ,请原谅我的小懒惰。。。后续有时间会把工作中的遇到的补充上
fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,
如果可用的数据量小于fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低broker 的工作负载。
fetch.max.wait.ms
我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而feth.max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。
如果 fetch.max.wait.ms 被设为 100ms,并且fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返回 1MB 数据,要么在100ms 后返回所有可用的数据,就看哪个条件先得到满足。
max.partition.fetch.bytes
该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes指定的字节。如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。
max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。
如果出现这种情况,可以把max.partition.fetch.bytes 值改小,或者延长会话过期时间。
session.timeout.ms
该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。
该属性与 heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率,session.timeout.ms则指定了消费者可以多久不发送心跳。
所以,一般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。把session.timeout.ms 值设得比默认值小,可以更快地检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。
auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。
它的默认值是 latest,意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
enable.auto.commit
我们稍后将介绍几种不同的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。如果把它设为true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。
partition.assignment.strategy
(这部分好像重复了 ~~~)
我们知道,分区会被分配给群组里的消费者。PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略。
Range
该策略会把主题的若干个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时订阅了主题T1 和 主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就会出现这种情况。
RoundRobin
该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1 和消费者C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。可以通过设置 partition.assignment.strategy 来选择分区策略。
默认使用的是org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了 Range 策略,不过也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我们还可以使用自定义策略,在这种情况下,partition.assignment.strategy 属性的值就是自定义类的名字。
client.id
该属性可以是任意字符串,broker 用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。
max.poll.records
该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。
receive.buffer.bytes 和 send.buffer.bytes
socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操作系统的默认值。
如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽
这篇文章有点太长了,所以准备另起一篇来专门讲 offset 的控制。
预计在周末更新吧,如果你有兴趣,可以点击关注一下,以便及时收到提醒噢!!!弱弱的,也是求一波关注,哈哈哈!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。