赞
踩
需要的依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
生产者:
Properties properties = new Properties(); //Kafka代理的地址,生产者建立连接broker的地址,如果是集群ip间用逗号隔开 properties.put("bootstrap.servers", "127.0.0.1:9092"); //除了all还可选0,或1。all表示复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 properties.put("acks", "all"); //用于序列化秘钥KEY对象 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //用于序列化值对象的类。下面的示例中,我们的值是String,因此我们可以使用StringSerializer类来序列化键。如果值是其他对象,则创建自定义序列化程序类 properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //组id properties.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test"); //创建生产者实例 KafkaProducer<String,String> producer = new KafkaProducer<>(properties); //key,可以不传。key的作用是可以将同一topic的消息放到同一分区 String topic = "test"; //String key = "userName"; //value可以传json字符串,消费的时候转成json解析 String value = "cccc"; ProducerRecord record = new ProducerRecord<String, String>(topic, value); //发送记录 producer.send(record); producer.close();
消费者:
Properties properties = new Properties(); properties.put("bootstrap.servers", "127.0.0.1:9092"); properties.put("group.id", "jd-group1"); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("auto.offset.reset", "earliest"); //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息 properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //设置每次拉取的数量,不设置默认是500 //properties.put("max.poll.records",100); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Arrays.asList("test")); while (true) { Duration timeout = Duration.ofMillis(500); ConsumerRecords<String, String> records = kafkaConsumer.poll(timeout); records.forEach(record -> { System.out.println("Record Key: " + record.key()); System.out.println("Record value: " + record.value()); System.out.println("Record partition: " + record.partition()); System.out.println("Record offset: " + record.offset()); System.out.println("------------------------------"); }); //异步提交偏移量到服务器broker kafkaConsumer.commitAsync(); }
消费过的再消费:
Properties properties = new Properties(); properties.put("bootstrap.servers", "127.0.0.1:9092");//xxx是服务器集群的ip properties.put("group.id", "jd-group1"); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("auto.offset.reset", "earliest"); properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Arrays.asList("test")); Duration timeout1 = Duration.ofMillis(0); //timeout1为0则返回所有的分区信息,然后重新设置所有分区的偏移量 ConsumerRecords<String, String> records = kafkaConsumer.poll(timeout1); long offset = 0; //TopicPartition 分区信息 for (TopicPartition partition : kafkaConsumer.assignment()) { kafkaConsumer.seek(partition, offset); } //再次消费 Duration timeout2 = Duration.ofMillis(500); ConsumerRecords<String, String> records_2 = kafkaConsumer.poll(timeout2); while (true) { records_2.forEach(record -> { System.out.println("Record Key: " + record.key()); System.out.println("Record value: " + record.value()); System.out.println("Record partition: " + record.partition()); System.out.println("Record offset: " + record.offset()); System.out.println("------------------------------"); }); //异步提交偏移量到服务器broker kafkaConsumer.commitAsync(); }
参考:
kafka 生产者使用详解
Java实现Kafka的生产者、消费者
Kafka consumer poll(long)与poll(Duration)的区别
Kafka之重新消费数据
Kafka auto.offset.reset值详解
kafka的auto.offset.reset详解
kafka auto.offset.reset latest earliest 详解
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。