当前位置:   article > 正文

Kafka知识总结之生产者简单使用_kafka recordmetadata

kafka recordmetadata

一. 测试环境搭建

引入依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

创建一个Bean,这里的配置后面会解析。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * @author kirin.麒麟
 * @version 1.0.0
 * @classname Kafka
 * @desc Kafka配置
 * @date 2022/1/9 5:04 下午
 */
@Slf4j
@Configuration
public class KafkaConfig {

    /**
     * 生产者Bean
     * @return
     */
    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.138:9092");
        // 消息确认机制配置
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 重试
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        // 批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        // 多长时间发送一个批次
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        // 缓冲
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        // 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaProducer<>(properties);
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

发生消息:

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer<String, String> kafkaProducer;

    @GetMapping("/send")
    public String sendMsg() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "这是第" + i + "条消息");
            kafkaProducer.send(record);
            Thread.sleep(200);
        }
        return "success";
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

开启主题的监听:

# 单节点监听
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.138:9092 --topic test-topic --from-beginning
# 集群节点监听
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.138:9091,192.168.31.138:9092,192.168.31.138:9093 --topic test-topic --from-beginning

  • 1
  • 2
  • 3
  • 4
  • 5

二. 消息发送模式

从第一部分可以知道,发送消息就是将消息和Topic封装一个ProducerRecord对象,然后通过KafkaProducersend方法发送。
在这里插入图片描述

这里有两种种方法,这些方法都是通过Future封装返回的,是可以拿到返回值的,其实都是异步执行的,第二个可以执行异步回调。
在这里插入图片描述

异步发送:

ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
kafkaProducer.send(record);
  • 1
  • 2

同步发送:

因为这里使用Future做返回,所以可以通过get()方法阻塞,相当于同步。

ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
Future<RecordMetadata> future = kafkaProducer.send(record);
RecordMetadata metadata = future.get();
  • 1
  • 2
  • 3

异步回调发送:

回调就是在callback中执行业务

ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
kafkaProducer.send(record, (metadata, exception) -> {
    // TODO
});
  • 1
  • 2
  • 3
  • 4

三. 生产者构建流程

这里简单看一下KafkaProducer的初始化过程,从构造方法开始:
在这里插入图片描述

接着获取Client.id,若没有配置则由是producer-递增的数字

this.producerConfig = config;
this.time = time;
// 获取事物ID
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
// 获取用户配置的client.id
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

设置生产者监控:

this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
  • 1

设置分区器paritioner,这里可以设置自己的分区器,需要增加:partitioner.class,将自定义的分区器路径写入。
在这里插入图片描述

设置key和value的序列化器:
在这里插入图片描述
解析并实例化拦截器,这里可以自定义实现拦截器,过滤特定的消息。
在这里插入图片描述
构建RecordAccumulator,这个类是用来存放消息
在这里插入图片描述
RecordAccumulator这个类里面看一下属性batches

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
  • 1

它是一个 ConcurrentMapkeyTopicPartition 类,代表一个 topic 的一个 partitionvalue 是一个包含 ProducerBatch双端队列。等待 Sender 线程发送给 broker

创建守护线程sender,用来监听发送消息:
在这里插入图片描述

KafkaThread的可以看出这个线程是守护线程:
在这里插入图片描述

这里需要注意,KafkaProducer的主线程是用来往RecordAccumulator里面写消息,Sender守护线程是用来读取消息并发送到Kafka中的。

四. 消息发送大体流程

上面的例子演示了消息的发送,这里简单看一下消息发送的大体流程:
在这里插入图片描述
先从send函数开始分析
在这里插入图片描述
首先方法会先进入拦截器集合ProducerInterceptorsonSend方法是遍历拦截器onSend方法,拦截器的目的是将数据处理加工,Kafka本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能,必须自己实现接口ProducerInterceptor并实现onSend方法。
在这里插入图片描述
接下来看一下doSend方法,首先先判断守护线程sender是否可用,接着判断要发送到topicmetadata是否可用:
在这里插入图片描述
序列化keyvalue
在这里插入图片描述
计算record对应的partition的值,这里如果定义了分区器会使用自定义,也可以指定算法获取该值。下面会介绍具体分区器实现

int partition = partition(record, serializedKey, serializedValue, cluster);
  • 1

接着像accumulator中追加数据:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

  • 1
  • 2
  • 3

如果是新批次需要重新指定分区在追加数据:
在这里插入图片描述
缓冲满了之后,唤醒sender线程发送消息:
在这里插入图片描述
到这儿只是消息发送大体的流程,内部还有一大堆的代码,功力不够看着有点上头,等有时间在看吧!

五. 序列化器

