赞
踩
• 0:生产者发送过来的数据,不需要等数据落盘应答。
风险:leader挂了之后,follower还没有收到消息。。。。
• 1:生产者发送过来的数据,Leader收到数据后应答。
风险:leader应答完成之后,还没有开始同步副本。。。。
生产者发送过来的数据,Leader和ISR队列里面 的所有节点收齐数据后应答。
可靠性总结: acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等; acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。
- // 设置 acks
- properties.put(ProducerConfig.ACKS_CONFIG, "all");
- // 重试次数 retries,默认是 int 最大值,2147483647
- properties.put(ProducerConfig.RETRIES_CONFIG, 3);
说明:开启事务,必须开启幂等性。
- package com.atguigu.kafka.producer;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.util.Properties;
-
- public class CustomProducerTransaction {
- public static void main(String[] args) {
- // 1. 创建 kafka 生产者的配置对象
- Properties properties = new Properties();
- // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "hadoop100:9092");
-
- // key,value 序列化(必须):key.serializer,value.serializer
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer");
-
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer");
-
- properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"tranactional_id_01");
- // 3. 创建 kafka 生产者对象
- KafkaProducer<String, String> kafkaProducer = new
- KafkaProducer<String, String>(properties);
- kafkaProducer.initTransactions();
- kafkaProducer.beginTransaction();
- // 4. 调用 send 方法,发送消息
- try{
- for (int i = 0; i < 5; i++) {
- kafkaProducer.send(new
- ProducerRecord<>("first","atguigu " + i));
- int i1 = 1 / 0;
- }
- kafkaProducer.commitTransaction();
- }catch (Exception e){
- kafkaProducer.abortTransaction();
- }finally {
- // 5. 关闭资源
- kafkaProducer.close();
- }
-
-
-
-
-
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。