当前位置:   article > 正文

Kafka 之生产者与消费者基础知识:基本配置、拦截器、序列化、分区器

Kafka 之生产者与消费者基础知识:基本配置、拦截器、序列化、分区器

一、生产者配置

1. 必须要配置的参数:

  • kafaf集群地址列表:理论上写一个节点地址,就相当于绑定了整个kafka集群了,但是建议多写几个,如果只写一个,万一宕机就麻烦了
  • kafka消息的key和value要指定序列化方法
  • kafka对应的生产者id

使用java代码表示则为以下代码:

  1. //BOOTSTRAP_SERVERS_CONFIG:连接kafka集群的服务列表,如果有多个,使用"逗号"进行分隔
  2. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
  3. // 使用字符串序列化类:org.apache.kafka.common.serialization.StringSerializer
  4. // KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
  5. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  6. // VALUE: 实际发送消息的内容
  7. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  8. //CLIENT_ID_CONFIG:这个属性的目的是标记kafkaclient的ID
  9. properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-id");

2. 消息发送重试机制

 可使用 retries 参数 进行设置,同时要注意记住两个概念:可重试异常(重试可能会成功)、不可重试异常(无论重试多少次都不会成功);

retries设置的代码:

properties.put(ProducerConfig.RETRIES_CONFIG, 3);  # 默认是0

3. 一点说明

kafka的生产者是多线程安全的,表示多个线程可以同时共享同一个kafka生产者实例对象;但是kafka的消费者不是线程安全的。

kafka生产者提供的两个send()方法都是异步的,如下:

  1. Future<RecordMetadata> send(ProducerRecord<K, V> record); # 这个send()虽然是异步的,但是可以通过 返回对象调用get()方法达到同步的效果
  2. Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

kafka在生产环境中,一定要在在代码中关闭自动创建 topic .可通过 kafka-manage 控制台创建好 topic,再进行消息的发送与接收。

测试代码:

  1. public class NormalProducer {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. Properties properties = new Properties();
  4. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
  5. properties.put(ProducerConfig.CLIENT_ID_CONFIG, "normal-producer");
  6. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  7. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  8. // kafka 消息的重试机制: RETRIES_CONFIG该参数默认是0:
  9. properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  10. // 可重试异常, 意思是执行指定的重试次数 如果到达重试次数上限还没有发送成功, 也会抛出异常信息
  11. // NetworkException
  12. // LeaderNotAvailableException
  13. // 不可重试异常
  14. // RecordTooLargeException
  15. KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
  16. User user = new User("100", "里德");
  17. // kafka默认是可以在没有主题的情况下创建的
  18. // 自动创建主题的特性,在生产环境中一定是禁用的
  19. ProducerRecord<String, String> record =
  20. new ProducerRecord<String, String>("normal-topic",
  21. JSON.toJSONString(user));
  22. /**
  23. * //一条消息 必须通过key 去计算出来实际的partition, 按照partition去存储的
  24. * ProducerRecord(
  25. * topic=topic_normal,
  26. * partition=null,
  27. * headers=RecordHeaders(headers = [], isReadOnly = false),
  28. * key=null,
  29. * value={"id":"001","name":"xiao xiao"},
  30. * timestamp=null)
  31. */
  32. System.err.println("新创建的消息:"+record);
  33. // 一个参数的send方法 本质上也是异步的 返回的是一个future对象; 可以实现同步阻塞方式
  34. /*
  35. Future<RecordMetadata> metadataFuture = producer.send(record);
  36. RecordMetadata recordMetadata = metadataFuture.get();
  37. System.err.println(String.format("发送结果:分区位置:%s, 偏移量:%s, 时间戳:%s",
  38. recordMetadata.partition(),
  39. recordMetadata.offset(),
  40. recordMetadata.timestamp()));
  41. */
  42. // 带有两个参数的send方法 是完全异步化的。在回调Callback方法中得到发送消息的结果
  43. Future<RecordMetadata> metadataFuture = producer.send(record, new Callback() {
  44. @Override
  45. public void onCompletion(RecordMetadata metadata, Exception exception) {
  46. if(null == exception) {
  47. System.err.println(String.format("发送结果:分区位置:%s, 偏移量:%s, 时间戳:%s",
  48. metadata.partition(),
  49. metadata.offset(),
  50. metadata.timestamp()));
  51. }else {
  52. exception.printStackTrace();
  53. return;
  54. }
  55. }
  56. });
  57. producer.close();
  58. }
  59. }

4. 生产者端的重要参数