消息在网络上传输,必须进行序列化转化为字节流。Kafka提供了默认的字符串序列化器StringSerializer,除此之外还有ByteArrayByteBufferBytesDoubleIntegerLong等。

org.apache.kafka.common.serialization包下面可以看到默认实现的序列化器!

接下来看一下借助fastjson实现自定义序列化器:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserInfo {
    private String name;
    private Integer age;
}

// 序列化器
public class CustomizeSerializer implements Serializer<UserInfo> {
    @Override
    public byte[] serialize(String topic, UserInfo data) {
        return JSON.toJSONBytes(data);
    }
}

// 反序列化器
public class CustomizeDeserializer implements Deserializer<UserInfo> {
    @Override
    public UserInfo deserialize(String topic, byte[] data) {
        return JSON.parseObject(data, UserInfo.class);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

配置使用:

// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomizeSerializer.class);
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomizeDeserializer.class);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述

使用:

@Slf4j
@RestController
public class CustomizeController {

    @Autowired
    private KafkaProducer<String, UserInfo> kafkaProducer;

    private static final String TOPIC_NAME = "long-topic";

    @GetMapping("c_send")
    public String send(@RequestParam(value = "size", defaultValue = "20") Integer size) {
        for (int i = 0; i < size; i++) {
            ProducerRecord<String, UserInfo> record = new ProducerRecord<>(TOPIC_NAME, 0, "user:" + i, new UserInfo("这是第" + i + "条消息", i));
            kafkaProducer.send(record, (metadata, exception) -> {
                log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
            });
        }
        return "success";
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

在这里插入图片描述

六. 拦截器

拦截器的使用场景:

  • 按照某个规则过滤掉不符合要求的消息
  • 修改消息内容
  • 发送消相关统计

自定义拦截器,主要是实现ProducerInterceptor接口,这个接口中四个方法:

  • onSend:该方法封装进KafkaProducer.send方法中,即它运行在用户的主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。注意:用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
  • onAcknowledgement:该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。注意:onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer的消息发送效率
  • close:关闭interceptor,主要用于执行一些资源清理的工作
  • configure:获取配置信息和初始化数据时调用

实例:

public class CustomizeProducer implements ProducerInterceptor<String, UserInfo> {

    @Override
    public ProducerRecord<String, UserInfo> onSend(ProducerRecord<String, UserInfo> record) {
        UserInfo value = record.value();
        return value.getAge() % 2 == 0 ? new ProducerRecord<>(
                record.topic(),
                record.key(),
                new UserInfo(value.getName(), value.getAge() + 100)) :
                new ProducerRecord<>(
                        record.topic(),
                        record.key(),
                        new UserInfo(value.getName(), value.getAge() + 60));
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
    
    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

使用自定义拦截器:
在这里插入图片描述
注意:ProducerConfig.INTERCEPTOR_CLASSES_CONFIG对应的值是一个拦截器数组。

七. 分区器

7.1. 测试集群分区器

发送消息结果数据实体RecordMetadata,返回数据包好分区器、偏移量和主题。
在这里插入图片描述

测试发动100条消息数据:

@GetMapping("/send")
public String sendMsg(@RequestParam(value = "size", defaultValue = "100") Integer size) throws InterruptedException {
    for (int i = 0; i < size; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "这是第" + i + "条消息");
        kafkaProducer.send(record, (metadata, exception) -> {
            log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
        });
    }
    return "success";
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

如果没有指定分区器系统会使用默认的默认的分区器:DefaultPartitioner(在ProducerConfig中有定义)。该分区器首先会随机选择一个分区,并尽可能一直使用该分区,直到该分区批次(默认是16k)已满或者已完成(linger.ms配置的时间已到),Kafka会再随机选择一个和上一个分区不同的分区进行使用。
在这里插入图片描述
关于计算分区器的值有下面这几种情况:

  • 指明partition的情况下,直接将指明的值作为partition的值

    @GetMapping("/send")
    public String sendMsg(@RequestParam(value = "size", defaultValue = "20") Integer size) throws InterruptedException {
        for (int i = 0; i < size; i++) {
            // 设置0分区发送数据
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 0, "key:" + i, "这是第" + i + "条消息");
            kafkaProducer.send(record, (metadata, exception) -> {
                log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
            });
        }
        return "success";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
  • 没有指明partition值但有key的情况下,将keyhash值与topicpartition数进行取余得到partition的值

    Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    
    • 1
  • partitionkey都不存在的情况下,通过 stickyPartitionCachepartition 方法计算出分区。

7.1. 默认分区器

系统里面支持如下几种:
在这里插入图片描述

  • DefaultPartitioner:默认分区策略,如果key也不存在,则会对可用分区进行轮询,如果没有指定分区,且存在key值,则会根据keyhash进行取模来选择分区。(需要注意这里和以前的版本实现不一样)

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这里看看一下StickyPartitionCache,这里面的具体实现:

    private final ConcurrentMap<String, Integer> indexCache;
    public StickyPartitionCache() {
        this.indexCache = new ConcurrentHashMap<>();
    }
    
    public int partition(String topic, Cluster cluster) {
        Integer part = indexCache.get(topic);
        if (part == null) {
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    indexCache 是一个 ConcurrentHashMap 对象,对应的是 Topic -> Partition 的映射,如果该值不存在则调用 nextPartition 方法选择一个分区并缓存。

    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // 但是分区是空或者与prevPartition相同
        if (oldPart == null || oldPart == prevPartition) {
            // 获取主题对应partition
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            // partition数量小于1
            if (availablePartitions.size() < 1) {
                // 获取一个线程安全的随机数
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                // 取余返回新的partition
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                // partition数量等于1,返回对应的parition
                newPart = availablePartitions.get(0).partition();
            } else {
                // partition数量大于1
                while (newPart == null || newPart.equals(oldPart)) {
                    // 生成的新partition不为null,并且与当前的partion不相同
                    int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // 如果当前的partition是null,将partition缓冲更新
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                // 否则替换旧的partition
                indexCache.replace(topic, prevPartition, newPart);
            }
            // 最后返回主题对应的partition
            return indexCache.get(topic);
        }
        // 最后返回主题对应的partition
        return indexCache.get(topic);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    这个过程其实比较简单的,这里其实也是粘性分区策略实现方式

  • UniformStickyPartitioner:粘性分区策略
    参看DefaultPartitioner

  • RoundRobinPartitioner:轮询策略

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取主题对应的partition
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();  // 数量
        int nextValue = nextValue(topic);       // 主题对应的新值
        // 获取主题对应的可用的partition
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            // 计算新的partition值
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // 没有可用的分区,给一个不可用的分区
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }
    
    // 将主题对应的值+1
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

7.2. 自定义分区器

自定义CustomizePartition类,并且实现Partitioner接口,重写partitionclose方法

public class CustomizePartition implements Partitioner {
    /**
     * Compute the partition for the given record.
     *
     * @param topic      主题名
     * @param key        key值,不存在则是null
     * @param keyBytes   序列化的key,不存在则是null
     * @param value      value值,不存在则是null
     * @param valueBytes 系列化的value,不存在则是null
     * @param cluster    集群信息
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    /**
     * This is called when partitioner is closed.
     */
    @Override
    public void close() {}

    /**
     * Configure this class with the given key-value pairs
     *
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

使用的时候只需要在配置文件中指定一下就可以:
在这里插入图片描述

八. 生产者核心配置参数

  • acks:发送应答,默认值是1。
  • batch.size:批量发送大小,默认是16384(16k)
  • bootstrap.servers:服务器地址,多个服务器地址用逗号分割开
  • buffer.memory:生产者最大可用的缓冲,默认是33554432(32M)
  • client.id:生产者ID,默认是“”
  • compression.type:压缩类型,默认是producer(未压缩),还有其他的配置类型:gzip(压缩率高,适合高内内存和CPU)/snappy(适合带宽敏感,压缩力度大)/lz4/sztd
  • retries:失败重试次数,默认整型的最大值(2147483647)
  • retry.backoff.ms:重试阻塞时间
  • delivery.timeout.ms:传输时间
  • connections.max.idle.ms:关闭空闲连接时间,默认是540000
  • enable.idempotence:开启幂等,默认是false,
  • max.in.flight.request.per.connection:单个连接上发送的未确认请求的最大连接数,默认是5
  • interceptor.classes:拦截器,默认是无拦截器
  • key.seriailzer:key的序列化器,默认是无
  • value.seriailzer:value的序列化器,默认是无
  • linger.ms:发送延迟时间,默认是0
  • max.block.ms:阻塞时间,默认是一分钟(60000)
  • max.request.size:最大请求字节大小,默认是1M(1048576)
  • metric.reporters:自定义指标报告器
  • partitioner.class:自定义分区器
  • request.timeout.bytes:请求超时时间,默认30000
  • receive.buffer.bytes:读取数据时使用TCP接收缓冲区(SO_RCVBUF)的大小,默认值32k(32768)。如果值是-1,将使用OS默认值
  • send.buffer.bytes:发送数据时使用的TCP发送缓冲去(SO_SEDBUF)的大小,默认是是128k(131072)。如果值是-1,将使用OS的默认值
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/142436?site
推荐阅读
相关标签
  

闽ICP备14008679号