当前位置:   article > 正文

Kafka学习之消费者_partitioninfo is null

partitioninfo is null

Kafka学习之消费者

前言

本博客主要介绍up在学习kafka中间件时候觉得需要记录的知识点。

内容

1、消费者与消费组

消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者(也就是说订阅了同一个主题的消费组会收到同样的消息,但是同一消费组里面的消费者获取的是不同的消息,消费组的概念类似于消息分发器)。

如下图所示,该主题存在四个分区(Partition, 对应四个日志文件):P0、P1、P2、P3。有两个消费组A和B都订阅了这个主题,消费组A中有4个消费者(c0, c1,c2,c3),消费组B中有2个消费者(C4和C5)。按照Kafka默认的规则,最后的分配结果是消费组A中的每一个消费者分配到1个分区,消费组B中的每一个消费者分配到2个分区,两个消费组之间互不影响。每个消费者只能消费所分配到的分区中的消息。换言之,每一个分区只能被一个消费组中的一个消费者所消费

在这里插入图片描述

再看一下,消费组内的消费者个数变化时所对应的分区分配的演变。假设目前某消费组内只有一个消费者 C0,订阅了一个主题,这个主题包含 7 个分区:P0、P1、P2、P3、P4、P5、P6。也就是说,这个消费者C0订阅了7个分区。

此时消费组内又加入了一个新的消费者C1,按照既定的逻辑,需要将原来消费者C0的部分分区分配给消费者C1消费(有可能是p4,p5,p6分配给C1)。

紧接着消费组内又加入了一个新的消费者C2,消费者C0、C1和C2的消费分区又会发生变化(c0->0,1,2;c1->3,4;c3->5,6)。

消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少)消费者的个数来提高(或降低)整体的消费能力。对于分区数固定的情况,一味地增加消费者并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。

以上分配逻辑都是基于默认的分区分配策略进行分析的,可以通过消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。

partition.assignment.strategy的默认参数为org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外,还提供了RoundRobinAssignor 和 StickyAssignor。另外该参数还可以配置多个分配策略,彼此之间以逗号分隔。

【RangeAssignor】 分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。
对于每一个主题,RangeAssignor策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。

但是这种分配策略在某些情况是不合理的,假设上面例子中2个主题都只有3个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

消费者c0: t0p0、t0p1、t1p0、t1p1
消费者c1: t0p2、t1p2
可以看出消费者c0的监听分区明显多于消费者c1;如果将类似的情形扩大,则有可能出现部分消费者过载的情况。

【RoundRobinAssignor】分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。
如果同一个消费组内所有的消费者的订阅信息都是相同的,那么RoundRobinAssignor分配策略的分区分配会是均匀的。举个例子,假设消费组中有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:
消费者c0: t0p0、t0p2、t1p1
消费者c1: t0p1、t1p0、t1p2
这样看来明显是均匀的。
如果同一个消费组内的消费者订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能导致分区分配得不均匀。如果某个消费者没有订阅消费组内的某个主题,那么在分配分区的时候此消费者将分配不到这个主题的任何分区。举个例子,假设消费组内有3个消费者(C0、C1和C2),它们共订阅了3个主题(t0、t1、t2),这3个主题分别有1、2、3个分区,即整个消费组订阅了t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2,那么最终的分配结果为:

消费者c0: t0p0
消费者c1: t1p0
消费者c2: t1p0、t2p0、t2p1、t2p2
可以看到RoundRobinAssignor策略也不是十分完美,这样分配其实并不是最优解,因为完全可以将分区t1p1分配给消费者C1。
【StickyAssignor】
Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:
(1)分区的分配要尽可能均匀。
(2)分区的分配尽可能与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标;
假设消费组内有3个消费者(C0、C1和C2),它们都订阅了4个主题(t0、t1、t2、t3),并且每个主题有2个分区。也就是说,整个消费组订阅了t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1这8个分区。最终的分配结果如下:
消费者c0: t0p0、t1p1、t3p0
消费者c1: t0p1、t2p0、t3p1
消费者c2: t1p0、t2p1

这样初看上去似乎与采用RoundRobinAssignor分配策略所分配的结果相同,但事实是否真的如此呢?再假设此时消费者 C1脱离了消费组,那么消费组就会执行再均衡操作,进而消费分区会重新分配。如果采用RoundRobinAssignor分配策略,那么此时的分配结果如下:
消费者c0: t0p0、t1p0、t2p0、t3p0
消费者c2: t0p1、t1p1、t2p1、t3p1
如分配结果所示,RoundRobinAssignor分配策略会按照消费者C0和C2进行重新轮询分配。如果此时使用的是StickyAssignor分配策略,那么分配结果为:
消费者c0: t0p0、t1p1、t3p0、t2p0
消费者c2: t1p0、t2p1、t0p1、t3p1
可以看到分配结果中保留了上一次分配中对消费者C0和C2的所有分配结果,并将原来消费者C1的“负担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配还保持了均衡。
如果发生分区重分配,那么对于同一个分区而言,有可能之前的消费者和新指派的消费者不是同一个,之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor 分配策略如同其名称中的“sticky”一样,让分配策略具备一定的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

对于消息中间件而言,一般有两种消息投递模式:点对点(P2P,Point-to-Point)模式发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。Kafka 同时支持两种消息投递模式,而这正是得益于消费者与消费组模型的契合:

  • 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
  • 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数group.id来配置,默认值为空字符串。

消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个线程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上,也可以部署在不同的机器上。

2、消费者客户端开发

一个正常的消费逻辑需要具备以下几个步骤:

(1)配置消费者客户端参数及创建相应的消费者实例。

(2)订阅主题。

(3)拉取消息并消费。

(4)提交消费位移。

(5)关闭消费者实例。

代码示例如下:

public class KafkaConsumerAnalysis {

