赞
踩
public static void main(String[] args) { Properties prop = new Properties(); // 指定broker地址 prop.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); // 消息序列化 prop.put("key.serializer", StringSerializer.class.getName()); prop.put("value.serializer", StringSerializer.class.getName()); // 创建生产者 KafkaProducer producer = new KafkaProducer<String, String>(prop); // f发送数据 String topic = "hello"; producer.send(new ProducerRecord<String, String>(topic, "hello kafka producer")); // close producer.close(); }
public static void main(String[] args) { Properties prop = new Properties(); prop.put("bootstrap.servers", "192.168.52.100:9092,192.168.52.101:9092,192.168.52.102:9092"); // 反序列化 prop.put("key.deserializer", StringDeserializer.class.getName()); prop.put("value.deserializer", StringDeserializer.class.getName()); // 指定消费者组 prop.put("group.id", "con-1"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop); Collection<String> topics = new ArrayList<>(); topics.add("hello"); // 订阅指定的topic consumer.subscribe(topics); while(true) { // 消费数据 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord message: consumerRecords ) { System.out.println(message); } } }
// 开启自动提交功能,默认是开启
prop.put("enable.auto.commit", "true");
// 自动提交时间间隔
prop.put("auto.commit.interval.ms", "5000");
// 先根据group.id指定的消费者组查询保存的offset信息
// 如果找到了,说明之前消费过该消费组的消息,则根据之前保存的offset继续消费
// 如果没有找到,说明是第一次消费,或者说是之前的offset对应的数据已经不存在了,此时就会根据auto.offset.reset 的值执行不同的消费逻辑
// earliest:从最早的数据开始消费,从头开始
// latest : 最新的数据开始消费-默认的策略
// none : 抛出异常
// 在实时计算的场景下,建议设置为latest
// 这个参数只会在消费者第一次消费或者对应的offset没有数据的时候才会生效
prop.put("auto.offset.reset", "latest");
# 查询消费者信息
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-consumer-groups.sh --list --bootstrap-server hadoop01:9092
con-1
# 消费组描述
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop01:9092 --group con-1
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
con-1 hello 2 1 1 0 consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1 consumer-con-1-1
con-1 hello 3 1 1 0 consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1 consumer-con-1-1
con-1 hello 1 0 0 0 consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1 consumer-con-1-1
con-1 hello 0 1 1 0 consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1 consumer-con-1-1
con-1 hello 4 2 2 0 consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1 consumer-con-1-1
当一个消费者消费一个partition的时候,消费的数据顺序和此partition数据的生产顺序是一致的
当一个消费者消费多个partition的时候,消费者按照partition的顺序,首先消费一个partition,当消费完一个partition最新的数据后再消费其它partition的数据
总之,如果一个消费者消费多个partition,只能保证消费者的数据顺序在一个partition内有序
// 将自动提交设置为false
prop.put("enable.auto.commit", "false");
// 手动提交
consumer.commitAsync();
至多一次:at-most-once,默认实现
仅此一次:exactly-once
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。