当前位置:   article > 正文

Kafka系列之—向Kafka 写入数据(四)_kafka批量数据写入

kafka批量数据写入

一, 创建Kafka生产者

1.1 必选的三个属性

1.1.1 bootstrap.servers

指定broker的地址清单,不需要包含所有的broker地址,生产者会从给定的broker里找到其它broker的信息,建议最少提供两个broker的信息。

1.1.2 key.serializer

broker希望接收到的消息的键和值都是字节数组

1.1.3 value.serializer

指定的类会将值序列化

1.2 创建新的生产者示例

private Properties kafkaProps = new Properties();

kafkaProps.put("bootstrap.servers","broker1:9092","broker2:9092");

kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

producer = new KafkaProducer<String,String>(kafkaProps);

1.3 消息发送方式

1.3.1 发送并忘记

我们把消息发送给服务器,但并不关心它是否会正常到达,这种方式有时候会丢失一些消息。

1.3.2 同步发送

我们使用send()方法发送消息,他会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否成功发送

1.3.3 异步发送

我们调用send()方法,并指定一个回调函数。

1.4 发送消息到Kafka 

1.4.1 同步发送消息

  1. ProducerRecod<String, String> record = new ProducerRecord<>("CustomerCountry","Precision Products","France")
  2.  try{
  3. producer.send(record).get();
  4. }catch(Exception e){
  5. e.printStackTrace();
  6. }

1.4.2 异步发送消息

  1. private class DemoProducerCallback implements Callback{
  2. @Override
  3. public void onCompletion(RecordMetadata recordMetadata, Exception e){
  4. if(e != null){
  5. e.printStackTrace();
  6. }
  7. }
  8. }
  9. ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials","USA");
  10. producer.send(record, new DemoProducerCallback());

1.5 生产者的配置

1.5.1 acks

acks参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息是写入成功的。

如果acks=0,生产者在成功写入消息之前不会等待来自服务器的响应,也就是如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。

如果acks=1, 只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应,如果消息无法到达首领节点,生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过如果一个没有收到消息的节点成为新首领,消息还是会丢失。

如果acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。

1.5.2 buffer.memory

用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息,如果应用程序发送消息的速度长于发送到服务器的速度,会导致生产者空间不足,这个时候,send()方法要么被阻塞,要么抛出异常。

1.5.3 compression.type

默认情况下,消息发送时不会被压缩,该参数可以设置为snappy,gzip或者lz4,指定了消息被发送到broker之前使用哪一种压缩算法。

1.5.4 retries

生产者从服务器收到的错误有可能是临时性的错误,在这种情况下,retries参数的值决定了 生产者可以重发消息的次数,如果达到这个次数,生产者会放弃并重试并返回错误。

1.5.5 batch.size

当有多个消息需要被发送到同一个分区时,生产者会把他们放在同一个批次里,该参数指定了一个批次可以使用的内存大小,按照字节数计算。

1.5.6 linger.ms

该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或者linger.ms达到上限时把批次发送出去。

1.5.7 client.id

服务器用来识别消息的来源。

1.5.8 max.in.flight.requests.per.connection

指定了生产者在收到服务器响应之前可以发送多少个消息。把它设置为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

1.5.9 timeout.ms, request.timeout.ms和metadata.fetch.timeout.ms

request.timeout.ms指定了生产者在发送数据时等待服务器返回响应的时间,metadata.fetch.timeout.ms指定了生产者在获取元数据时等待服务器返回响应的时间。
timeout.ms指定了broker等待同步副本返回消息确认的时间。

1.5.10 max.block.ms

指定了在调用seng()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。

1.5.11 max.request.size

控制生产者发送消息的大小。它可以指发送单个消息的最大值,也可以指单个请求里所有消息总的大小。

1.5.12 receive.buffer.bytes和send.buffer.bytes

这两个参数分别指定了TCP socket接收和发送数据包的缓冲区大小。

总结: 一般来说如果某些场景要求消息是有序的,那么消息是否写入成功也很关键,所以不建议把retries设为0。可以把max.in.flight.requests.per.connection设为1,这样生产者尝试发送第一批消息时,就不会有其它的消息发送给broker,只有对消息的顺序有严格要求的场景下才能这么做。

二 序列化器 

2.1 自定义序列化器

强烈建议使用通用的学历恶化框架

自定义序列化器仅供了解,代码案例如下:

  1. package com.example.kafkaproducer;
  2. import lombok.Value;
  3. /**
  4. * @author Admin
  5. */
  6. // Lombok注解
  7. @Value
  8. public class Person {
  9. String name;
  10. String address;
  11. }
  1. package com.example.kafkaproducer;
  2. import org.apache.kafka.common.serialization.Serializer;
  3. import java.io.UnsupportedEncodingException;
  4. import java.nio.ByteBuffer;
  5. import java.util.Map;
  6. /**
  7. * @author Admin
  8. */
  9. public class PersonSerializer implements Serializer<Person> {
  10. @Override
  11. public void configure(Map<String, ?> map, boolean b) {
  12. }
  13. @Override
  14. public byte[] serialize(String s, Person person) {
  15. if (person == null) {
  16. return null;
  17. }
  18. // 定义name和address的数组
  19. byte[] name, address;
  20. try {
  21. // 将Person的name属性转换为数组
  22. if (person.getName()!= null) {
  23. name = person.getName().getBytes("UTF-8");
  24. } else {
  25. name = new byte[0];
  26. }
  27. // 将Person的address属性转换为数组
  28. if (person.getAddress()!= null) {
  29. address = person.getAddress().getBytes("UTF-8");
  30. } else {
  31. address = new byte[0];
  32. }
  33. //使用ByteBuffer拼接数组
  34. ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
  35. buffer.putInt(name.length);
  36. buffer.put(name);
  37. buffer.putInt(address.length);
  38. buffer.put(address);
  39. // 返回buffer的数组对象
  40. return buffer.array();
  41. } catch (UnsupportedEncodingException e) {
  42. e.printStackTrace();
  43. }
  44. return new byte[0];
  45. }
  46. @Override
  47. public void close() {
  48. }
  49. }

2.2 使用Avro序列化

三 分区

Kafka的消息时一个个键值对,建的用途:可以作为消息的附加信息,也可以用来决定消息被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/614343
推荐阅读
相关标签
  

闽ICP备14008679号