(1) acks: 表示发送消息后,broker端至少有多少副本接收到该消息:

  • 默认acks=1, 表示只要 leader 副本接收到消息,就能收到来自服务端的成功响应
  • acks=0: 生产者发送消息之后,不要等待任务服务端的响应。
  • acks=-1 或 acks=all:生产者在消息发送之后,需要等待 ISR(In-sync Replication) 中的所有副本都成功写入消息之后,才能够收到来自服务端的成功响应。
  • 并不是asks=-1 或 acks=all 就一定会被投递成功,因为可能只有leader副本在ISR中,follower副本都在 OSR(Out-sync Replication)中,而消息还没来得及传给 OSR 中的副本,leader副本就宕机了。
  • 想要100%投递成立,还要配合参数 min.insync.replicas=2,表示至少两个副本接收到该消息,但是容易影响性能。 

关于ISR与OSR:最开始所有的副本都在ISR中,在kafka工作的过程中,如果某个副本同步速度慢于replica.lag.time.max.ms指定的阈值,则被踢出ISR存入OSR,如果后续速度恢复可以回到ISR中。

(2)批量发送相关的参数

linger.ms:指定生产者发送ProducerBatch之前等待更多的消息加入producerBatch的时间,默认值为0,就像是等人上车的时间

batch.size:累计多少条消息,则一次进行批量发送,就是满多少人即发车的意思

buffer.memory:缓存大小,可以修改它提升缓存性能,默认32M

(3) 其他参数

max.request.size:该参数用来限制生产者客户端能发送的消息的最大值,默认值是 1M

retries和retry.backoff.msretries:重试次数和重试间隔时间,第一个默认0,第二个默认100ms

compression.type:指定对发送的消息的压缩方式,默认为“none”,可选gzip,snappy,lz4

connections.max.idle.ms:这个参数用来指定连接空闲多久之后关闭,默认540000ms,即9分钟

receive.buffer.bytes:设置socket接收消息缓冲区 默认32KB

send.buffer.bytes:设置socket发送消息缓冲区 默认128KB

request.timeout.ms:配置producer等待请求broker响应的最长时间,默认30000ms

二、消费者配置 

1. 必要的参数

bootstrap.servers: 用来指定连接 Kafka集群所需的broker 地址清单

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");

key.deserializer 和 value.deserializer: 反序列化参数

  1. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  2. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

group.id:消费者所属消费组

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-module-consumer");

subscribe:消息主题订阅,支持集合/标准正则表达式;

  1. # 订阅主题集合
  2. consumer.subscribe(Collections.singletonList("topic-module"));
  3. # 正则表达式
  4. consumer.subscribe(Pattern.compile("topic-.*"));

assign:只订阅主题的某个分区

consumer.assign(Arrays.asList(new TopicPartition("topic-module", 0), new TopicPartition("topic-module", 4)));

2. 其他参数

fetch.min.bytes:一次拉取最小数据量,默认为1B

fetch.max.bytes: 一次拉取最大数据量,默认为50M

max.partition.fetch.bytes: 一次fetch请求,从一个partition中取得的records最大大小,默认1M

fetch.max.wait.ms: Fetch请求发给broker后,在broker中可能会被阻塞,默认等待的时长500毫秒

maxpoll.records: Consumer每次调用poll()时取到的records的最大数,默认为500条

3. 消费者提交commit操作

(1)自动提交

自动提交: enableauto.commit ,默认值为true,和参数:提交周期间隔 auto.commit.interval.ms 搭配使用,默设值为5秒

  1. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  2. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

(2)手工提交

手工提交,需要将 enable.auto.commit配置为false;并使用 commitSync或者commitAsync进行提交,这两种方式一个是同步提交,一个是异步提交;无论是同步还是异步,都支持整体提交和按分区提交

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

示例代码

  1. public class NormalConsumer {
  2. public static void main(String[] args) {
  3. Properties properties = new Properties();
  4. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
  5. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  6. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  7. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-module");
  8. properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
  9. // 改成手动提交
  10. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  11. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  12. // 消费者默认每次拉取的位置:从什么位置开始拉取消息
  13. // AUTO_OFFSET_RESET_CONFIG 有三种方式: "latest", "earliest", "none" 默认值是latest
  14. // none
  15. // latest 从一个分区的最后提交的offset开始拉取消息
  16. // earliest 从最开始的起始位置拉取消息 0
  17. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  18. consumer.subscribe(Collections.singletonList("topic-module"));
  19. System.err.println("quickstart consumer started...");
  20. try {
  21. while(true) {
  22. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  23. for(TopicPartition topicPartition : records.partitions()) {
  24. List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
  25. String topic = topicPartition.topic();
  26. int size = partitionRecords.size();
  27. System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s",
  28. topic,
  29. topicPartition.partition(),
  30. size));
  31. for(int i = 0; i < size; i++) {
  32. ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
  33. String value = consumerRecord.value();
  34. long offset = consumerRecord.offset();
  35. long commitOffser = offset + 1;
  36. System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s",
  37. value, offset, commitOffser));
  38. // 在一个partition内部,每一条消息记录 进行一一提交方式
  39. // 按分区提交:同步方式
  40. consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(commitOffser)));
  41. // 按分区提交:异步方式 (这种按照partition维度,并且是异步的提交方式使用最多)
  42. consumer.commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(commitOffser)), new OffsetCommitCallback() {
  43. @Override
  44. public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  45. if(null == exception) {
  46. System.err.println("按分区进行提交成功,偏移量:" + offsets);
  47. }else {
  48. System.err.println("提交失败");
  49. }
  50. }
  51. });
  52. }
  53. }
  54. // 整体提交:同步方式
  55. // consumer.commitSync();
  56. // 整体提交:异步方式
  57. /*consumer.commitAsync(new OffsetCommitCallback() {
  58. @Override
  59. public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  60. if(exception == null){
  61. System.err.println("整体提交成功,偏移量:"+offsets);
  62. }else {
  63. System.err.println("提交失败,"+exception);
  64. }
  65. }
  66. });*/
  67. }
  68. } finally {
  69. consumer.close();
  70. }
  71. }
  72. }

