赞
踩
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency> <!-- https://mvnrepository.com/artifact/log4j/log4j --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.8.1</version> </dependency>
package com.baizhi.jsy.offset;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * Demo consumer that disables automatic offset commits and commits offsets
 * manually (asynchronously) after each polled batch has been printed.
 */
public class ConsumerKafkaOffset {

    /**
     * Subscribes to {@code topic01} as a member of group {@code group01}, then
     * polls forever, printing every record and committing offsets by hand.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Build the consumer configuration.
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Centos:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Turn off automatic offset commits; offsets are committed explicitly below.
        // (auto.commit.interval.ms is intentionally not set: it only applies
        // when automatic commits are enabled.)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Arrays.asList("topic01"));
        try {
            while (true) {
                // Wait up to one second for new records.
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                if (consumerRecords.isEmpty()) {
                    continue;
                }

                // Offsets to commit, keyed by partition. Later records of the same
                // partition overwrite earlier ones, so the map ends up holding the
                // last processed offset + 1 for every partition in this batch.
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    // The committed value is the offset of the NEXT record to read,
                    // hence offset + 1.
                    offsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    showRecord(record);
                }

                // Commit once per batch rather than once per record, so a single
                // commit request covers everything that was just processed.
                kafkaConsumer.commitAsync(offsets,
                        (committed, e) -> System.out.println(committed + "-----------" + e));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaConsumer.close();
        }
    }

    /**
     * Prints the topic, key, value, offset, partition and timestamp of a record,
     * one field per line, followed by a blank line.
     *
     * @param next the record to print
     */
    private static void showRecord(ConsumerRecord<String, String> next) {
        String topic = next.topic();
        System.out.println("topic = " + topic);
        String key = next.key();
        System.out.println("key = " + key);
        String value = next.value();
        System.out.println("value = " + value);
        long offset = next.offset();
        System.out.println("offset = " + offset);
        int partition = next.partition();
        System.out.println("partition = " + partition);
        long timestamp = next.timestamp();
        System.out.println("timestamp = " + timestamp);
        System.out.println();
    }
}
Kafka生产者在发送完一条消息之后,要求Broker在规定的时间内应答;如果没有在规定时间内应答,Kafka生产者会尝试n次重新发送消息。
如果重试N<=n次成功,则认定此消息发送成功;如果N>n次依然失败,则认定本次发送失败,向上层抛出异常。开启重试虽然增强了可靠性,但是可能会导致服务器端存储重复消息。
package com.baizhi.jsy.ACKandRetries;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.text.DecimalFormat;
import java.util.Properties;

/**
 * Demo producer for experimenting with the {@code acks}, request-timeout and
 * retry settings: it sends a single record to {@code topic01} and exits.
 */
public class ProductKafkaAck {

    /**
     * Configures a producer, sends one record and flushes it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Build the producer configuration.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Centos:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Batching parameters.
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024); // batch up to 1 MiB of records per partition
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 500);          // wait at most 0.5 s for a batch to fill

        // acks=0 : do not wait for any acknowledgement; fastest.
        // acks=1 : wait for the leader only; the write succeeds once the leader has
        //          it, but can be lost if the leader fails right afterwards.
        // acks=-1: the leader answers only after the followers have copied the
        //          data; slowest but safest.
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        // Maximum time (ms) to wait for the broker's response to a request.
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100);
        // Number of retries after a failed send.
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Enable idempotence (exactly-once writes per partition).
        //properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);

        // try-with-resources guarantees the producer is closed even when send()
        // or flush() throws.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "topic01",
                    "Ack",
                    "Test Ack And Retries (Idempotence)");
            kafkaProducer.send(record);
            kafkaProducer.flush();
        }
    }
}
允许的最大超时时间是1毫秒的时候,会产生四次请求,其中后三次是失败后的重试请求。
允许最大超时时间是100毫秒的时候,会产生一次请求就成功。
acks值设置为-1的时候 数据传给leader,follower从leader上面copy到数据后,leader才可以给出回应,
此次请求才算是成功。
acks值设置为0的时候 数据传给leader后就不用管了 不需要有回应
acks值设置为1的时候 数据传给leader后只需要leader给出回应后就算是成功了
package com.baizhi.jsy.ACKandRetries;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

/**
 * Demo consumer used alongside the acks/retries producer example: it prints
 * every record it receives from {@code topic01}.
 */
public class ConsumerKafkaAck {

    /**
     * Joins group {@code group01}, subscribes to {@code topic01} and prints the
     * fields of each polled record in an endless loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Consumer configuration.
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Centos:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
        consumer.subscribe(Arrays.asList("topic01"));
        try {
            while (true) {
                // Wait up to one second for new records; an empty batch simply
                // results in zero loop iterations below.
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : batch) {
                    System.out.println("topic = " + record.topic());
                    System.out.println("key = " + record.key());
                    System.out.println("value = " + record.value());
                    System.out.println("offset = " + record.offset());
                    System.out.println("partition = " + record.partition());
                    System.out.println("timestamp = " + record.timestamp());
                    System.out.println();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。