    public static final String brokerList = "localhost:9092";
    public static final String topic = "ztTest";
    public static final String groupId = "group.zt";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put("bootstrap.servers",brokerList);
        properties.put("group.id",groupId);
        properties.put("client.id","consumer.client.id.demo");
        return  properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(topic));
        try{
            while (isRunging()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() + ",partition = " + record.partition() + ", offset = " + record.offset());
                    System.out.println("key = " + record.key() + ", value = " + record.value());
                    // do service logic
                }

            }
        }catch (Exception e){
            //log
        }finally {
            consumer.close();
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
2.1 必要参数配置
  • bootstrap.servers: 该参数的释义和生产者客户端KafkaProducer 中的相同,用来 指 定 连 接 Kafka 集 群 所 需的 broker 地 址 清 单,具 体 内 容 形 式 为host1:port1,host2:post,可以设置一个或多个地址,中间用逗号隔开,此参数的默认值为“”。注意这里并非需要设置集群中全部的broker地址,消费者会从现有的配置中查找到全部的Kafka集群成员。这里设置两个以上的broker地址信息,当其中任意一个宕机时,消费者仍然可以连接到Kafka集群上。
我们一般可以简单地认为 bootstrap.servers 这个参数所要指定的就是将要连接的Kafka集群的broker地址列表。不过从深层次的意义上来讲,这个参数配置的是用来发现Kafka集群元数据信息的服务地址。
  • 1
  • group.id: 消费者隶属的消费组的名称,默认值为“”。则会报出异常:Exception in thread "main"org.apache.kafka.common.errors.InvalidGroupIdException:The configured groupId is invalid。一般而言,这个参数需要设置成具有一定的业务意义的名称。
  • key.deserializer 和 value.deserializer:与生产者客户端KafkaProducer中的key.serializer和value.serializer参数对应。消费者从broker端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。这两个参数分别用来指定消息中key和value所需反序列化操作的反序列化器,这两个参数无默认值。注意这里必须填写反序列化器类的全限定名,比如org.apache.kafka.common.serialization.StringDeserializer,单单指定StringDeserializer是错误的。
2.2 订阅主题和分区

在创建好消费者之后,我们就需要为该消费者订阅相关的主题了。一个消费者可以订阅一个或多个主题,代码清单3-1中我们使用subscribe()方法订阅了一个主题,对于这个方法而言,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe的几个重载方法如下:

在这里插入图片描述

对于消费者使用集合的方式(subscribe(Collection))来订阅主题而言,比较容易理解,订阅了什么主题就消费什么主题中的消息。如果前后两次订阅了不同的主题,那么消费者以最后一次的为准

subscribe 的重载方法中有一个参数类型是 ConsumerRebalance-Listener,这个是用来设置相应的再均衡监听器的。

消费者不仅可以通过KafkaConsumer.subscribe()方法订阅主题,还可以直接订阅某些主题的特定分区,在KafkaConsumer中还提供了一个assign()方法来实现这些功能,此方法的具体定义如下:

public void assign(Collection<TopicPartition> partitions) {
        acquireAndEnsureOpen();
        try {
            if (partitions == null) {
                throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
            } else if (partitions.isEmpty()) {
                this.unsubscribe();
            } else {
                for (TopicPartition tp : partitions) {
                    String topic = (tp != null) ? tp.topic() : null;
                    if (topic == null || topic.trim().isEmpty())
                        throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                }
                fetcher.clearBufferedDataForUnassignedPartitions(partitions);
                if (coordinator != null)
                    this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());

                log.info("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
                if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
                    metadata.requestUpdateForNewTopics();
            }
        } finally {
            release();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

这个方法只接受一个参数partitions,用来指定需要订阅的分区集合。

//TopicPartition类,在kafka的客户端中,用来表示分区,源代码如下:
public final class TopicPartition implements Serializable {
    private static final long serialVersionUID = -613627415771699627L;

    private int hash = 0;
    private final int partition;
    private final String topic;

    public TopicPartition(String topic, int partition) {
        this.partition = partition;
        this.topic = topic;
    }

    public int partition() {
        return partition;
    }

    public String topic() {
        return topic;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

TopicPartition类只有2个属性:topic和partition,分别代表分区所属的主题和自身的分区编号,这个类可以和我们通常所说的主题—分区的概念映射起来。

所以我们使用的时候api代码如下:

consumer.assign(Arrays.asList(new TopicPartition("topic-demo",0)));
  • 1

这里有个问题,如果我们事先并不知道主题中有多少个分区怎么办?KafkaConsumer 中的partitionsFor()方法可以用来查询指定主题的元数据信息,partitionsFor()方法的具体定义如下:

public List<PartitionInfo> partitionsFor(String topic)
  • 1

其中PartitionInfo类型即为主题的分区元数据信息,此类的主要结构如下:

public class PartitionInfo {
    //主题名称
    private final String topic;
    //分区号
    private final int partition;
    //分区leader副本所在的位置 
    private final Node leader;
    //代表该分区的AR集合
    private final Node[] replicas;
    //代表分区的ISR集合
    private final Node[] inSyncReplicas;
    //代表分区的OSR集合
    private final Node[] offlineReplicas;
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

通过partitionFor()方法的协助,我们可以通过assign()方法来实现订阅主题(全部分区)的功能,示例参考如下:

 KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic-demo");
        if (partitionInfos!=null) {
            for (PartitionInfo partitionInfo : partitionInfos) {
                topicPartitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
            }
        }
        consumer.assign(topicPartitions);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

既然有订阅,那么就有取消订阅,可以使用 KafkaConsumer中的 **unsubscribe()**方法来取消主题的订阅。这个方法既可以取消通过 subscribe(Collection)方式实现的订阅,也可以取消通过subscribe(Pattern)方式实现的订阅,还可以取消通过 assign(Collection)方式实现的订阅。

集合订阅的方式subscribe(Collection)、正则表达式订阅的方式subscribe(Pattern)和指定分区的订阅方式 assign(Collection)分表代表了**三种不同的订阅状态:AUTO_TOPICS、AUTO_PATTERN和USER_ASSIGNED(如果没有订阅,那么订阅状态为NONE)。**然而这三种状态是互斥的,在一个消费者中只能使用其中的一种。

通过 subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。

而通过assign()方法订阅分区时,是不具备消费者自动均衡的功能的,其实这一点从assign()方法的参数中就可以看出端倪,两种类型的subscribe()都有ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。

2.3 反序列化

Kafka所提供的反序列化器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer,它们分别用于ByteBuffer、ByteArray、Bytes、Double、Float、Integer、Long、Short 及String类型的反序列化,这些序列化器也都实现了 Deserializer 接口,与KafkaProducer中提及的Serializer接口一样,Deserializer接口也有三个方法。

  • public void configure(Map<String,?> configs,boolean isKey):用来配置当前类。
  • public byte[] serialize(String topic,T data):用来执行反序列化。如果data为null,那么处理的时候直接返回null而不是抛出一个异常。
  • public void close():用来关闭当前序列化器。

我们来看一下StringDeserializer具体代码实现:

public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

在实际应用中,在Kafka提供的序列化器和反序列化器满足不了应用需求的前提下,推荐使用Avro、JSON、Thrift、ProtoBuf或Protostuff等通用的序列化工具来包装,以求尽可能实现得更加通用且前后兼容。使用通用的序列化工具也需要实现Serializer 和 Deserializer 接口,因为Kafka客户端的序列化和反序列化入口必须是这两个类型。

2.4 消息消费

Kafka中的消费是基于拉模式的。消息的消费一般有两种模式:推模式(微博、微信)和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。

kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复调用poll() 方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。对于poll()方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为;如果订阅的所有分区中都没有可供消费的消息,那么poll()方法返回为空的消息集合

poll()方法的具体定义如下:

 public ConsumerRecords<K, V> poll(final Duration timeout) {
        return poll(time.timer(timeout), true);
    }
  • 1
  • 2
  • 3

注意到poll()方法里还有一个超时时间参数timeout,用来控制poll()方法的阻塞时间(while循环实现的),在消费者的缓冲区里没有可用数据时会发生阻塞。

timeout的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将timeout设置为0,这样poll()方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从Kafka中拉取并消费消息,则可以将这个参数设置为最大值Long.MAX_VALUE。

消费者消费到的每条消息的类型为ConsumerRecord(注意与ConsumerRecords的区别),这个和生产者发送的消息类型ProducerRecord相对应,不过ConsumerRecord中的内容更加丰富,具体的结构参考如下代码:

public class ConsumerRecord<K, V> {
    //主题
    private final String topic;
    //分区号
    private final int partition;
    //消息所在分区的偏移量
    private final long offset;
    //timestamp 表示时间戳
    private final long timestamp;
    //表示时间戳类型,分为两种:CrateTime(消息创建的时间) 和 LogAppendTime(消息追加到日志的时间)
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    //消息的头部信息
    private final Headers headers;
    //消息的key
    private final K key;
    //消息的value
    private final V value;
    private final Optional<Integer> leaderEpoch;
	//checksum是CRC32的校验值。
    private volatile Long checksum;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

我们在消费消息的时候可以直接对 ConsumerRecord 中感兴趣的字段进行具体的业务逻辑处理。poll()方法的返回值类型是 ConsumerRecords,它用来表示一次拉取操作所获得的消息集,内部包含了若干ConsumerRecord,它提供了一个iterator()方法来循环遍历消息集内部的消息,iterator()方法的定义如下:

public Iterator<ConsumerRecord<K, V>> iterator() {
        return new ConcatenatedIterable<>(records.values()).iterator();
    }
  • 1
  • 2
  • 3

除此之外,我们还可以按照分区维度来进行消费,这一点很有用,在手动提交位移时尤为明显,有关位移提交的内容我们会在下一节中详细陈述。ConsumerRecords类提供了一个**records(TopicPartition)**方法来获取消息集中指定分区的消息,此方法的定义如下:

public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
        List<ConsumerRecord<K, V>> recs = this.records.get(partition);
        if (recs == null)
            return Collections.emptyList();
        else
            return Collections.unmodifiableList(recs);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

我们不妨使用这个records(TopicPartition)方法来修改一下之前的代码消费逻辑,主要的示例代码如下:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                //遍历消息集合中的主题分区
                for (TopicPartition partition : records.partitions()) {
                    //获取每个分区中的消息
                    for (ConsumerRecord<String, String> record : records.records(partition)) {
                        System.out.println(record.partition() + ":" + record.value());
                    }
                }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

上面示例中的 ConsumerRecords.partitions()方法用来获取消息集中所有分区。在 ConsumerRecords类中还提供了按照主题维度来进行消费的方法,这个方法是records(TopicPartition)的重载方法,具体定义如下:

public Iterable<ConsumerRecord<K, V>> records(String topic) {
        if (topic == null)
            throw new IllegalArgumentException("Topic must be non-null.");
        List<List<ConsumerRecord<K, V>>> recs = new ArrayList<>();
        for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
            if (entry.getKey().topic().equals(topic))
                recs.add(entry.getValue());
        }
        return new ConcatenatedIterable<>(recs);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

ConsumerRecords 类中并没提供与 partitions()类似的topics()方法来查看拉取的消息集中所包含的主题列表,**如果要按照主题维度来进行消费,那么只能根据消费者订阅主题时的列表来进行逻辑处理了。**下面的示例演示了如何使用ConsumerRecords中的record(String topic)方法:

List<String> topicList = Arrays.asList("topic1","topic2");
        consumer.subscribe(topicList);
        try{
            while (isRunging()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (String s : topicList) {
                    for (ConsumerRecord<String, String> record : records.records(s)) {
                        //todo logic service
                    }
                }
            }
        }finally {
            consumer.close();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

在 ConsumerRecords 类中还提供了几个方法来方便开发人员对消息集进行处理:count()方法用来计算出消息集中的消息个数,返回类型是int;isEmpty()方法用来判断消息集是否为空,返回类型是boolean;empty()方法用来获取一个空的消息集,返回类型是ConsumerRecord<K,V>。

到目前为止,可以简单地认为poll()方法只是拉取一下消息而已,但就其内部逻辑而言并不简单,它涉及消费位移消费者协调器组协调器消费者的选举分区分配的分发再均衡的逻辑心跳等内容。

2.5 位移提交

对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。单词“offset”可以翻译为“偏移量”,也可以翻译为“位移”,读者可能并没有过多地在意这一点:在很多中文资料中都会交叉使用“偏移量”和“位移”这两个词,并没有很严谨地进行区分。笔者对offset做了一些区分:对于消息在分区中的位置,我们将offset称为“偏移量”;对于消费者消费到的位置,将 offset 称为“位移”,有时候也会更明确地称之为“消费位移”。做这一区分的目的是让读者在遇到 offset的时候可以很容易甄别出是在讲分区存储层面的内容,还是在讲消费层面的内容,如此也可以使“偏移量”和“位移”这两个中文词汇具备更加丰富的意义。当然,对于一条消息而言,它的偏移量和消费者消费它时的消费位移是相等的,在某些不需要具体划分的场景下也可以用“消息位置”或直接用“offset”这个单词来进行表述。

在每次调用poll()方法时,它返回的是还没有被消费过的消息集(当然这个前提是消息已经存储在Kafka 中了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。

再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。

消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来(持久化)的动作称为**“提交”**,消费者在消费完消息之后需要执行消费位移的提交。

如下图所示:

x表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x,图中也用了lastConsumedOffset这个单词来标识它。

在这里插入图片描述

不过需要非常明确的是,当前消费者需要提交的消费位移并不是 x,而是 x+1,对应于上图中的position,它表示下一条需要拉取的消息的位置。在消费者中还有一个committed offset的概念,它表示已经提交过的消费位移。

KafkaConsumer 类提供了 position(TopicPartition)和committed(TopicPartition)两个方法来分别获取上面所说的position和committed offset的值。这两个方法的定义如下所示。

 public long position(TopicPartition partition) {
        return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
    }

public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
        return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit 配置,默认值为 true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是enable.auto.commit参数为true。

在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

在Kafka消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费消息丢失的问题。假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。

按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?

如下图所示:

在这里插入图片描述

拉取线程A不断地拉取消息并存入本地缓存,比如在BlockingQueue中,另一个处理线程B从缓存中读取消息并进行相应的逻辑处理。假设目前进行到了第y+1次拉取,以及第m次位移提交的时候,也就是x+6之前的位移已经确认提交了,处理线程B却还正在消费x+3的消息。此时如果处理线程B发生了异常,待其恢复之后会从第m此位移提交处,也就是x+6的位置开始拉取消息,那么x+3至x+6之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。

自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免,与此同时,自动位移提交也无法做到精确的位移管理。在Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。开启手动提交功能的前提是消费者客户端参数enable.auto.commit配置为false;

手动提交可以细分为同步提交和异步提交,对应于KafkaConsumer 中的 commitSync()和commitAsync()两种类型的方法。我们这里先讲述同步提交的方式,commitSync()方法的定义如下:

public void commitSync()
  • 1

demo代码如下:

while (isRunging()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (String s : topicList) {
                    for (ConsumerRecord<String, String> record : records.records(s)) {
                        //todo logic service
                    }
                }
                consumer.commitSync();
            }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

可以看到示例中先对拉取到的每一条消息做相应的逻辑处理,然后对整个消息集做同步提交。

参考 KafkaConsumer 源码中提供的示例,针对上面的示例还可以修改为批量处理+批量提交的方式,关键代码如下:

while (isRunging()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (String s : topicList) {
                    for (ConsumerRecord<String, String> record : records.records(s)) {
                        buffer.add(record);
                    }
                }
                if (buffer.size() >= minBatchSize) {
                    //do some logical processing with buffer
                    consumer.commitSync();
                    buffer.clear();
                }

            }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

上面的示例中将拉取到的消息存入缓存 buffer,等到积累到足够多的时候,也就是示例中大于等于200个的时候,再做相应的批量处理,之后再做批量提交。这两个示例都有重复消费的问题,如果在业务逻辑处理完之后,并且在同步位移提交前,程序出现了崩溃,那么待恢复之后又只能从上一次位移提交的地方拉取消息,由此在两次位移提交的窗口中出现了重复消费的现象。

commitSync()方法会根据poll()方法拉取的最新位移来进行提交(注意提交的值对应于图3-6中position的位置),只要没有发生不可恢复的错误(Unrecoverable Error),它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,比如CommitFailedException、WakeupException、InterruptException、AuthenticationException、AuthorizationException等,我们可以将其捕获并做针对性的处理。对于采用 commitSync()的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用commitSync()的另一个含参方法,具体定义如下:

public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
    }
  • 1
  • 2
  • 3

该方法提供了一个 offsets 参数,用来提交指定分区的位移。无参的 commitSync()方法只能提交当前批次对应的 position值。如果需要提交一个中间值,比如业务每消费一条消息就提交一次位移,那么就可以使用这种方式,我们来看一下代码示例:

while (isRunging()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // do some logical processing
                    //获取该条消息的位移
                    long offset = record.offset();
                    //提交的位移是当前的位置+1
                    long commitOffset = offset +1;
                    TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                    consumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(commitOffset)));
                }
            }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在实际应用中,很少会有这种每消费一条消息就提交一次消费位移的必要场景。commitSync()方法本身是同步执行的,会耗费一定的性能,而示例中的这种提交方式会将性能拉到一个相当低的点。更多时候是按照分区的粒度划分提交位移的界限,这里我们就要用到2.4中提及的 ConsumerRecords类的 partitions()方法和 records(TopicPartition)方法,关键示例代码如下所示。

try{
            while (isRunging()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                Set<TopicPartition> partitions = records.partitions();
                for (TopicPartition partition : partitions) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                        //做消费逻辑处理,按照分区逻辑
                    }
                    long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    long commitOffset = offset + 1;
                    consumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(commitOffset)));
                }
            }
        }finally {
            consumer.close();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

与commitSync()方法相反,异步提交的方式(commitAsync())在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。commitAsync方法有三个不同的重载方法,具体定义如下:

public void commitAsync()public void commitAsync(OffsetCommitCallback callback)public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
  • 1
  • 2
  • 3

关键的是这里的第二个方法和第三个方法中的callback参数,它提供了一个异步提交的回调方法,当位移提交完成后会回调 OffsetCommitCallback 中的onComplete()方法。

2.6 控制和关闭消费

KafkaConsumer 提供了对消费速度进行控制的方法,在有些应用场景下我们可能需要暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。KafkaConsumer中使用pause()和resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作。这两个方法的具体定义如下:

public void pause(Collection<TopicPartition> partitions)public void resume(Collection<TopicPartition> partitions)
  • 1
  • 2

KafkaConsumer还提供了一个无参的paused()方法来返回被暂停的分区集合,此方法的具体定义如下:

public Set<TopicPartition> paused()
  • 1

之前的示例展示的都是使用一个while循环来包裹住poll()方法及相应的消费逻辑,如何优雅地退出这个循环也很有考究。细心的读者可能注意到有些示例代码并不是以 while(true)的形式做简单的包裹,而是使用 while(isRunning.get())的方式,这样可以通过在其他地方设定isRunning.set(false)来退出while循环。还有一种方式是调用KafkaConsumer的wakeup()方法,wakeup()方法是 KafkaConsumer 中唯一可以从其他线程里安全调用的方法(KafkaConsumer 是非线程安全的),调用wakeup()方法后可以退出poll()的逻辑,并抛出WakeupException 的异常,我们也不需要处理WakeupException 的异常,它只是一种跳出循环的方式。

跳出循环以后一定要显式地执行关闭动作以释放运行过程中占用的各种系统资源,包括内存资源、Socket连接等。KafkaConsumer提供了close()方法来实现关闭,close()方法有三种重载方法,分别如下:

public void close();
public void close(Duration timeout);
public void close(long timeout, TimeUnit timeUnit);
  • 1
  • 2
  • 3

第二种方法是通过 timeout 参数来设定关闭方法的最长执行时间,有些内部的关闭逻辑会耗费一定的时间,比如设置了自动提交消费位移,这里还会做一次位移提交的动作;而第一种方法没有 timeout 参数,这并不意味着会无限制地等待,它内部设定了最长等待时间(30秒);第三种方法已被标记为@Deprecated,可以不考虑。

那么一个相对完整的消费程序的逻辑可以参考下面的伪代码:

	List<String> topicList = Arrays.asList("topic1","topic2");
        consumer.subscribe(topicList);
        try{
            while (getRuning()) {
               //consumer.poll();
                // process the record
                // commit offset
                //特定情况,关闭消费
                if(xx){
                    //setRuningisFalse
                    // cosumer.wakeup()
                }
            }
        }catch (WakeupException e){
            //忽略该异常
        }catch (Exception e){
            // 其他异常做逻辑处理
        }
        finally {
            consumer.close();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

当关闭这个消费逻辑的时候,可以调用consumer.wakeup(),也可以调用isRunning.set(false)。

2.7 指定位移消费

在2.5节中我们讲述了如何进行消费位移的提交,正是有了消费位移的持久化,才使消费者在关闭、崩溃或者在遇到再均衡的时候,可以让接替的消费者能够根据存储的消费位移继续进行消费。

试想一下,当一个新的消费组建立的时候,它根本没有可以查找的消费位移。或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。当__consumer_offsets主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息。

参考下图:

在这里插入图片描述

按照默认的配置,消费者会从9开始进行消费(9是下一条要写入消息的位置),更加确切地说是从9开始拉取消息。如果将auto.offset.reset参数配置为“earliest”,那么消费者会从起始处,也就是0开始消费。

除了查找不到消费位移,位移越界也会触发 auto.offset.reset参数的执行,这个在下面要讲述的seek系列的方法中会有相关的介绍。

auto.offset.reset参数还有一个可配置的值—“none”,配置为此值就意味着出现查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException异常。

如果能够找到消费位移,那么配置为“none”不会出现任何异常。如果配置的不是“latest”、“earliest”和“none”,则会报出ConfigException异常。

到目前为止,我们知道消息的拉取是根据poll()方法中的逻辑来处理的,这个poll()方法中的逻辑对于普通的开发人员而言是一个黑盒,无法精确地掌控其消费的起始位置。提供的auto.offset.reset 参数也只能在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 **seek()**方法正好提供了这个功能,让我们得以追前消费或回溯消费。seek()方法的具体定义如下:

public void seek(TopicPartition partition, long offset)
  • 1

seek()方法中的参数partition表示分区,而offset参数用来指定从分区的哪个位置开始消费。**seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll()方法的调用过程中实现的。**也就是说,在执行seek()方法之前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置。seek()方法的使用示例如下所示(只列出关键代码)。

consumer.poll(Duration.ofMillis(1000));1//获取消费者分配的分区
Set<TopicPartition> assignment = consumer.assignment();
for (TopicPartition topicPartition : assignment) {
    //重置该分区的位移
consumer.seek(topicPartition,10);
}
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(1000);
// 消费消息
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

上述程序有一处可以优化的地方就是(1),这里我们用1s的时间去执行poll方法,其实poll方法给消费者分配分区有时候很快,导致了不必要的时间浪费,我们可以优化成下面的代码:

List<String> topicList = Arrays.asList("topic1","topic2");
        consumer.subscribe(topicList);
        try{
            Set<TopicPartition> assignment = new HashSet<>();
            //根据分配分区的数量为flag判断是否分配完成
            while (assignment.size() == 0) {
                //缩短执行时间
                consumer.poll(Duration.ofMillis(100));
                assignment = consumer.assignment();
            }
            for (TopicPartition topicPartition : assignment) {
                consumer.seek(topicPartition,10);
            }
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(1000);
                // 消费消息
            }  
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

如果对未分配到的分区执行seek()方法,那么会报出IllegalStateException的异常。

**endOffsets()**方法用来获取指定分区的末尾的消息位置。我们可以配合seek()方法,来实现从某个分区末尾消息位置来拉取消息的功能,endOffsets的具体方法定义如下:

public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout)
  • 1
  • 2

其中partitions参数表示分区集合,而timeout参数用来设置等待获取的超时时间。如果没有指定 timeout 参数的值,那么endOffsets()方法的等待时间由客户端参数request.timeout.ms 来设置,默认值为 30000。

与endOffsets 对应的是 **beginningOffsets()**方法,一个分区的起始位置起初是0,但并不代表每时每刻都为0,因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加。beginningOffsets()方法的具体定义如下:

public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout)
  • 1
  • 2

beginningOffsets()方法中的参数内容和含义都与endOffsets()方法中的一样,配合这两个方法我们就可以从分区的开头或末尾开始消费。其实KafkaConsumer中直接提供了seekToBeginning()方法和seekToEnd()方法来实现这两个功能,这两个方法的具体定义如下:

public void seekToBeginning(Collection<TopicPartition> partitions)//内部通过上面提到的reset offset,设置策略为EARLIEST
public void seekToEnd(Collection<TopicPartition> partitions);
//内部通过上面提到的reset offset,设置策略为LATEST
  • 1
  • 2
  • 3

有时候我们并不知道特定的消费位置,却知道一个相关的时间点,比如我们想要消费昨天8点之后的消息,这个需求更符合正常的思维逻辑。此时我们无法直接使用seek()方法来追溯到相应的位置。KafkaConsumer同样考虑到了这种情况,它提供了一个offsetsForTimes()方法,通过timestamp来查询与此对应的分区位置。

public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout);
  • 1
  • 2

