赞
踩
broker配置
(1)配置条目的使用方式:
(2) 配置参数:
属性 | 说明 | 重要性 |
bootstrap.servers | 生产者客户端与broker集群建立初始连接需要的broker地址列表,由该初始连接发现Kafka集群中其他的所有broker。该地址列表不需要写全部的Kafka集群中broker的地址,但也不要写一个,以防该节点宕机的时候不可用。形式为: host1:port1,host2:port2,... . | high |
key.serializer | 实现了接口org.apache.kafka.common.serialization.Serializer 的key序化类。 | high |
value.serializer | 实现了接口org.apache.kafka.common.serialization.Serializer 的value序列化类。 | high |
acks | 该选项控制着已发送消息的持久性。 acks=0 :生产者不等待broker的任何消息确认。只要将消息放到了socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该消息, retries 设置也不起作用,因为客户端不关心消息是否发送失败。客户端收到的消息偏移量永远是-1。 acks=1 :leader将记录写到它本地日志,就响应客户端确认消息,而不等待follower副本的确认。如果leader确认了消息就宕机,则可能会丢失消息,因为follower副本可能还没来得及同步该消息。 acks=all :leader等待所有同步的副本确认该消息。保证了只要有一个同步副本存在,消息就不会丢失。这是最强的可用性保证。等价于acks=-1 。默认值为1,字符串。可选值:[all, -1, 0, 1] | high |
compression.type | 生产者生成数据的压缩格式。默认是none(没有压缩)。允许的值: none , gzip , snappy 和lz4 。压缩是对整个消息批次来讲的。消息批的效率也影响压缩的比例。消息批越大,压缩效率越好。字符串类型的值。默认是none。 | high |
retries | 设置该属性为一个大于1的值,将在消息发送失败的时候重新发送消息。该重试与客户端收到异常重新发送并无二至。允许重试但是不设置max.in.flight.requests.per.connection 为1,存在消息乱序的可能,因为如果两个批次发送到同一个分区,第一个失败了重试,第二个成功了,则第一个消息批在第二个消息批后。int类型的值,默认:0,可选值:[0,...,2147483647] | high |
- package org.apache.kafka.common.serialization;
-
- import java.io.Closeable;
- import java.util.Map;
-
- /**
- * 将对象转换为byte数组的接口
- * <p>
- * 该接口的实现类需要提供无参构造器
- *
- * @param <T> 从哪个类型转换
- */
- public interface Serializer<T> extends Closeable {
- /**
- * 类的配置信息
- *
- * @param configs key/value pairs
- * @param isKey key的序列化还是value的序列化
- */
- void configure(Map<String, ?> configs, boolean isKey);
-
- /**
- * 将对象转换为字节数组
- *
- * @param topic 主题名称
- * @param data 需要转换的对象
- * @return 序列化的字节数组
- */
- byte[] serialize(String topic, T data);
-
- /**
- * 关闭序列化器
- * 该方法需要提供幂等性,因为可能调用多次。
- */
- @Override
- void close();
- }
系统提供了该接口的子接口以及实现类:org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.ByteBufferSerializer
org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.DoubleSerializer
org.apache.kafka.common.serialization.FloatSerializer
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.LongSerializer
org.apache.kafka.common.serialization.ShortSerializer
自定义序列化器
数据的序列化一般生产中使用avro。
自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer<T>接口,并实现其中的serialize 方法。
案例:
(1)实体类
- package com.lagou.kafka.demo.entity;
-
- /**
- * 用户自定义的封装消息的实体类
- */
- public class User {
- private Integer userId;
- private String username;
-
- public Integer getUserId() {
- return userId;
- }
-
- public void setUserId(Integer userId) {
- this.userId = userId;
- }
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
- }
(2)序列化类:
- package com.lagou.kafka.demo.serialization;
-
- import com.lagou.kafka.demo.entity.User;
- import org.apache.kafka.common.errors.SerializationException;
- import org.apache.kafka.common.serialization.Serializer;
-
- import java.io.UnsupportedEncodingException;
- import java.nio.ByteBuffer;
- import java.util.Map;
-
- public class UserSerializer implements Serializer<User> {
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // do nothing
- // 用于接收对序列化器的配置参数,并对当前序列化器进行配置和初始化的
- }
-
- @Override
- public byte[] serialize(String topic, User data) {
- try {
- if (data == null) {
- return null;
- } else {
- final Integer userId = data.getUserId();
- final String username = data.getUsername();
-
- if (userId != null) {
- if (username != null) {
- final byte[] bytes = username.getBytes("UTF-8");
- int length = bytes.length;
- // 第一个4个字节用于存储userId的值
- // 第二个4个字节用于存储username字节数组的长度int值
- // 第三个长度,用于存放username序列化之后的字节数组
- ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
- // 设置userId
- buffer.putInt(userId);
- // 设置username字节数组长度
- buffer.putInt(length);
- // 设置username字节数组
- buffer.put(bytes);
- // 以字节数组形式返回user对象的值
- return buffer.array();
- }
- }
- }
- } catch (Exception e) {
- throw new SerializationException("数据序列化失败");
- }
- return null;
- }
-
- @Override
- public void close() {
- // do nothing
- // 用于关闭资源等操作。需要幂等,即多次调用,效果是一样的。
- }
- }
(3)生产者:
- package com.lagou.kafka.demo.producer;
-
- import com.lagou.kafka.demo.entity.User;
- import com.lagou.kafka.demo.serialization.UserSerializer;
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.HashMap;
- import java.util.Map;
-
- public class MyProducer {
- public static void main(String[] args) {
- Map<String, Object> configs = new HashMap<>();
- configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
- configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- // 设置自定义的序列化器
- configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
-
- KafkaProducer<String, User> producer = new KafkaProducer<String, User>(configs);
-
- User user = new User();
- // user.setUserId(1001);
- // user.setUsername("张三");
- // user.setUsername("李四");
- // user.setUsername("王五");
- user.setUserId(400);
- user.setUsername("赵四");
-
- ProducerRecord<String, User> record = new ProducerRecord<String, User>(
- "tp_user_01", // topic
- user.getUsername(), // key
- user // value
- );
-
-
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception != null) {
- System.out.println("消息发送异常");
- } else {
- System.out.println("主题:" + metadata.topic() + "\t"
- + "分区:" + metadata.partition() + "\t"
- + "生产者偏移量:" + metadata.offset());
- }
- }
- });
-
- // 关闭生产者
- producer.close();
-
- }
- }
默认(DefaultPartitioner)分区计算:
如果要自定义分区器,则需要
位于org.apache.kafka.clients.producer 中的分区器接口:
- package org.apache.kafka.clients.producer;
-
- import org.apache.kafka.common.Configurable;
- import org.apache.kafka.common.Cluster;
-
- import java.io.Closeable;
-
- /**
- * 分区器接口
- */
- public interface Partitioner extends Configurable, Closeable {
- /**
- * 为指定的消息记录计算分区值
- *
- * @param topic 主题名称
- * @param key 根据该key的值进行分区计算,如果没有则为null。
- * @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为
- * null
- * @param value 根据value值进行分区计算,如果没有,则为null
- * @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为
- * null
- * @param cluster 当前集群的元数据
- */
- public int partition(String topic, Object key, byte[] keyBytes, Object
- value, byte[] valueBytes, Cluster cluster);
-
- /**
- * 关闭分区器的时候调用该方法
- */
- public void close();
- }
包org.apache.kafka.clients.producer.internals 中分区器的默认实现:
- package org.apache.kafka.clients.producer.internals;
-
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentMap;
- import java.util.concurrent.ThreadLocalRandom;
- import java.util.concurrent.atomic.AtomicInteger;
-
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import org.apache.kafka.common.PartitionInfo;
- import org.apache.kafka.common.utils.Utils;
-
- /**
- * 默认的分区策略:
- * <p>
- * 如果在记录中指定了分区,则使用指定的分区
- * 如果没有指定分区,但是有key的值,则使用key值的散列值计算分区
- * 如果没有指定分区也没有key的值,则使用轮询的方式选择一个分区
- */
- public class DefaultPartitioner implements Partitioner {
- private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
-
- public void configure(Map<String, ?> configs) {
- }
-
- /**
- * 为指定的消息记录计算分区值
- *
- * @param topic 主题名称
- * @param key 根据该key的值进行分区计算,如果没有则为null。
- * @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为
- * null
- * @param value 根据value值进行分区计算,如果没有,则为null
- * @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为
- * null
- * @param cluster 当前集群的元数据
- */
- public int partition(String topic, Object key, byte[] keyBytes, Object
- value, byte[] valueBytes, Cluster cluster) {
- // 获取指定主题的所有分区信息
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- // 分区的数量
- int numPartitions = partitions.size();
- // 如果没有提供key
- if (keyBytes == null) {
- int nextValue = nextValue(topic);
- List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
- if (availablePartitions.size() > 0) {
- int part = Utils.toPositive(nextValue) % availablePartitions.size();
- return availablePartitions.get(part).partition();
- } else {
- // no partitions are available, give a non-available partition
- return Utils.toPositive(nextValue) % numPartitions;
- }
- } else {
- // hash the keyBytes to choose a partition
- // 如果有,就计算keyBytes的哈希值,然后对当前主题的个数取模
- return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
- }
- }
-
- private int nextValue(String topic) {
- AtomicInteger counter = topicCounterMap.get(topic);
- if (null == counter) {
- counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
- AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
- if (currentCounter != null) {
- counter = currentCounter;
- }
- }
- return counter.getAndIncrement();
- }
-
- public void close() {
- }
- }
可以实现Partitioner接口自定义分区器:
然后在生产者中配置:
Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于实现Client端的定制化控制逻辑。
对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
如前所述,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
自定义拦截器:
案例:
(1)设置拦截器类:
- package com.lagou.kafka.demo.interceptor;
-
- import org.apache.kafka.clients.producer.ProducerInterceptor;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import org.apache.kafka.common.header.Headers;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.util.Map;
-
- public class InterceptorOne implements ProducerInterceptor<Integer, String> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);
-
- @Override
- public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
- System.out.println("拦截器1 -- go");
-
-
- // 消息发送的时候,经过拦截器,调用该方法
-
- // 要发送的消息内容
- final String topic = record.topic();
- final Integer partition = record.partition();
- final Integer key = record.key();
- final String value = record.value();
- final Long timestamp = record.timestamp();
- final Headers headers = record.headers();
-
-
- // 拦截器拦下来之后根据原来消息创建的新的消息
- // 此处对原消息没有做任何改动
- ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
- topic,
- partition,
- timestamp,
- key,
- value,
- headers
- );
- // 传递新的消息
- return newRecord;
- }
-
- @Override
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
- System.out.println("拦截器1 -- back");
- // 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
- // 会影响kafka生产者的性能。
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map<String, ?> configs) {
- final Object classContent = configs.get("classContent");
- System.out.println(classContent);
- }
- }
(2)生产者
- package com.lagou.kafka.demo.producer;
-
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.IntegerSerializer;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.HashMap;
- import java.util.Map;
-
- public class MyProducer {
- public static void main(String[] args) {
-
- Map<String, Object> configs = new HashMap<>();
- configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
- configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- // 保证等待确认的消息只有设置的这几个。如果设置为1,则只有一个请求在等待响应
- // 此时可以保证发送消息即使在重试的情况下也是有序的。
- configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
- // configs.put("max.in.flight.requests.per.connection", 1);
-
- // interceptor.classes
- // 如果有多个拦截器,则设置为多个拦截器类的全限定类名,中间用逗号隔开
- /* configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.lagou.kafka.demo.interceptor.InterceptorOne," +
- "com.lagou.kafka.demo.interceptor.InterceptorTwo," +
- "com.lagou.kafka.demo.interceptor.InterceptorThree"); */
- configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.lagou.kafka.demo.interceptor.InterceptorOne");
-
-
- configs.put("classContent", "this is lagou's kafka class");
-
- KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
-
- ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
- "tp_inter_01",
- 0,
- 1001,
- "this is lagou's 1001 message"
- );
-
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- System.out.println(metadata.offset());
- }
- }
- });
-
- // 关闭生产者
- producer.close();
-
-
- }
- }
由上图可以看出:KafkaProducer有两个基本线程:
(1)参数设置方式:
(2) 补充参数:
参数名称 | 描述 |
retry.backoff.ms | 在向一个指定的主题分区重发消息的时候,重试之间的等待时间。
|
retries | retries重试次数
|
request.timeout.ms | 客户端等待请求响应的最大时长。 如果服务端响应超时,则会重发请求,除非达到重试次数。该设置应该比replica.lag.time.max.ms (a broker configuration)要大,以免在服务器延迟时间内重发消息。int类型值,默认:30000,可选值:[0,...] |
interceptor.classes | 在生产者接收到该消息,向Kafka集群传输之前,由序列化器处理之前,可以通过拦截器对消息进行处理。
|
acks | 当生产者发送消息之后,如何确认消息已经发送成功了。 支持的值: acks=0:
acks=1
acks=all
|
batch.size | 当多个消息发送到同一个分区的时候,生产者尝试将多个记录作为一个批来处理。批处理提高了客户端和服务器的处理效率。
|
client.id | 生产者发送请求的时候传递给broker的id字符串。
|
compression.type | 生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。
|
send.buffer.bytes | TCP发送数据的时候使用的缓冲区(SO_SNDBUF)大小。如果设置为0,则使 用操作系统默认的。 |
buffer.memory | 生产者可以用来缓存等待发送到服务器的记录的总内存字节。 如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞max.block.ms 的时间,此后它将引发异常。此设置应大致对应于生产者将使用的总内存,但并非生产者使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用了压缩)以及维护运行中的请求。long型数据。默认值:33554432,可选值:[0,...] |
connections.max.idle.ms | 当连接空闲时间达到这个值,就关闭连接。long型数据,默认:540000 |
linger.ms | 生产者在发送请求传输间隔会对需要发送的消息进行累积,然后作为一个批次 一般情况是消息的发送的速度比消息累积的速度慢。有时客户端需要减少请求的次数,即使是在发送负载不大的情况下。该配置设置了一个延迟,生产者不会立即将消息发送到broker,而是等待这么一段时间以累积消息,然后将这段时间之内的消息作为一个批次发送。该设置是批处理的另一个上限:一旦批消息达到了batch.size 指定的值,消息批会立即发送,如果积累的消息字节数达不到batch.size 的值,可以设置该毫秒值,等待这么长时间之后,也会发送消息批。该属性默认值是0(没有延迟)。如果设置linger.ms=5 ,则在一个请求发送之前先等待5ms。long型值,默认:0,可选值:[0,...] |
max.block.ms | 控制KafkaProducer.send() 和KafkaProducer.partitionsFor() 阻塞的时长。 当缓存满了或元数据不可用的时候,这些方法阻塞。在用户提供的序列化器和分区器的阻塞时间不计入。long型值,默认:60000,可选值:[0,...] |
max.request.size | 单个请求的最大字节数。 该设置会限制单个请求中消息批的消息个数,以免单个请求发送太多的数据。服务器有自己的限制批大小的设置,与该配置可能不一样。int类型值,默认1048576,可选值:[0,...] |
partitioner.class | 实现了接口org.apache.kafka.clients.producer.Partitioner 的分区器实现类。 默认值为:org.apache.kafka.clients.producer.internals.DefaultPartitioner |
receive.buffer.bytes | TCP接收缓存(SO_RCVBUF),如果设置为-1,则使用操作系统默认的值。 int类型值,默认32768,可选值:[-1,...] |
security.protocol | 跟broker通信的协议:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. string类型值,默认:PLAINTEXT |
max.in.flight.requests.per.connection | 单个连接上未确认请求的最大数量。 达到这个数量,客户端阻塞。如果该值大于1,且存在失败的请求,在重试的时候消息顺序不能保证。int类型值,默认5。可选值:[1,...] |
reconnect.backoff.max.ms | 对于每个连续的连接失败,每台主机的退避将成倍增加,直至达到此最大值。 在计算退避增量之后,添加20%的随机抖动以避免连接风暴。long型值,默认1000,可选值:[0,...] |
reconnect.backoff.ms | 尝试重连指定主机的基础等待时间。
|
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。