三、自定义拦截器

1. 自定义生产者拦截器

自定义生产者拦截器类需要继承 org.apache.kafka.clients.producer.ProducerInterceptor,并实现其中的方法: 

  • onSend(ProducerRecord record)是发送消息之前的切面方法;
  • onAcknowledgement(RecordMetadata metadata, Exception exception)是发送消息之后的切面方法;
  • close()是生产者关闭前调用的方法;’
  • configure(Map<String, ?> configs)是拦截器用于配置一些属性的方法;

拦截器代码示例CustomProducerInterceptor.java: 

  1. public class CustomProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {
  2. private volatile int success;
  3. private volatile int failure;
  4. // 发送消息之前的切面拦截
  5. @Override
  6. public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
  7. System.err.println("生产者发送前置方法!");
  8. String value = "prefix:"+record.value();
  9. return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), value, record.headers());
  10. }
  11. // 发送消息之后的切面拦截
  12. @Override
  13. public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  14. if(null == exception){
  15. success++;
  16. }else {
  17. failure++;
  18. }
  19. System.err.println("生产者发送后置方法!");
  20. }
  21. @Override
  22. public void close() {
  23. System.err.println(String.format("发送成功率:%s %%", success*100/success+failure));
  24. }
  25. @Override
  26. public void configure(Map<String, ?> configs) {
  27. }
  28. }

将拦截器类定义好之后,只需要在生产者创建时,作为一个属性配置传进去(CustomProducerInterceptor.class是自定义拦截器类):

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());

2. 自定义消费者拦截器

需要实现的接口为 org.apache.kafka.clients.consumer.ConsumerInterceptor ,并实现其中的方法:

  • onConsume(ConsumerRecords records)是接到消息,但是处理之前的切面方法;
  • onCommit(Map<TopicPartition, OffsetAndMetadata> offsets)是消息处理完成之后,提交处理结果之前的切面方法,(如果为自动提交,会按时间间隔不停进行提交操作,那么该切面方法也会被不断地执行)
  • close()是消费者关闭前的切面方法;
  • configure(Map<String, ?> configs)是拦截器配置一些属性的方法;

拦截器代码示例 CustomProducerInteceptor.java:

  1. public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {
  2. // onConsume:消费者接到消息处理之前的拦截器
  3. @Override
  4. public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
  5. System.err.println("消费者消费前置方法!");
  6. return records;
  7. }
  8. @Override
  9. public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
  10. System.err.println("消费者消费后置方法!");
  11. offsets.forEach((tp, om) -> {
  12. System.err.println(String.format("分区位置:%s,提交偏移量:%s", tp, om));
  13. });
  14. }
  15. @Override
  16. public void close() {
  17. }
  18. @Override
  19. public void configure(Map<String, ?> configs) {
  20. }
  21. }

四、序列化

1. 实现自定义对象的序列化

这里自定义对象为 User.java:

  1. public class User {
  2. private String id;
  3. private String name;
  4. public User() {
  5. }
  6. public User(String id, String name) {
  7. this.id = id;
  8. this.name = name;
  9. }
  10. // getter、setter省略
  11. }

自定义序列化类需要实现 org.apache.kafka.common.serialization.Serializer接口:

