赞
踩
引入依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
创建一个Bean,这里的配置后面会解析。
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Properties; /** * @author kirin.麒麟 * @version 1.0.0 * @classname Kafka * @desc Kafka配置 * @date 2022/1/9 5:04 下午 */ @Slf4j @Configuration public class KafkaConfig { /** * 生产者Bean * @return */ @Bean public KafkaProducer<String, String> kafkaProducer() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.138:9092"); // 消息确认机制配置 properties.put(ProducerConfig.ACKS_CONFIG, "all"); // 重试 properties.put(ProducerConfig.RETRIES_CONFIG, "0"); // 批次大小 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); // 多长时间发送一个批次 properties.put(ProducerConfig.LINGER_MS_CONFIG, "1"); // 缓冲 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); // 序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 序列化 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new KafkaProducer<>(properties); } }
发生消息:
@RestController public class KafkaController { @Autowired private KafkaProducer<String, String> kafkaProducer; @GetMapping("/send") public String sendMsg() throws InterruptedException { for (int i = 0; i < 100; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "这是第" + i + "条消息"); kafkaProducer.send(record); Thread.sleep(200); } return "success"; } }
开启主题的监听:
# 单节点监听
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.138:9092 --topic test-topic --from-beginning
# 集群节点监听
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.138:9091,192.168.31.138:9092,192.168.31.138:9093 --topic test-topic --from-beginning
从第一部分可以知道,发送消息就是将消息和Topic封装一个ProducerRecord
对象,然后通过KafkaProducer
的send
方法发送。
这里有两种种方法,这些方法都是通过Future
封装返回的,是可以拿到返回值的,其实都是异步执行的,第二个可以执行异步回调。
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
kafkaProducer.send(record);
因为这里使用Future
做返回,所以可以通过get()
方法阻塞,相当于同步。
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
Future<RecordMetadata> future = kafkaProducer.send(record);
RecordMetadata metadata = future.get();
回调就是在callback
中执行业务
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "hello world");
kafkaProducer.send(record, (metadata, exception) -> {
// TODO
});
这里简单看一下KafkaProducer
的初始化过程,从构造方法开始:
接着获取Client.id
,若没有配置则由是producer
-递增的数字
this.producerConfig = config;
this.time = time;
// 获取事物ID
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
// 获取用户配置的client.id
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
设置生产者监控:
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
设置分区器paritioner
,这里可以设置自己的分区器,需要增加:partitioner.class
,将自定义的分区器路径写入。
设置key和value的序列化器:
解析并实例化拦截器,这里可以自定义实现拦截器,过滤特定的消息。
构建RecordAccumulator
,这个类是用来存放消息
到RecordAccumulator
这个类里面看一下属性batches
:
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
它是一个 ConcurrentMap
,key
是 TopicPartition
类,代表一个 topic
的一个 partition
。value
是一个包含 ProducerBatch
的双端队列。等待 Sender
线程发送给 broker
。
创建守护线程sender
,用来监听发送消息:
在KafkaThread
的可以看出这个线程是守护线程:
这里需要注意,KafkaProducer
的主线程是用来往RecordAccumulator
里面写消息,Sender
守护线程是用来读取消息并发送到Kafka
中的。
上面的例子演示了消息的发送,这里简单看一下消息发送的大体流程:
先从send
函数开始分析
首先方法会先进入拦截器集合ProducerInterceptors
,onSend
方法是遍历拦截器onSend
方法,拦截器的目的是将数据处理加工,Kafka
本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能,必须自己实现接口ProducerInterceptor
并实现onSend
方法。
接下来看一下doSend
方法,首先先判断守护线程sender
是否可用,接着判断要发送到topic
的metadata
是否可用:
序列化key
和value
计算record
对应的partition
的值,这里如果定义了分区器会使用自定义,也可以指定算法获取该值。下面会介绍具体分区器实现
int partition = partition(record, serializedKey, serializedValue, cluster);
接着像accumulator
中追加数据:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
如果是新批次需要重新指定分区在追加数据:
缓冲满了之后,唤醒sender线程发送消息:
到这儿只是消息发送大体的流程,内部还有一大堆的代码,功力不够看着有点上头,等有时间在看吧!
消息在网络上传输,必须进行序列化转化为字节流。Kafka提供了默认的字符串序列化器StringSerializer
,除此之外还有ByteArray
、ByteBuffer
、Bytes
、Double
、Integer
、Long
等。
在org.apache.kafka.common.serialization
包下面可以看到默认实现的序列化器!
接下来看一下借助fastjson
实现自定义序列化器:
@Data @AllArgsConstructor @NoArgsConstructor public class UserInfo { private String name; private Integer age; } // 序列化器 public class CustomizeSerializer implements Serializer<UserInfo> { @Override public byte[] serialize(String topic, UserInfo data) { return JSON.toJSONBytes(data); } } // 反序列化器 public class CustomizeDeserializer implements Deserializer<UserInfo> { @Override public UserInfo deserialize(String topic, byte[] data) { return JSON.parseObject(data, UserInfo.class); } }
配置使用:
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomizeSerializer.class);
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomizeDeserializer.class);
使用:
@Slf4j @RestController public class CustomizeController { @Autowired private KafkaProducer<String, UserInfo> kafkaProducer; private static final String TOPIC_NAME = "long-topic"; @GetMapping("c_send") public String send(@RequestParam(value = "size", defaultValue = "20") Integer size) { for (int i = 0; i < size; i++) { ProducerRecord<String, UserInfo> record = new ProducerRecord<>(TOPIC_NAME, 0, "user:" + i, new UserInfo("这是第" + i + "条消息", i)); kafkaProducer.send(record, (metadata, exception) -> { log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset()); }); } return "success"; } }
拦截器的使用场景:
自定义拦截器,主要是实现ProducerInterceptor
接口,这个接口中四个方法:
onSend
:该方法封装进KafkaProducer.send
方法中,即它运行在用户的主线程中。Producer
确保在消息被序列化以及计算分区前调用该方法。注意:用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic
和分区,否则会影响目标分区的计算onAcknowledgement
:该方法会在消息从RecordAccumulator
成功发送到Kafka Broker
之后,或者在发送过程中失败时调用。并且通常都是在producer
回调逻辑触发之前。注意:onAcknowledgement
运行在producer
的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer
的消息发送效率close
:关闭interceptor,主要用于执行一些资源清理的工作configure
:获取配置信息和初始化数据时调用实例:
public class CustomizeProducer implements ProducerInterceptor<String, UserInfo> { @Override public ProducerRecord<String, UserInfo> onSend(ProducerRecord<String, UserInfo> record) { UserInfo value = record.value(); return value.getAge() % 2 == 0 ? new ProducerRecord<>( record.topic(), record.key(), new UserInfo(value.getName(), value.getAge() + 100)) : new ProducerRecord<>( record.topic(), record.key(), new UserInfo(value.getName(), value.getAge() + 60)); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) {} @Override public void close() {} @Override public void configure(Map<String, ?> configs) {} }
使用自定义拦截器:
注意:ProducerConfig.INTERCEPTOR_CLASSES_CONFIG
对应的值是一个拦截器数组。
发送消息结果数据实体RecordMetadata
,返回数据包好分区器、偏移量和主题。
测试发动100条消息数据:
@GetMapping("/send")
public String sendMsg(@RequestParam(value = "size", defaultValue = "100") Integer size) throws InterruptedException {
for (int i = 0; i < size; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "这是第" + i + "条消息");
kafkaProducer.send(record, (metadata, exception) -> {
log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
});
}
return "success";
}
如果没有指定分区器系统会使用默认的默认的分区器:DefaultPartitioner
(在ProducerConfig
中有定义)。该分区器首先会随机选择一个分区,并尽可能一直使用该分区,直到该分区批次(默认是16k
)已满或者已完成(linger.ms
配置的时间已到),Kafka会再随机选择一个和上一个分区不同的分区进行使用。
关于计算分区器的值有下面这几种情况:
指明partition
的情况下,直接将指明的值作为partition
的值
@GetMapping("/send")
public String sendMsg(@RequestParam(value = "size", defaultValue = "20") Integer size) throws InterruptedException {
for (int i = 0; i < size; i++) {
// 设置0分区发送数据
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 0, "key:" + i, "这是第" + i + "条消息");
kafkaProducer.send(record, (metadata, exception) -> {
log.info("partition: {}, topic: {}, offset: {}", metadata.partition(), metadata.topic(), metadata.offset());
});
}
return "success";
}
没有指明partition
值但有key
的情况下,将key
和hash
值与topic
的partition
数进行取余得到partition
的值
Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
partition
和key
都不存在的情况下,通过 stickyPartitionCache
的 partition
方法计算出分区。
系统里面支持如下几种:
DefaultPartitioner
:默认分区策略,如果key
也不存在,则会对可用分区进行轮询,如果没有指定分区,且存在key
值,则会根据key
的hash
进行取模来选择分区。(需要注意这里和以前的版本实现不一样)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
这里看看一下StickyPartitionCache
,这里面的具体实现:
private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
}
public int partition(String topic, Cluster cluster) {
Integer part = indexCache.get(topic);
if (part == null) {
return nextPartition(topic, cluster, -1);
}
return part;
}
indexCache
是一个 ConcurrentHashMap
对象,对应的是 Topic -> Partition
的映射,如果该值不存在则调用 nextPartition
方法选择一个分区并缓存。
public int nextPartition(String topic, Cluster cluster, int prevPartition) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); Integer oldPart = indexCache.get(topic); Integer newPart = oldPart; // 但是分区是空或者与prevPartition相同 if (oldPart == null || oldPart == prevPartition) { // 获取主题对应partition List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); // partition数量小于1 if (availablePartitions.size() < 1) { // 获取一个线程安全的随机数 Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); // 取余返回新的partition newPart = random % partitions.size(); } else if (availablePartitions.size() == 1) { // partition数量等于1,返回对应的parition newPart = availablePartitions.get(0).partition(); } else { // partition数量大于1 while (newPart == null || newPart.equals(oldPart)) { // 生成的新partition不为null,并且与当前的partion不相同 int random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = availablePartitions.get(random % availablePartitions.size()).partition(); } } // 如果当前的partition是null,将partition缓冲更新 if (oldPart == null) { indexCache.putIfAbsent(topic, newPart); } else { // 否则替换旧的partition indexCache.replace(topic, prevPartition, newPart); } // 最后返回主题对应的partition return indexCache.get(topic); } // 最后返回主题对应的partition return indexCache.get(topic); }
这个过程其实比较简单的,这里其实也是粘性分区策略实现方式
UniformStickyPartitioner
:粘性分区策略
参看DefaultPartitioner
RoundRobinPartitioner
:轮询策略
@Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 获取主题对应的partition List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); // 数量 int nextValue = nextValue(topic); // 主题对应的新值 // 获取主题对应的可用的partition List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (!availablePartitions.isEmpty()) { // 计算新的partition值 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // 没有可用的分区,给一个不可用的分区 return Utils.toPositive(nextValue) % numPartitions; } } // 将主题对应的值+1 private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> { return new AtomicInteger(0); }); return counter.getAndIncrement(); }
自定义CustomizePartition
类,并且实现Partitioner
接口,重写partition
和close
方法
public class CustomizePartition implements Partitioner { /** * Compute the partition for the given record. * * @param topic 主题名 * @param key key值,不存在则是null * @param keyBytes 序列化的key,不存在则是null * @param value value值,不存在则是null * @param valueBytes 系列化的value,不存在则是null * @param cluster 集群信息 */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return 0; } /** * This is called when partitioner is closed. */ @Override public void close() {} /** * Configure this class with the given key-value pairs * * @param configs */ @Override public void configure(Map<String, ?> configs) {} }
使用的时候只需要在配置文件中指定一下就可以:
acks
:发送应答,默认值是1。batch.size
:批量发送大小,默认是16384(16k)bootstrap.servers
:服务器地址,多个服务器地址用逗号分割开buffer.memory
:生产者最大可用的缓冲,默认是33554432(32M)client.id
:生产者ID,默认是“”compression.type
:压缩类型,默认是producer(未压缩)
,还有其他的配置类型:gzip(压缩率高,适合高内内存和CPU)/snappy(适合带宽敏感,压缩力度大)/lz4/sztd
retries
:失败重试次数,默认整型的最大值(2147483647
)retry.backoff.ms
:重试阻塞时间delivery.timeout.ms
:传输时间connections.max.idle.ms
:关闭空闲连接时间,默认是540000
enable.idempotence
:开启幂等,默认是false,max.in.flight.request.per.connection
:单个连接上发送的未确认请求的最大连接数,默认是5interceptor.classes
:拦截器,默认是无拦截器key.seriailzer
:key的序列化器,默认是无value.seriailzer
:value的序列化器,默认是无linger.ms
:发送延迟时间,默认是0max.block.ms
:阻塞时间,默认是一分钟(60000)max.request.size
:最大请求字节大小,默认是1M(1048576)metric.reporters
:自定义指标报告器partitioner.class
:自定义分区器request.timeout.bytes
:请求超时时间,默认30000receive.buffer.bytes
:读取数据时使用TCP接收缓冲区(SO_RCVBUF)的大小,默认值32k(32768)。如果值是-1,将使用OS默认值send.buffer.bytes
:发送数据时使用的TCP发送缓冲去(SO_SEDBUF)的大小,默认是是128k(131072)。如果值是-1,将使用OS的默认值Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。