当前位置:   article > 正文

kafka生产者2

kafka生产者2

1.数据可靠

• 0:生产者发送过来的数据,不需要等数据落盘应答。 

风险:leader挂了之后,follower还没有收到消息。。。。

• 1:生产者发送过来的数据,Leader收到数据后应答。

风险:leader应答完成之后,还没有开始同步副本。。。。

 

 生产者发送过来的数据,Leader和ISR队列里面 的所有节点收齐数据后应答。

可靠性总结: acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等; acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。

  1. // 设置 acks
  2. properties.put(ProducerConfig.ACKS_CONFIG, "all");
  3. // 重试次数 retries,默认是 int 最大值,2147483647
  4. properties.put(ProducerConfig.RETRIES_CONFIG, 3);

2.数据去重 

 

3.生产者事务

 说明:开启事务,必须开启幂等性。

  1. package com.atguigu.kafka.producer;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.util.Properties;
  6. public class CustomProducerTransaction {
  7. public static void main(String[] args) {
  8. // 1. 创建 kafka 生产者的配置对象
  9. Properties properties = new Properties();
  10. // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
  11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  12. "hadoop100:9092");
  13. // key,value 序列化(必须):key.serializer,value.serializer
  14. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  15. "org.apache.kafka.common.serialization.StringSerializer");
  16. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  17. "org.apache.kafka.common.serialization.StringSerializer");
  18. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"tranactional_id_01");
  19. // 3. 创建 kafka 生产者对象
  20. KafkaProducer<String, String> kafkaProducer = new
  21. KafkaProducer<String, String>(properties);
  22. kafkaProducer.initTransactions();
  23. kafkaProducer.beginTransaction();
  24. // 4. 调用 send 方法,发送消息
  25. try{
  26. for (int i = 0; i < 5; i++) {
  27. kafkaProducer.send(new
  28. ProducerRecord<>("first","atguigu " + i));
  29. int i1 = 1 / 0;
  30. }
  31. kafkaProducer.commitTransaction();
  32. }catch (Exception e){
  33. kafkaProducer.abortTransaction();
  34. }finally {
  35. // 5. 关闭资源
  36. kafkaProducer.close();
  37. }
  38. }
  39. }

4.数据乱序

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/142324
推荐阅读
相关标签
  

闽ICP备14008679号