SerializerProducer.java

  1. public class SerializerProducer implements Serializer<User> {
  2. @Override
  3. public byte[] serialize(String topic, User user) {
  4. try {
  5. if (user == null) {
  6. return null;
  7. }
  8. else {
  9. String id = user.getId();
  10. String name = user.getName();
  11. byte[] idBytes, nameBytes;
  12. if(null == id){
  13. idBytes = new byte[0];
  14. }else {
  15. idBytes = id.getBytes("UTF-8");
  16. }
  17. if(null == name){
  18. nameBytes = new byte[0];
  19. }else {
  20. nameBytes = name.getBytes("UTF-8");
  21. }
  22. ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + idBytes.length + nameBytes.length);
  23. // 4个字节 也就是一个 int类型 : putInt 盛放 idBytes的实际真实长度
  24. byteBuffer.putInt(idBytes.length);
  25. // put bytes[] 实际盛放的是idBytes真实的字节数组,也就是内容
  26. byteBuffer.put(idBytes);
  27. byteBuffer.putInt(nameBytes.length);
  28. byteBuffer.put(nameBytes);
  29. return byteBuffer.array();
  30. }
  31. } catch (UnsupportedEncodingException e) {
  32. throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding ", e);
  33. }
  34. }
  35. @Override
  36. public void configure(Map<String, ?> configs, boolean isKey) {
  37. }
  38. @Override
  39. public void close() {
  40. }
  41. }

这里是对 消息的value,也就是 User 对象进行序列化,所以需要在生产者配置属性中加入自定义的序列化类:

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SerializerProducer.class.getName());

2. 实现自定义对象的反序列化

反序列化类需要实现 org.apache.kafka.common.serialization.Deserializer类:

DeserializerConsumer.java

  1. public class DeserializerConsumer implements Deserializer<User> {
  2. @Override
  3. public User deserialize(String topic, byte[] data) {
  4. if(null == data){
  5. return null;
  6. }
  7. if(data.length < 8){
  8. throw new SerializationException("size is wrong, must be data.length >= 8");
  9. }
  10. ByteBuffer byteBuffer = ByteBuffer.wrap(data);
  11. // idBytes 字节数组的真实长度
  12. int idSize = byteBuffer.getInt();
  13. byte[] idBytes = new byte[idSize];
  14. byteBuffer.get(idBytes);
  15. // nameBytes 字节数组的真实长度
  16. int nameSize = byteBuffer.getInt();
  17. byte[] nameBytes = new byte[nameSize];
  18. byteBuffer.get(nameBytes);
  19. String id, name;
  20. try {
  21. id = new String(idBytes, "UTF-8");
  22. name = new String(nameBytes, "UTF-8");
  23. } catch (UnsupportedEncodingException e) {
  24. throw new SerializationException("deserializing error! ", e);
  25. }
  26. return new User(id, name);
  27. }
  28. @Override
  29. public void configure(Map<String, ?> configs, boolean isKey) {
  30. }
  31. @Override
  32. public void close() {
  33. }
  34. }

“将User对象直接转为json字符串,然后将字符串直接使用 getBytes("UTF-8") 方法转为字节数组”这种序列化方法也可以,不过这里是尝试另一种方法,即上面使用ByteBuffer拼接字节数组的方法  

这里是对 消息的value,也就是 User 对象进行反序列化,所以需要在消费者配置属性中加入自定义的反序列化类:

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerConsumer.class.getName());

五、分区器

1. 分区器

默认分区器:是对kafka消息中的key进行一个hash计算,从而得到投递到具体哪个分区的区号;

另外可根据自己的实际业务场景自定义分区器,需要实现 org.apache.kafka.clients.producer.Partitioner 类:

  1. public class CustomPartitioner implements Partitioner {
  2. private AtomicInteger counter = new AtomicInteger(0);
  3. @Override
  4. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  5. List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
  6. int numPartitions = partitionInfoList.size();
  7. System.err.println("---- 进入自定义分区器,当前分区个数:" + numPartitions);
  8. if(keyBytes == null){
  9. return counter.getAndIncrement() % numPartitions;
  10. }else {
  11. return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  12. }
  13. }
  14. @Override
  15. public void close() {
  16. }
  17. @Override
  18. public void configure(Map<String, ?> configs) {
  19. }
  20. }

 并在生产者的配置属性中增加该分区器类:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

2. 应用场景

什么情况下会需要自定义分区器?

比如有四种类型的订单:零食、衣服、灯泡、汽车,根据业务类型,让消息进入到各自的分区,也就是一个分区一种类型的数据,能够让各自类型的consumer快速获取属于自己的业务数据。

如果把所有数据随机的放到某个partation中,那么就会造成数据混乱,因为消息队列是顺序消费的(partition中的数据是先进先出),一些热门类型的业务占据大部分消息,比如零食的订单量远远高于汽车的订单量,零食的订单在消息partition中的前面,汽车的在后面,这就会一直堵塞汽车的消息迟迟到不了consumer端,导致汽车明明有订单,但是状态却是一直无法处理中。

所以最好的方法就是根据类型进行分区,不同的类型数据单独放到对应的partation中,一个类型的数据对应一个partation,可以通过类型自定义分区器。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/614339
推荐阅读
相关标签
  

闽ICP备14008679号