offsetsForTimes()方法的参数timestampsToSearch是一个Map类型,key为待查询的分区,而 value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于OffsetAndTimestamp中的offset和timestamp字段。示例代码如下:

List<String> topicList = Arrays.asList("topic1","topic2");
consumer.subscribe(topicList);
try{
    Map<TopicPartition,Long> timestampToSearch = new HashMap<>();
    Set<TopicPartition> assignment = new HashSet<>();
    while (assignment.size() == 0) {
        consumer.poll(Duration.ofMillis(100));
        assignment = consumer.assignment();
    }
    for (TopicPartition topicPartition : assignment) {
        timestampToSearch.put(topicPartition,System.currentTimeMillis() - 1*24*3600*1000);
    }
    Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(timestampToSearch);
    for (TopicPartition topicPartition : assignment) {
        OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
        if (offsetAndTimestamp !=null) {
            consumer.seek(topicPartition,offsetAndTimestamp.offset());
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

2.5节中提及了Kafka中的消费位移是存储在一个内部主题中的,而本节的seek()方法可以突破这一限制:消费位移可以保存在任意的存储介质中,例如数据库文件系统(kafka主题也就是文件其实)等。以数据库为例,我们将消费位移保存在其中的一个表中,在下次消费的时候可以读取存储在数据表中的消费位移并通过seek()方法指向这个具体的位置,伪代码如下所示:

List<String> topicList = Arrays.asList("topic1","topic2");
consumer.subscribe(topicList);
try{
    Set<TopicPartition> assignment = new HashSet<>();
    while (assignment.size() == 0) {
        consumer.poll(Duration.ofMillis(100));
        assignment = consumer.assignment();
    }
    for (TopicPartition topicPartition : assignment) {
        long offset = getOffsetFromDb(topicPartition);
        consumer.seek(topicPartition,offset);
    }
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> records1 = records.records(partition);
            for (ConsumerRecord<String, String> stringStringConsumerRecord : records1) {
                //处理消息
            }
            long offset = records1.get(records1.size() - 1).offset();
            storeOffsetToDb(partition,offset+1);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

seek()方法为我们提供了从特定位置读取消息的能力,我们可以通过这个方法来向前跳过若干消息,也可以通过这个方法来向后回溯若干消息,这样为消息的消费提供了很大的灵活性。seek()方法也为我们提供了将消费位移保存在外部存储介质中的能力,还可以配合再均衡监听器来提供更加精准的消费能力。

2.8 在均衡

再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。

不过在再均衡发生期间,消费组内的消费者是无法读取消息的。也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。一般情况下,应尽量避免不必要的再均衡的发生。

2.2 节中在讲述 subscribe()方法时提及再均衡监听器ConsumerRebalanceListener,在subscribe(Collection<String> topics,ConsumerRebalanceListener listener) 和subscribe(Pattern pattern,ConsumerRebalanceListenerlistener)方法中都有它的身影。再均衡监听器用来设定发生再均衡动作前后的一些准备或收尾的动作。ConsumerRebalanceListener 是一个接口,包含2 个方法,具体的释义如下:

void onPartitionsRevoked(Collection<TopicPartition> partitions);
  • 1

这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。参数partitions表示再均衡前所分配到的分区。

void onPartitionsAssigned(Collection<TopicPartition> partitions);
  • 1

这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。参数partitions表示再均衡后所分配到的分区。

下面我们通过一个例子来演示ConsumerRebalanceListener的用法,具体内容如下面代码示:

Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap<>();
        consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync(currentOffsets);
                currentOffsets.clear();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

            }
        });
        try{
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    currentOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1));
                }
                consumer.commitAsync(currentOffsets,null);
            }
        }finally {
            consumer.close();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

将消费位移暂存到一个局部变量currentOffsets中,这样在正常消费的时候可以通过commitAsync()方法来异步提交消费位移,在发生再均衡动作之前可以通过再均衡监听器的onPartitionsRevoked()回调执行commitSync()方法同步提交消费位移,以尽量避免一些不必要的重复消费。

再均衡监听器还可以配合外部存储使用。在上面代码清单中,我们将消费位移保存在数据库中,这里可以通过再均衡监听器查找分配到的分区的消费位移,并且配合 seek()方法来进一步优化代码逻辑,将代码上面清单中的第一行代码修改为如下内容:

consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            storeOffsetInDb(partition);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            consumer.seek(partition,getOffsetFromDb(partition));
        }
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
2.9 消费者拦截器

