赞
踩
官网地址:https://kafka.apache.org
本文以 2.12
版本为例。
$ bin/kafka-server-start.sh config/server.properties
# 创建topic(1个分区,1个副本)
# --partitions: 分区数
# --replication-factor: 副本数
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
# 配置delete.topic.enable为true,这样才能删除topic
$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# --from-beginning: 表示从头开始接收数据
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# 也可以指定分组
#--group: 指定消费者组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group t1
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group
$ bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
$ bin/kafka-preferred-replica-election.sh --zookeeper localhost:9092
$ bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
持续发送消息到指定的topic中,且每条发送的消息都会有响应信息:
$ bin/kafka-verifiable-producer.sh --broker-list localhost:9092 --topic test --max-messages 10
如果 kafka 集群的 zk 配置了 chroot 路径,那么需要加上/path
。
$ bin/zookeeper-shell.sh localhost:2181[/path]
$ ls /brokers/ids
$ get /brokers/ids/0
1、创建规则json
$ cat > increase-replication-factor.json <<EOF {"version":1, "partitions":[ {"topic":"__consumer_offsets","partition":0,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":1,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":2,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":3,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":4,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":5,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":6,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":7,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":8,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":9,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":10,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":11,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":12,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":13,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":14,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":15,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":16,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":17,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":18,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":19,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":20,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":21,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":22,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":23,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":24,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":25,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":26,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":27,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":28,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":29,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":30,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":31,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":32,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":33,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":34,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":35,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":36,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":37,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":38,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":39,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":40,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":41,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":42,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":43,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":44,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":45,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":46,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":47,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":48,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}] } EOF
2、执行
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
3、验证
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
<!-- kafka客户端工具 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!-- 工具类 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
同步是发送消息完成之后,需要等待对方响应之后才能继续干其他的。
异步则是发送完消息之后,就可以继续往下执行业务逻辑。
将1-10的数字消息写入到Kafka中。
package com.example.kafka.demo1; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; /** * 生产消息到Kafka中(同步) * * @author tom */ public class MyProducer1 { public static void main(String[] args) { // 1、创建用于连接Kafka的Properties配置 Properties properties = new Properties(); // 指定连接的kafka集群 properties.put("bootstrap.servers", "127.0.0.1:9092"); // ack应答级别 // all等价于-1 0 1 properties.put("acks", "all"); // 重试次数 properties.put("retries", 1); // 批次大小 // 16k properties.put("batch.size", 16384); // 等待时间 properties.put("linger.ms", 1); // recordAccumulator缓冲区大小 // 32m properties.put("buffer.memory", 33554432); // key,Value的序列化类 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2、创建一个生产者对象KafkaProducer KafkaProducer<String, String> producer = new KafkaProducer<>(properties); // 3、调用send发送1-10消息到指定Topic topic1 for (int i = 1; i < 11; ++i) { try { // 获取返回值Future,该对象封装了返回值 Future<RecordMetadata> future = producer.send(new ProducerRecord<>("topic1", null, i + "")); // 4、调用一个Future.get()方法等待响应 RecordMetadata recordMetadata = future.get(); System.out.println("success->" + " partition = " + recordMetadata.partition() + " | offset = " + recordMetadata.offset()); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } // 5、关闭生产者 producer.close(); } }
程序输出:
注意:如果在 send() 方法后接着调用 get() 方法,那么就是有序的同步方法,消息会一条接一条的发送
send(xxx).get()。
在同步发送的前提下,生产者在获得集群返回的 ack 之前那会一直阻塞。那么集群什么时候返回ack呢?
此时ack有3个配置:
ack=0:kafka-cluster 不需要任何的 broker 收到消息,就立即返回 ack 给生产者,最容易丢消息,但是效率
是最高的。
ack=1(默认):多副本之间得leader已经收到消息,并把消息写入到本地log中,才会返回ack给生产者,性
能和安全是最均衡的。
ack=-1/all:里面有默认的配置min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要leader和
一个follower同步完成之后,才会返回ack给生产者(此时集群中有2个broker已完成数据的接收),这种方式
最安全,但性能最差。
下面是关于ack和重试(如果没有收到ack,就开启重试)的配置:
// 发送失败会重试,默认重试时间间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
// 重试次数设置
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
Kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
Kafka本地线程会去缓冲区拉一次16K的数据,发送到broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
如果拉不到16K的数据,间隔10ms也会将已拉到的数据发送到broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
将1-10的数字消息写入到Kafka中。
package com.example.kafka.demo1; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * 生产消息到Kafka中(异步回调) * * @author tom */ public class MyProducer2 { public static void main(String[] args) { // 1、创建用于连接Kafka的Properties配置 Properties properties = new Properties(); // 指定连接的kafka集群 properties.put("bootstrap.servers", "127.0.0.1:9092"); // ack应答级别 // all等价于-1 0 1 properties.put("acks", "all"); // 重试次数 properties.put("retries", 1); // 批次大小 // 16k properties.put("batch.size", 16384); // 等待时间 properties.put("linger.ms", 1); // recordAccumulator缓冲区大小 // 32m properties.put("buffer.memory", 33554432); // key,Value的序列化类 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2、创建一个生产者对象KafkaProducer KafkaProducer<String, String> producer = new KafkaProducer<>(properties); // 3、调用send发送1-10消息到指定Topic topic2 for (int i = 1; i < 11; i++) { // 回调函数,该方法会在Producer收到ack时调用,为异步调用 // 如果key为确定值,那么分区也就确定了,所以我这里没有写死 producer.send(new ProducerRecord<>("topic2", null, i + ""), (metadata, exception) -> { if (exception == null) { System.out.println("success->" + " partition = " + metadata.partition() + " | offset = " + metadata.offset()); } else { exception.printStackTrace(); } }); } // 5、关闭生产者 producer.close(); } }
程序输出:
分区的分配基本由 ProducerRecord 的参数决定。
指明 partition 的情况下,直接将指明的值直接作为 partiton 值。
没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。
既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),
将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法。
将1-10的数字消息写入到Kafka中。
package com.example.kafka.demo1; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * 自定义分区 * @author tom */ public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // Integer num = cluster.partitionCountForTopic(topic); // return key.toString().hashCode() % num; return 0; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
package com.example.kafka.demo1; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * 生产消息到Kafka中(自定义分区策略的插入) * * @author tom */ public class MyProducer3 { public static void main(String[] args) { // 1、创建用于连接Kafka的Properties配置 Properties properties = new Properties(); // 指定连接的kafka集群 properties.put("bootstrap.servers", "127.0.0.1:9092"); // ack应答级别 // all等价于-1 0 1 properties.put("acks", "all"); // 重试次数 properties.put("retries", 1); // 批次大小 // 16k properties.put("batch.size", 16384); // 等待时间 properties.put("linger.ms", 1); // recordAccumulator缓冲区大小 // 32m properties.put("buffer.memory", 33554432); // 设置分区 properties.put("partitioner.class", "com.example.kafka.demo1.MyPartitioner"); // key,Value的序列化类 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2、创建一个生产者对象KafkaProducer KafkaProducer<String, String> producer = new KafkaProducer<>(properties); // 3、调用send发送1-10消息到指定Topic topic3 for (int i = 1; i < 11; i++) { // 回调函数,该方法会在Producer收到ack时调用,为异步调用 // 如果key为确定值,那么分区也就确定了,所以我这里没有写死 producer.send(new ProducerRecord<>("topic3", null, i + ""), (metadata, exception) -> { if (exception == null) { System.out.println("success->" + " partition = " + metadata.partition() + " | offset = " + metadata.offset()); } else { exception.printStackTrace(); } }); } // 5、关闭生产者 producer.close(); } }
程序输出:
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消
费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。所以 offset 的维护是
Consumer 消费数据是必须考虑的问题。
从 topic1中,将消息都消费,并将记录的offset、key、value打印出来。
package com.example.kafka.demo1; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; /** * 从Kafka的topic中消费消息(自动提交offset的消费者) * * @author tom */ public class MyConsumer1 { public static void main(String[] args) throws InterruptedException { // 1、创建消费者配置信息 Properties properties = new Properties(); // 连接的集群 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 开启自动提交offset enable.auto.commit properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交offset的时间间隔 auto.commit.interval.ms properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // key,value的反序列化 key.deserializer、value.deserializer properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 消费者组 group.id // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据 // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group1"); // auto.offset.reset // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效: // 1.上面设置的消费组还未消费(可以更改组名来消费) // 2.该offset已经过期 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 2、创建Kafka消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 3、订阅要消费的主题 // 指定消费者从哪个topic中拉取数据 // kafkaConsumer.subscribe(Collections.singletonList("topic1")); kafkaConsumer.subscribe(Arrays.asList("topic1")); // 4、使用一个while循环,不断从Kafka的topic中拉取消息 while (true) { // Kafka的消费者一次拉取一批的数据 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); // 5、将将记录(record)的offset、key、value都打印出来 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { // 主题 String topic = consumerRecord.topic(); // offset: 这条消息处于Kafka分区中的哪个位置 long offset = consumerRecord.offset(); // key、value String key = consumerRecord.key(); String value = consumerRecord.value(); System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); System.out.println(consumerRecord.key() + "----" + consumerRecord.value()); } Thread.sleep(1000); } //consumer无需close() } }
程序输出:
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点
是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,commitSync 阻塞当前线程,一直到提交成功,
并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可
能提交失败。
package com.example.kafka.demo1; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; /** * 从Kafka的topic中消费消息(手动提交offset的消费者) * * @author tom */ public class MyConsumer2 { public static void main(String[] args) throws InterruptedException { // 1、创建消费者配置信息 Properties properties = new Properties(); // 连接的集群 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 关闭自动提交 offset enable.auto.commit properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // key,value的反序列化 key.deserializer、value.deserializer properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 消费者组 group.id // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据 // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group2"); // auto.offset.reset // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效: // 1.上面设置的消费组还未消费(可以更改组名来消费) // 2.该offset已经过期 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 2、创建Kafka消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 3、订阅要消费的主题 // 指定消费者从哪个topic中拉取数据 // kafkaConsumer.subscribe(Collections.singletonList("topic2")); kafkaConsumer.subscribe(Arrays.asList("topic2")); // 4、使用一个while循环,不断从Kafka的topic中拉取消息 while (true) { // Kafka的消费者一次拉取一批的数据 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); // 5、将将记录(record)的offset、key、value都打印出来 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { // 主题 String topic = consumerRecord.topic(); // offset: 这条消息处于Kafka分区中的哪个位置 long offset = consumerRecord.offset(); // key、value String key = consumerRecord.key(); String value = consumerRecord.value(); System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); System.out.println(consumerRecord.key() + "----" + consumerRecord.value()); } // 1.同步提交,当前线程会阻塞直到offset提交成功 kafkaConsumer.commitSync(); // 2.异步提交 // kafkaConsumer.commitAsync(); Thread.sleep(1000); } //consumer无需close() } }
程序输出:
1、提交的内容
消费者无论是自动提交还是手动提交,都需要把所属的消费者组+消费的某个主题+消费的某个分区+消费的偏移
量,这样的信息提交到集群的_consumer_offsets主题里面。
2、自动提交
消费者poll消息下来以后就会自动提交到offset
//是否自动提交offset,默认:true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
3、手动提交
将自动提交的配置改为false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手动提交分为了两种:
手动同步提交:在消费完消费后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,
执行之后的逻辑。
// 所有消息已经消费完
// 有消息
if(records.count() > 0){
// 手动同步提交offset,当前线程会阻塞,直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
//=========阻塞==== 提交成功
consumer.commitSync();
}
手动异步提交:在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,
供集群调用。
//所有消息已经消费完
//有消息
if(records.count() > 0){
//手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if(e != null){
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + e.getStackTrace());
}
}
});
}
默认情况下,消费者一次会poll 500条消息。
//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
代码中设置了长轮询的时间是1000毫秒:
// 设置长轮询时间是1000ms
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String, String> record : records){
System.out.printf("收到消息:partition = %d, offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
意味着:
如果一次poll到500条,直接执行for循环。
如果这一次没有poll到500条,且时间在1s内,要么长轮询继续poll,要么到500条,要么到1s。
如果多次poll都没达到500条,且1s时间到了,那么直接执行for循环。
如果两次poll的间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被提出消费组,触发rebalance机
制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点。
//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次poll的时间超出了30s的时间间隔,kafka会认为其消费能力过弱,将其提出消费组,将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
消费者每隔1s向kafka集群发送心跳,集群发现如果有超过10s没有续约的消费者,将被踢出消费组,触发该消费
组的rebalance机制,将该分区交给其他消费组里的其他消费者进行消费。
//consumer给broker发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000)
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
// 只消费foo的分区0和分区1,不可以与subscribe方法同用
consumer.assign(Arrays.asList(partition0, partition1));
//TOPIC_NAME主题下的0号分区
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);//offset=10
TopicPartition partition = new TopicPartition("foo", 0);
// 获取foo的分区0的最早offset
long beginningOffset = consumer.beginningOffsets(Collections.singletonList(partition)).get(partition);
// 获取foo的分区0的最新offset
long endOffset = consumer.endOffsets(Collections.singletonList(partition)).get(partition);
TopicPartition partition = new TopicPartition("foo", 0);
long offset = args[0];
// 移动到指定offset
consumer.seek(partition, offset);
// 移动到最新offset
consumer.seekToEnd(Collections.singletonList(partition));
// 移动到最早offset
consumer.seekToBeginning(Collections.singletonList(partition));
根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始
消费。
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME); //从1⼩时前开始消费 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; Map<TopicPartition, Long> map = new HashMap<>(); for (PartitionInfo par : topicPartitions) { map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime); } Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map); for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) { TopicPartition key = entry.getKey(); OffsetAndTimestamp value = entry.getValue(); if (key == null || value == null) continue; Long offset = value.offset(); System.out.println("partition-" + key.partition() + "|offset-" + offset); System.out.println(); //根据消费⾥的timestamp确定offset if (value != null) { consumer.assign(Arrays.asList(key)); consumer.seek(key, offset); } }
新消费组的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新消息)。可以通过
以下的设置,让新的消费者第一次从头开始消费。之后开始消费新消息(最后消费位置的偏移量+1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
参考文档:
https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
package com.example.kafka.demo1; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Properties; /** * 从Kafka的topic中消费消息(自定义提交策略) * 简单消费者--订阅主题带再均衡处理器 * * @author tom */ public class MyConsumer3 { public static void main(String[] args) throws InterruptedException { // 1、创建消费者配置信息 Properties properties = new Properties(); // 连接的集群 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 开启自动提交offset enable.auto.commit properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交offset的时间间隔 auto.commit.interval.ms properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // key,value的反序列化 key.deserializer、value.deserializer properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 消费者组 group.id // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据 // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group3"); // auto.offset.reset // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效: // 1.上面设置的消费组还未消费(可以更改组名来消费) // 2.该offset已经过期 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 2、创建Kafka消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 3、订阅要消费的主题 // 指定消费者从哪个topic中拉取数据 kafkaConsumer.subscribe(Arrays.asList("topic3"), new ConsumerRebalanceListener() { // 在均衡开始之前和消费者停止读取消息之后调用 // 如果在这个方法中提交偏移量,则下一个消费者就可以获得读取的偏移量 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 提交位移 kafkaConsumer.commitSync(); } // 在重新分配分区之后和消费者开始读取消息之前调用 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { long committedOffset = -1; for (TopicPartition topicPartition : partitions) { // 获取该分区已消费的位移 committedOffset = kafkaConsumer.committed(topicPartition).offset(); // 重置位移到上一次提交的位移处开始消费 kafkaConsumer.seek(topicPartition, committedOffset + 1); } } }); try { // 4、使用一个while循环,不断从Kafka的topic中拉取消息 while (true) { // Kafka的消费者一次拉取一批的数据 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); // 5、将将记录(record)的offset、key、value都打印出来 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { // 主题 String topic = consumerRecord.topic(); // offset:这条消息处于Kafka分区中的哪个位置 long offset = consumerRecord.offset(); // key、value String key = consumerRecord.key(); String value = consumerRecord.value(); System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); System.out.println(consumerRecord.key() + "----" + consumerRecord.value()); } Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } finally { kafkaConsumer.close(); } } }
程序输出:
package com.example.kafka.demo1; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.*; /** * 从Kafka的topic中消费消息(自定义提交策略) * 提交指定偏移量和再均衡处理器实现 * * @author tom */ public class MyConsumer4 { public static void main(String[] args) { // 1、创建消费者配置信息 Properties properties = new Properties(); // 连接的集群 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 关闭自动提交 offset enable.auto.commit properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // key,value的反序列化 key.deserializer、value.deserializer properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 消费者组 group.id // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据 // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group3"); // auto.offset.reset // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效: // 1.上面设置的消费组还未消费(可以更改组名来消费) // 2.该offset已经过期 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 2、创建Kafka消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); // 3、订阅要消费的主题 // 指定消费者从哪个topic中拉取数据 kafkaConsumer.subscribe(Collections.singletonList("topic3"), new ConsumerRebalanceListener() { // 在均衡开始之前和消费者停止读取消息之后调用 // 如果在这个方法中提交偏移量,则下一个消费者就可以获得读取的偏移量 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("Lost partitions in rebalance committing current offsets:" + currentOffsets); // 提交位移 kafkaConsumer.commitSync(currentOffsets); } // 在重新分配分区之后和消费者开始读取消息之前调用 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 不做任何操作,也可以如下指定消费偏移量 long committedOffset = -1; for (TopicPartition topicPartition : partitions) { // 获取该分区已消费的位移 committedOffset = kafkaConsumer.committed(topicPartition).offset(); // 重置位移到上一次提交的位移处开始消费 kafkaConsumer.seek(topicPartition, committedOffset + 1); } } }); try { // 4、使用一个while循环,不断从Kafka的topic中拉取消息 while (true) { // Kafka的消费者一次拉取一批的数据 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); // 5、将将记录(record)的offset、key、value都打印出来 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { // 主题 String topic = consumerRecord.topic(); // offset:这条消息处于Kafka分区中的哪个位置 long offset = consumerRecord.offset(); // key、value String key = consumerRecord.key(); String value = consumerRecord.value(); System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); System.out.println(consumerRecord.key() + "----" + consumerRecord.value()); // 设置需要提交的偏移量 currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1, "no metadata")); } // 手动异步提交指定偏移量 kafkaConsumer.commitAsync(currentOffsets, null); } } catch (Exception e) { e.printStackTrace(); } finally { try { kafkaConsumer.commitSync(currentOffsets); } finally { kafkaConsumer.close(); System.out.println("closed consumer..."); } } } }
程序输出:
1、往kafka中写入user数据
2、消费kafka中user的数据
3、kafka的topic是userinfo
package com.example.kafka.demo2; import java.io.*; /** * @author tom */ public class BeanConversion { /** * 对象转字节数组 * * @param obj * @return */ public static byte[] ObjectToBytes(Object obj) { byte[] bytes = null; ByteArrayOutputStream bo = null; ObjectOutputStream oo = null; try { bo = new ByteArrayOutputStream(); oo = new ObjectOutputStream(bo); oo.writeObject(obj); bytes = bo.toByteArray(); } catch (IOException e) { e.printStackTrace(); } finally { try { if (bo != null) { bo.close(); } if (oo != null) { oo.close(); } } catch (IOException e) { e.printStackTrace(); } } return bytes; } /** * 字节数组转对象 * * @param bytes * @return */ public static Object BytesToObject(byte[] bytes) { Object obj = null; ByteArrayInputStream bi = null; ObjectInputStream oi = null; try { bi = new ByteArrayInputStream(bytes); oi = new ObjectInputStream(bi); obj = oi.readObject(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (bi != null) { bi.close(); } if (oi != null) { oi.close(); } } catch (IOException e) { e.printStackTrace(); } } return obj; } }
package com.example.kafka.demo2;
import org.apache.kafka.common.serialization.Serializer;
/**
* @author tom
*/
public class EncodingKafka implements Serializer<Object> {
@Override
public byte[] serialize(String topic, Object data) {
return BeanConversion.ObjectToBytes(data);
}
}
package com.example.kafka.demo2;
import org.apache.kafka.common.serialization.Deserializer;
/**
* @author tom
*/
public class DecodingKafka implements Deserializer<Object> {
@Override
public Object deserialize(String topic, byte[] data) {
return BeanConversion.BytesToObject(data);
}
}
package com.example.kafka.demo2; import java.io.Serializable; /** * @author tom */ public class User implements Serializable { private int id; private int age; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
package com.example.kafka.demo2; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; /** * 生产者发送数据 * * @author tom */ public class KafkaProducerUser { public final static String TOPIC_NAME = "userinfo"; public static void main(String[] args) { // 1、创建用于连接Kafka的Properties配置 Properties properties = new Properties(); // 指定连接的kafka集群 properties.put("bootstrap.servers", "127.0.0.1:9092"); // ack应答级别 // all等价于-1 0 1 properties.put("acks", "all"); // 重试次数 properties.put("retries", 1); // 批次大小 // 16k properties.put("batch.size", 16384); // 等待时间 properties.put("linger.ms", 1); // recordAccumulator缓冲区大小 // 32m properties.put("buffer.memory", 33554432); // key,Value的序列化类 // key的序列化,其类型是Integer properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); // 自己定义的序列化 // value的序列化,其类型是Object properties.put("value.serializer", "com.example.kafka.demo2.EncodingKafka"); // 2、创建一个生产者对象KafkaProducer KafkaProducer producer = new KafkaProducer<Integer, User>(properties); for (int i = 0; i < 100; i++) { User u = new User(); u.setId(i * 3); u.setName("name" + i); if (i < 20) { u.setAge(i + 20); } else { u.setAge(i); } // public ProducerRecord(String topic, K key, V value) ProducerRecord<Integer, User> producerRecord = new ProducerRecord<>(TOPIC_NAME, u.getId(), u); // 3、调用send发送消息到指定Topic // send()方法会返回一个包含RecordMetadata的Future对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功 // 如果不关心发送结果,那么可以使用这种发送方式 Future<RecordMetadata> future = producer.send(producerRecord); // send()方住先返回一个 Future对象,然后调用Future对象的get()方法等待Kafka响应 // 如果服务器返回错误,get()方法会抛出异 // 如果没有发生错误,我们会得到一个RecordMetadata对象,可以用它获取消息的偏移量 // 如果在发送数据之前或者在发送过程中发生了任何错误,比如broker返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常 RecordMetadata rm = null; try { // 4、调用一个Future.get()方法等待响应 rm = future.get(); System.out.println(rm.topic() + " 分区:" + rm.partition() + " 偏移量:" + rm.offset()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } // 5、关闭生产者 producer.close(); } }
程序输出:
package com.example.kafka.demo2; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; /** * 消费者消费数据 * * @author tom */ public class KafkaConsumerUser { public final static String TOPIC_NAME = "userinfo"; public static void main(String[] args) throws Exception { // 1、创建消费者配置信息 Properties properties = new Properties(); // 连接的集群 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 开启自动提交offset enable.auto.commit properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交offset的时间间隔 auto.commit.interval.ms properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // key,value的反序列化 key.deserializer、value.deserializer // key的反序列化,其类型是Integer properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); // value的反序列化,其类型是Object properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.kafka.demo2.DecodingKafka"); // 消费者组 group.id // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据 // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的 properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME); // auto.offset.reset // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效: // 1.上面设置的消费组还未消费(可以更改组名来消费) // 2.该offset已经过期 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 2、创建Kafka消费者 KafkaConsumer<Integer, User> kafkaConsumer = new KafkaConsumer<>(properties); // 3、订阅要消费的主题 // 指定消费者从哪个topic中拉取数据 kafkaConsumer.subscribe(Arrays.asList(TOPIC_NAME)); // 4、使用一个while循环,不断从Kafka的topic中拉取消息 while (true) { // Kafka的消费者一次拉取一批的数据 ConsumerRecords<Integer, User> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); // 5.将将记录(record)的offset、key、value都打印出来 for (ConsumerRecord<Integer, User> consumerRecord : consumerRecords) { // 主题 String topic = consumerRecord.topic(); // offset:这条消息处于Kafka分区中的哪个位置 long offset = consumerRecord.offset(); // key\value Integer key = consumerRecord.key(); User value = consumerRecord.value(); System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); } Thread.sleep(1000); } } }
程序输出:
消费topic已经存在的数据,类似命令中 --from-beginning
参数。
代码在该服务启动前,如果topic中存在数据,是可以全部读出来,但如果topic数据部分已经被消费了,也会被读
出来。
package com.example.kafka.demo3; import com.example.kafka.demo2.User; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Properties; /** * @author tom */ public class KafkaConsumerHistoryOfUser { public final static String TOPIC_NAME = "userinfo"; public static void main(String[] args) throws Exception { // 1、创建消费者配置信息 Properties properties = new Properties(); // 连接的集群 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 开启自动提交offset enable.auto.commit properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交offset的时间间隔 auto.commit.interval.ms properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // key,value的反序列化 key.deserializer、value.deserializer // key的反序列化,其类型是Integer properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); // value的反序列化,其类型是Object properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.kafka.demo2.DecodingKafka"); // 消费者组 group.id // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据 // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的 properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME); // auto.offset.reset // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效: // 1.上面设置的消费组还未消费(可以更改组名来消费) // 2.该offset已经过期 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 2、创建Kafka消费者 KafkaConsumer<Integer, User> kafkaConsumer = new KafkaConsumer<>(properties); // 基于再均衡监听器,在给消费者分配分区的时候将消息偏移量跳转到起始位置 kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> collection) { } @Override public void onPartitionsAssigned(Collection<TopicPartition> collection) { // 读取历史数据 --from-beginning // 基于seekToBeginning方法 kafkaConsumer.seekToBeginning(collection); // Map<TopicPartition, Long> beginningOffset = kafkaConsumer.beginningOffsets(collection); // for (Map.Entry<TopicPartition, Long> entry : beginningOffset.entrySet()) { // 基于seek方法 // TopicPartition tp = entry.getKey(); // long offset = entry.getValue(); // consumer.seek(tp,offset); //} } }); while (true) { // Kafka的消费者一次拉取一批的数据 ConsumerRecords<Integer, User> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); // 5.将将记录(record)的offset、key、value都打印出来 for (ConsumerRecord<Integer, User> consumerRecord : consumerRecords) { // 主题 String topic = consumerRecord.topic(); // offset:这条消息处于Kafka分区中的哪个位置 long offset = consumerRecord.offset(); // key\value Integer key = consumerRecord.key(); User value = consumerRecord.value(); System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); } Thread.sleep(1000); } } }
程序输出:
至此,结束。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。