消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。

与生产者拦截器对应的,消费者拦截器需要自定义实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口。ConsumerInterceptor接口包含3个方法:

public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
  • 1
  • 2
  • 3

KafkaConsumer会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行相应的定制化操作,比如修改返回的消息内容、按照某种规则过滤消息(可能会减少poll()方法返回的消息的个数)。如果 onConsume()方法中抛出异常,那么会被捕获并记录到日志中,但是异常不会再向上传递

KafkaConsumer会在提交完消费位移之后调用拦截器的onCommit()方法,可以使用这个方法来记录跟踪所提交的位移信息,比如当消费者使用commitSync的无参方法时,我们不知道提交的消费位移的具体细节,而使用拦截器的onCommit()方法却可以做到这一点。

在某些业务场景中会对消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那么就会被视为无效,它也就不需要再被继续处理了。下面使用消费者拦截器来实现一个简单的消息TTL(Time to Live,即过期时间)的功能。

在下面代码清单中,自定义的消费者拦截器ConsumerInterceptorTTL使用消息的 timestamp 字段来判定是否过期,如果消息的时间戳与当前的时间戳相差超过10秒则判定为过期,那么这条消息也就被过滤而不投递给具体的消费者。

public class ConsumerInterceptorTTL implements ConsumerInterceptor<String,String> {
    public static final long EXPIRE_INTERVAL = 10 *100;

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long l = System.currentTimeMillis();
        Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
        while (iterator.hasNext()){
            ConsumerRecord<String, String> next = iterator.next();
            long timestamp = next.timestamp();
            if (l-timestamp>=EXPIRE_INTERVAL) {
                iterator.remove();
            }
        }
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach(((topicPartition, offsetAndMetadata) -> System.out.println(topicPartition + ":"
                + offsetAndMetadata.offset())));
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

实现自定义的ConsumerInterceptorTTL之后,需要在KafkaConsumer中配置指定这个拦截器,这个指定的配置和KafkaProducer中的一样,也是通过interceptor.classes参数实现的,此参数的默认值为“”。示例如下:

properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class.getName());
  • 1

不过使用这种功能时需要注意的是:在使用带参数的位移提交的方式时,有可能提交了错误的位移信息。在一次消息拉取的批次中,可能含有最大偏移量的消息会被消费者拦截器过滤。

在消费者中也有拦截链的概念,和生产者的拦截链一样,也是按照interceptor.classes参数配置的拦截器的顺序来一一执行的(配置的时候,各个拦截器之间使用逗号隔开)。同样也要提防“副作用”的发生。如果在拦截链中某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。

2.10 多线程实现

KafkaProducer是线程安全的,然而KafkaConsumer却是非线程安全的。KafkaConsumer中定义了一个 acquire()方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出ConcurrentModifcationException异常:

//线程id存储器
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

private void acquire() {
    //获取线程id
        long threadId = Thread.currentThread().getId();
    //如果当前线程id 不等于类里面存储的id 并且 不是第一个线程进入就会报错
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        refcount.incrementAndGet();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

KafkaConsumer中的每个公用方法在执行所要执行的动作之前都会调用这个acquire()方法,只有wakeup()方法是个例外。

acquire()方法和我们通常所说的锁(synchronized、Lock等)不同,它不会造成阻塞等待,我们可以将其看作一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁和解锁操作。release()方法也很简单,具体定义如下:

 // refcount is used to allow reentrant access by the thread who has acquired currentThread
//提供同一个线程的重入保障,如果一个方法中调用另一个方法,这时候就会在第二个方法的时候关掉这个线程,导致在第一个方法release的时候,抛出异常
    private final AtomicInteger refcount = new AtomicInteger(0);
private void release() {
    if (refcount.decrementAndGet() == 0)
        currentThread.set(NO_CURRENT_THREAD);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

acquire()方法和release()方法都是私有方法,因此在实际应用中不需要我们显式地调用,但了解其内部的机理之后可以促使我们正确、有效地编写相应的程序逻辑。KafkaConsumer 非线程安全并不意味着我们在消费消息的时候只能以单线程的方式执行。 如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。除此之外,由于Kafka 中消息保留机制的作用,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。我们可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。多线程的实现方式有多种,第一种也是最常见的方式:线程封闭,即为每个线程实例化一个KafkaConsumer对象,如下图所示:

在这里插入图片描述

一个线程对应一个KafkaConsumer实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数,

根据 2.2 节中介绍的消费者与分区数的关系,当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。

与此对应的第二种方式是多个消费线程同时消费同一个分区,这个通过 assign()、seek()等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费的能力。不过这种实现方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少,并不推荐。

一般而言,分区是消费线程的最小划分单位。下面我们通过实际编码来演示第一种多线程消费实现的方式,详细示例参考如下代码清单所示。

public class FirstMultiConsumerThreadDemo {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic -demo";
    public static final String groupId = "group.demo";

    public static Properties initConfig(){
        Properties properties = new Properties();
        //ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put("bootstrap.servers",brokerList);
        properties.put("group.id",groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        return  properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        int consumerThreadNum = 4;
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(properties,topic).start();
        }
    }

    private static class KafkaConsumerThread extends Thread{

        private KafkaConsumer<String,String> kafkaConsumer;

        public KafkaConsumerThread(Properties properties, String topic) {
            this.kafkaConsumer = new KafkaConsumer<String, String>(properties);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        //处理消息逻辑(1)
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

内部类KafkaConsumerThread代表消费线程,其内部包裹着一个独立的KafkaConsumer实例。通过外部类的main()方法来启动多个消费线程,消费线程的数量由consumerThreadNum变量指定。一般一个主题的分区数事先可以知晓,可以将consumerThreadNum设置成不大于分区数的值,如果不知道主题的分区数,那么也可以通过KafkaConsumer类的partitionsFor()方法来间接获取,进而再设置合理的consumerThreadNum值。

上面这种多线程的实现方式和开启多个消费进程的方式没有本质上的区别,它的优点是每个线程可以按顺序消费各个分区中的消息。缺点也很明显,每个消费线程都要维护一个独立的TCP连接,如果分区数和consumerThreadNum的值都很大,那么会造成不小的系统开销。

参考上面代码清单 中的第(1)行,如果这里对消息的处理非常迅速,那么 poll()拉取的频次也会更高,进而整体消费的性能也会提升;相反,如果在这里对消息的处理缓慢,比如进行一个事务性操作,或者等待一个RPC的同步响应,那么poll()拉取的频次也会随之下降,进而造成整体消费性能的下降。一般而言,poll()拉取消息的速度是相当快的,而整体消费的瓶颈也正是在处理消息这一块,如果我们通过一定的方式来改进这一部分,那么我们就能带动整体消费性能的提升。参考下图,考虑第三种实现方式,将处理消息模块改成多线程的实现方式,具体实现如代码清单如下所示。

在这里插入图片描述

public class ThirdMultiConsumerThreadDemo {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic -demo";
    public static final String groupId = "group.demo";

    public static Properties initConfig(){
        Properties properties = new Properties();
        //ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put("bootstrap.servers",brokerList);
        properties.put("group.id",groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        return  properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaConsumerThread consumer = new KafkaConsumerThread(properties,topic,Runtime.getRuntime().availableProcessors());
         consumer.start();
    }

    private static class KafkaConsumerThread extends Thread {
        private KafkaConsumer<String,String> kafkaConsumer;
        private ExecutorService executorService;
        private int threadNum;
        public KafkaConsumerThread(Properties properties,String topic, int threadNum){
            this.kafkaConsumer = new KafkaConsumer<String, String>(properties);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
            this.threadNum = threadNum;
            executorService = new ThreadPoolExecutor(threadNum,threadNum,0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        executorService.submit(new RecordHandler(records));
                    }1}
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }

        private class RecordHandler implements Runnable {

            public final ConsumerRecords<String,String> records;

            public RecordHandler(ConsumerRecords<String, String> records) {
                this.records = records;
            }

            @Override
            public void run() {
                //处理消息逻辑
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

上述代码清单中的RecordHandler类是用来处理消息的,而KafkaConsumerThread类对应的是一个消费线程,里面通过线程池的方式来调用 RecordHandler 处理一批批的消息。注意KafkaConsumerThread类中ThreadPoolExecutor里的最后一个参数设置的是CallerRunsPolicy(),这样可以防止线程池的总体消费能力跟不上poll()拉取的能力,从而导致异常现象的发生。第三种实现方式还可以横向扩展,通过开启多个KafkaConsumerThread 实例来进一步提升整体的消费能力。

第三种实现方式相比第一种实现方式而言,除了横向扩展的能力,还可以减少TCP连接对系统资源的消耗,不过缺点就是对于消息的顺序处理就比较困难了。在代码清单中的initConfig()方法里笔者特意加了一个配置:

 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
  • 1

这样旨在说明在具体实现的时候并没有考虑位移提交的情况。对于第一种实现方式而言,如果要做具体的位移提交,它的具体实现和 2.5 节讲述的位移提交没有什么区别,直接在KafkaConsumerThread 中的 run()方法里实现即可。而对于第三种实现方式,这里引入一个共享变量offsets来参与提交,如下图所示:
在这里插入图片描述

每一个处理消息的 RecordHandler 类在处理完消息之后都将对应的消费位移保存到共享变量offsets中,KafkaConsumerThread在每一次poll()方法之后都读取offsets中的内容并对其进行位移提交。注意在实现的过程中对offsets读写需要加锁处理,防止出现并发问题。并且在写入offsets的时候需要注意位移覆盖的问题,针对这个问题,可以将RecordHandler类中的run()方法实现改为如下内容:

for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> records = this.records.records(partition);
    long offset = records.get(records.size() - 1).offset();
    //锁住全局记录的offset
    synchronized (offsets) {
        if (!offsets.containsKey(partition)) {
            offsets.put(partition,new OffsetAndMetadata(offset + 1));
        }else {
            long offset1 = offsets.get(partition).offset();
            //防止位移覆盖,如果历史offset 大于 当前消息最长的offset + 1 ,就覆盖,负责忽略
            if(offset1 < offset +1){
                offsets.put(partition,new OffsetAndMetadata(offset+1));
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

对应的位移提交实现可以添加在代码KafkaConsumerThread类的第(1)行代码下方,具体实现参考如下:

synchronized (offsets) {
    if (!offsets.isEmpty()) {
        kafkaConsumer.commitSync(offsets);
        offsets.clear();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

但是这种处理方式是存在一定风险的。对于同一个分区中的消息,假设一个处理线程RecordHandler1正在处理offset为0~99的消息,而另一个处理线程RecordHandler2已经处理完了offset为100~199的消息并进行了位移提交,此时如果RecordHandler1发生异常,则之后的消费只能从200开始而无法再次消费0~99的消息,从而造成了消息丢失的现象。这里虽然针对位移覆盖做了一定的处理,但还没有解决异常情况下的位移覆盖问题。

对此就要引入更加复杂的处理机制,这里再提供一种解决思路,参考下图,总体结构上是基于滑动窗口实现的。对于第三种实现方式而言,它所呈现的结构是通过消费者拉取分批次的消息,然后提交给多线程进行处理,而这里的滑动窗口式的实现方式是将拉取到的消息暂存起来,多个消费线程可以拉取暂存的消息,这个用于暂存消息的缓存大小即为滑动窗口的大小,总体上而言没有太多的变化,不同的是对于消费位移的把控。

在这里插入图片描述

如上图所示, 每一个方格代表一个批次的消息,一个滑动窗口包含若干方格,startOffset标注的是当前滑动窗口的起始位置,endOffset标注的是末尾位置。每当startOffset指向的方格中的消息被消费完成,就可以提交这部分的位移,与此同时,窗口向前滑动一格,删除原来startOffset所指方格中对应的消息,并且拉取新的消息进入窗口。滑动窗口的大小固定,所对应的用来暂存消息的缓存大小也就固定了,这部分内存开销可控。

方格大小和滑动窗口的大小同时决定了消费线程的并发数:一个方格对应一个消费线程,对于窗口大小固定的情况,方格越小并行度越高;对于方格大小固定的情况,窗口越大并行度越高。

不过,若窗口设置得过大,不仅会增大内存的开销,而且在发生异常(比如Crash)的情况下也会引起大量的重复消费,同时还考虑线程切换的开销,建议根据实际情况设置一个合理的值,不管是对于方格还是窗口而言,过大或过小都不合适。

如果一个方格内的消息无法被标记为消费完成,那么就会造成startOffset 的悬停。为了使窗口能够继续向前滑动,那么就需要设定一个阈值,当 startOffset 悬停一定的时间后就对这部分消息进行本地重试消费,如果重试失败就转入重试队列,如果还不奏效就转入死信队列

死信队列:由于某些原因消息无法被正确地投递,为了确保消息不会被无故地丢弃,一般将其置于一个特殊角色的队列,这个队列一般称为死信队列。
理解死信队列,关键是要理解死信。死信可以看作消费者不能处理收到的消息,也可以看作消费者不想处理收到的消息,还可以看作不符合处理要求的消息。比如消息内包含的消息内容无法被消费者解析,为了确保消息的可靠性而不被随意丢弃,故将其投递到死信队列中,这里的死信就可以看作消费者不能处理的消息。再比如超过既定的重试次数之后将消息投入死信队列,这里就可以将死信看作不符合处理要求的消息。

重试队列其实可以看作一种回退队列,具体指消费端消费消息失败时,为了防止消息无故丢失而重新将消息回滚到broker中。与回退队列不同的是,重试队列一般分成多个重试等级,每个重试等级一般也会设置重新投递延时,重试次数越多投递延时就越大。举个例子:消息第一次消费失败入重试队列Q1,Q1的重新投递延时为5s,5s过后重新投递该消息;如果消息再次消费失败则入重试队列Q2,Q2的重新投递延时为10s,10s过后再次投递该消息。以此类推,重试越多次重新投递的时间就越久,为此还需要设置一个上限,超过投递次数就进入死信队列。重试队列与延时队列有相同的地方,都需要设置延时级别。它们的区别是:延时队列动作由内部触发,重试队列动作由外部消费端触发;延时队列作用一次,而重试队列的作用范围会向后传递。
  • 1
  • 2
  • 3
  • 4

真实应用中无法消费的情况极少,一般是由业务代码的处理逻辑引起的,比如消息中的内容格式与业务处理的内容格式不符,无法对这条消息进行决断,这种情况可以通过优化代码逻辑或采取丢弃策略来避免。如果需要消息高度可靠,也可以将无法进行业务逻辑的消息(这类消息可以称为死信)存入磁盘、数据库或Kafka,然后继续消费下一条消息以保证整体消费进度合理推进,之后可以通过一个额外的处理任务来分析死信进而找出异常的原因。

2.11 重要的消费者参数

有一些重要的参数涉及程序的可用性和性能,如果能够熟练掌握它们,也可以让我们在编写相关的程序时能够更好地进行性能调优与故障排查。下面挑选一些重要的参数来做细致的讲解。

1.fetch.min.bytes

该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka中拉取的最小数据量,默认值为1(B)。Kafka在收到Consumer的拉取请求时,如果返回给Consumer的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取了。

2.fetch.max.bytes

该参数与fetch.max.bytes参数对应,它用来配置Consumer在一次拉取请求中从Kafka中拉取的最大数据量,默认值为52428800(B),也就是 50MB。如果这个参数设置的值比任何一条写入Kafka中的消息要小,那么会不会造成无法消费呢?很多资料对此参数的解读认为是无法消费的,比如一条消息的大小为10B,而这个参数的值是1(B),既然此参数设定的值是一次拉取请求中所能拉取的最大数据量,那么显然 1B<10B,所以无法拉取。这个观点是错误的,该参数设定的不是绝对的最大值,如果在第一个非空分区中拉取的第一条消息大于该值,那么该消息将仍然返回,以确保消费者继续工作。也就是说,上面问题的答案是可以正常消费。与此相关的,Kafka中所能接收的最大消息的大小通过服务端参数message.max.bytes(对应于主题端参数max.message.bytes)来设置。

3.fetch.max.wait.ms

这个参数也和fetch.min.bytes参数有关,如果Kafka仅仅参考fetch.min.bytes参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms参数用于指定Kafka的等待时间,默认值为500(ms)。如果Kafka中没有足够多的消息而满足不了fetch.min.bytes参数的要求,那么最终会等待500ms。这个参数的设定和Consumer与Kafka之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。

4.max.partition.fetch.bytes

这个参数用来配置从每个分区里返回给Consumer的最大数据量,默认值为1048576(B),即1MB。这个参数与fetch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造成无法消费,Kafka 为了保持消费逻辑的正常运转不会对此做强硬的限制。

5.max.poll.records

这个参数用来配置Consumer在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度

6.connections.max.idle.ms

这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟

7.exclude.internal.topics

Kafka中有两个内部的主题:__consumer_offsets和__transaction_state。exclude.internal.topics用来指定Kafka中的内部主题是否可以向消费者公开,默认值为true。如果设置为true,那么只能使用subscribe(Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false则没有这个限制。

8.receive.buffer.bytes

这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为65536(B),即64KB。如果设置为-1,则使用操作系统的默认值。如果Consumer与Kafka处于不同的机房,则可以适当调大这个参数值。

9.send.buffer.bytes

这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。

10.request.timeout.ms

这个参数用来配置Consumer等待请求响应的最长时间,默认值为30000(ms)。

11.metadata.max.age.ms

这个参数用来配置元数据的过期时间,默认值为300000(ms),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker加入。

12.reconnect.backoff.ms

这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50(ms)。这种机制适用于消费者向broker发送的所有请求

13.retry.backoff.ms

这个参数用来配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故障情况下频繁地重复发送,默认值为100(ms)。

14.isolation.level

这个参数用来配置消费者的事务隔离级别。字符串类型,有效值为“read_uncommitted”和“read_committed”,表示消费者所消费到的位置,如果设置为“read_committed”,那么消费者就会忽略事务未提交的消息,即只能消费到 LSO(LastStableOffset)的位置,默认情况下为“read_uncommitted”,即可以消费到HW(HighWatermark)处的位置。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号