The main use cases for a traditional message queue are:
- Peak shaving: buffer bursts of requests so downstream services are not overwhelmed.
- Decoupling: producers and consumers only agree on the message format, not on each other.
- Asynchrony: callers hand work to the queue instead of waiting for it to finish.
A few core concepts: a topic is divided into multiple partitions, and consumers are likewise organized into groups; the data of one partition can only be consumed by one consumer within a group. Partitions have replicas, and Zookeeper provides the distributed coordination. Since every machine should run exactly one Kafka container, a Kubernetes DaemonSet is used here:
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: kafka
  labels:
    app: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: bitnami/kafka
          # env:
          #   - name: JMX_PORT
          #     value: "9010"
          ports:
            - name: client
              containerPort: 9092
              hostPort: 20015
            - name: jmx
              containerPort: 9010
              hostPort: 20016
          volumeMounts:
            - name: time
              mountPath: /etc/localtime
            - name: data
              mountPath: /bitnami/kafka
      volumes:
        - name: time
          hostPath:
            path: /etc/localtime
            type: File
        - name: data
          hostPath:
            path: /opt/docker/kafka/data
            type: Directory
Below are the configuration changes required on each of the three nodes:
# server.passnight.local
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://server.passnight.local:20015
log.dirs=/bitnami/kafka/logs
# replica.passnight.local
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://replica.passnight.local:20015
log.dirs=/bitnami/kafka/logs
# follower.passnight.local
broker.id=2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://follower.passnight.local:20015
log.dirs=/bitnami/kafka/logs
The main points to note:
- As with Zookeeper, the listener IP must be 0.0.0.0; it must not be set to the node IP, 127.0.0.1, or any other address.
- broker.id must be globally unique.
- advertised.listeners is the externally visible address; set it to the external IP/hostname and the port exposed by the container.
- JMX is left disabled here; enabling it produced a "port already in use" error.
Enter the container:
passnight@passnight-s600:~$ docker exec -it kafka bash
Basic Kafka operations. First, the kafka-topics help text:
I have no name!@b0d8083f09f5:/$ kafka-topics.sh Create, delete, describe, or change a topic. Option Description ------ ----------- --alter Alter the number of partitions and replica assignment. Update the configuration of an existing topic via --alter is no longer supported here (the kafka-configs CLI supports altering topic configs with a -- bootstrap-server option). --at-min-isr-partitions if set when describing topics, only show partitions whose isr count is equal to the configured minimum. --bootstrap-server <String: server to REQUIRED: The Kafka server to connect connect to> to. --command-config <String: command Property file containing configs to be config property file> passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs. --config <String: name=value> A topic configuration override for the topic being created or altered. The following is a list of valid configurations: cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas local.retention.bytes local.retention.ms max.compaction.lag.ms max.message.bytes message.downconversion.enable message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate remote.storage.enable retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable See the Kafka documentation for full details on the topic configs. It is supported only in combination with -- create if --bootstrap-server option is used (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option). --create Create a new topic. --delete Delete a topic --delete-config <String: name> A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option. --describe List details for the given topics. --exclude-internal exclude internal topics when running list or describe command. The internal topics will be listed by default --help Print usage information. --if-exists if set when altering or deleting or describing topics, the action will only execute if the topic exists. --if-not-exists if set when creating topics, the action will only execute if the topic does not already exist. --list List all available topics. --partitions <Integer: # of partitions> The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default. --replica-assignment <String: A list of manual partition-to-broker broker_id_for_part1_replica1 : assignments for the topic being broker_id_for_part1_replica2 , created or altered. broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...> --replication-factor <Integer: The replication factor for each replication factor> partition in the topic being created. If not supplied, defaults to the cluster default. --topic <String: topic> The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the '\' prefix to escape regular expression symbols; e. g. "test\.topic". 
--topic-id <String: topic-id> The topic-id to describe.This is used only with --bootstrap-server option for describing topics. --topics-with-overrides if set when describing topics, only show topics that have overridden configs --unavailable-partitions if set when describing topics, only show partitions whose leader is not available --under-min-isr-partitions if set when describing topics, only show partitions whose isr count is less than the configured minimum. --under-replicated-partitions if set when describing topics, only show under replicated partitions --version Display Kafka version.
I have no name!@b0d8083f09f5:/$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list # list existing topics
I have no name!@b0d8083f09f5:/$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic fist --create --partitions 1 --replication-factor 1 # create the topic with 1 partition and 1 replica
Created topic fist.
I have no name!@b0d8083f09f5:/$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic fist --describe # show topic details
Topic: fist     TopicId: qZfeWB7mRl6iGWOJPql3mQ PartitionCount: 1       ReplicationFactor: 1    Configs: max.message.bytes=10000000
        Topic: fist     Partition: 0    Leader: 1004    Replicas: 1004  Isr: 1004
I have no name!@b0d8083f09f5:/$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic fist --alter --partitions 4 # increase the partition count to 4
I have no name!@b0d8083f09f5:/$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic fist --describe # there are now 4 partitions
Topic: fist     TopicId: qZfeWB7mRl6iGWOJPql3mQ PartitionCount: 4       ReplicationFactor: 1    Configs: max.message.bytes=10000000
        Topic: fist     Partition: 0    Leader: 1004    Replicas: 1004  Isr: 1004
        Topic: fist     Partition: 1    Leader: 1004    Replicas: 1004  Isr: 1004
        Topic: fist     Partition: 2    Leader: 1004    Replicas: 1004  Isr: 1004
        Topic: fist     Partition: 3    Leader: 1004    Replicas: 1004  Isr: 1004
I have no name!@b0d8083f09f5:/$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic fist --alter --partitions 2 # partitions can only be increased, never decreased
Error while executing topic command : Topic currently has 4 partitions, which is higher than the requested 2.
[2023-08-12 11:01:05,935] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.
 (kafka.admin.TopicCommand$)
Kafka
生产者/消费者基本操作I have no name!@b0d8083f09f5:/$ kafka-console-producer.sh Missing required option(s) [bootstrap-server] Option Description ------ ----------- --batch-size <Integer: size> Number of messages to send in a single batch if they are not being sent synchronously. please note that this option will be replaced if max- partition-memory-bytes is also set (default: 16384) --bootstrap-server <String: server to REQUIRED unless --broker-list connect to> (deprecated) is specified. The server (s) to connect to. The broker list string in the form HOST1:PORT1,HOST2: PORT2. --broker-list <String: broker-list> DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap- server is specified. The broker list string in the form HOST1:PORT1, HOST2:PORT2. --compression-codec [String: The compression codec: either 'none', compression-codec] 'gzip', 'snappy', 'lz4', or 'zstd'. If specified without value, then it defaults to 'gzip' --help Print usage information. --line-reader <String: reader_class> The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka. tools. ConsoleProducer$LineMessageReader) --max-block-ms <Long: max block on The max time that the producer will send> block for during a send request. (default: 60000) --max-memory-bytes <Long: total memory The total memory used by the producer in bytes> to buffer records waiting to be sent to the server. This is the option to control `buffer.memory` in producer configs. (default: 33554432) --max-partition-memory-bytes <Integer: The buffer size allocated for a memory in bytes per partition> partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. This is the option to control `batch.size` in producer configs. (default: 16384) --message-send-max-retries <Integer> Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. This is the option to control `retries` in producer configs. (default: 3) --metadata-expiry-ms <Long: metadata The period of time in milliseconds expiration interval> after which we force a refresh of metadata even if we haven't seen any leadership changes. This is the option to control `metadata.max.age. ms` in producer configs. (default: 300000) --producer-property <String: A mechanism to pass user-defined producer_prop> properties in the form key=value to the producer. --producer.config <String: config file> Producer config properties file. Note that [producer-property] takes precedence over this config. --property <String: prop> A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user- defined message reader. Default properties include: parse.key=false parse.headers=false ignore.error=false key.separator=\t headers.delimiter=\t headers.separator=, headers.key.separator=: null.marker= When set, any fields (key, value and headers) equal to this will be replaced by null Default parsing pattern when: parse.headers=true and parse.key=true: "h1:v1,h2:v2...\tkey\tvalue" parse.key=true: "key\tvalue" parse.headers=true: "h1:v1,h2:v2...\tvalue" --reader-config <String: config file> Config properties file for the message reader. Note that [property] takes precedence over this config. 
--request-required-acks <String: The required `acks` of the producer request required acks> requests (default: -1) --request-timeout-ms <Integer: request The ack timeout of the producer timeout ms> requests. Value must be non-negative and non-zero. (default: 1500) --retry-backoff-ms <Long> Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. This is the option to control `retry.backoff.ms` in producer configs. (default: 100) --socket-buffer-size <Integer: size> The size of the tcp RECV size. This is the option to control `send.buffer. bytes` in producer configs. (default: 102400) --sync If set message send requests to the brokers are synchronously, one at a time as they arrive. --timeout <Long: timeout_ms> If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. This is the option to control `linger.ms` in producer configs. (default: 1000) --topic <String: topic> REQUIRED: The topic id to produce messages to. --version Display Kafka version.
I have no name!@b0d8083f09f5:/$ kafka-console-consumer.sh This tool helps to read data from Kafka topics and outputs it to standard output. Option Description ------ ----------- --bootstrap-server <String: server to REQUIRED: The server(s) to connect to. connect to> --consumer-property <String: A mechanism to pass user-defined consumer_prop> properties in the form key=value to the consumer. --consumer.config <String: config file> Consumer config properties file. Note that [consumer-property] takes precedence over this config. --enable-systest-events Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.) --formatter <String: class> The name of a class to use for formatting kafka messages for display. (default: kafka.tools. DefaultMessageFormatter) --formatter-config <String: config Config properties file to initialize file> the message formatter. Note that [property] takes precedence over this config. --from-beginning If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message. --group <String: consumer group id> The consumer group id of the consumer. --help Print usage information. --include <String: Java regex (String)> Regular expression specifying list of topics to include for consumption. --isolation-level <String> Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default: read_uncommitted) --key-deserializer <String: deserializer for key> --max-messages <Integer: num_messages> The maximum number of messages to consume before exiting. If not set, consumption is continual. --offset <String: consume offset> The offset to consume from (a non- negative number), or 'earliest' which means from beginning, or 'latest' which means from end (default: latest) --partition <Integer: partition> The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified. --property <String: prop> The properties to initialize the message formatter. Default properties include: print.timestamp=true|false print.key=true|false print.offset=true|false print.partition=true|false print.headers=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> headers.separator=<line.separator> null.literal=<null.literal> key.deserializer=<key.deserializer> value.deserializer=<value. deserializer> header.deserializer=<header. deserializer> Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key. deserializer.', 'value. deserializer.' and 'headers. deserializer.' prefixes to configure their deserializers. --skip-message-on-error If there is an error when processing a message, skip it instead of halt. --timeout-ms <Integer: timeout_ms> If specified, exit if no message is available for consumption for the specified interval. --topic <String: topic> The topic to consume on. --value-deserializer <String: deserializer for values> --version Display Kafka version. --whitelist <String: Java regex DEPRECATED, use --include instead; (String)> ignored if --include specified. Regular expression specifying list of topics to include for consumption.
I have no name!@b0d8083f09f5:/$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first # start a producer console
I have no name!@b0d8083f09f5:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first # start a consumer console
The consumer now sees the messages sent by the producer.
I have no name!@b0d8083f09f5:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning # read all historical messages
hello
hello
hello from producer
class AsyncProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the cluster (server.passnight.local:20015)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015");
        // Key/value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 5; i++) {
                // Send data with an asynchronous callback
                kafkaProducer.send(new ProducerRecord<>("first", String.format("value %d", i)), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.printf("topic: %s, partition: %s%n", metadata.topic(), metadata.partition());
                        }
                    }
                });
            }
        }
    }
}
Open a consumer in the console; the data has been received:
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-console-consumer.sh --bootstrap-server server.passnight.local:20015 --topic first
value 0
value 1
value 2
value 3
value 4
The callback also ran successfully, printing to the console:
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
class SyncProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the cluster (server.passnight.local:20015)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015");
        // Key/value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 5; i++) {
                // Send data and block on the returned Future to make the send synchronous
                RecordMetadata metadata = kafkaProducer.send(new ProducerRecord<>("first", String.format("value %d", i))).get();
                System.out.printf("metadata: %s%n", metadata.toString());
            }
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
The consumer receives the data:
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-console-consumer.sh --bootstrap-server server.passnight.local:20015 --topic first
value 0
value 1
value 2
value 3
value 4
And the metadata returned by each successful send is printed:
metadata: first-0@45
metadata: first-0@46
metadata: first-0@47
metadata: first-0@48
metadata: first-0@49
Benefits of partitioning: data can be spread across brokers to make better use of storage, and producers and consumers can work on different partitions in parallel, which raises throughput. The following is copied from Kafka's DefaultPartitioner; it explains the partitioning algorithm:
- The default partitioning strategy:
- If a partition is specified in the record, use it
- If no partition is specified but a key is present choose a partition based on a hash of the key
- If no partition or key is present choose the sticky partition that changes when the batch is full.
- See KIP-480 for details about sticky partitioning.
So, by default: if a partition is specified it is used; if a key is present, Kafka chooses the partition from a hash of the key; otherwise the sticky partitioner is used. The example below specifies a particular partition:
class PartitionProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        // Key/value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 5; i++) {
                // Send data to partition 2 explicitly
                kafkaProducer.send(new ProducerRecord<>("first", 2, "", String.format("value %d", i)), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.printf("topic: %s, partition: %s%n", metadata.topic(), metadata.partition());
                        }
                    }
                });
            }
        }
    }
}
Kafka sends the data to the specified partition:
topic: first, partition: 2
topic: first, partition: 2
topic: first, partition: 2
topic: first, partition: 2
topic: first, partition: 2
Note that the partition must already exist; here it was added beforehand in Kafka-Manager.
class CustomerPartitionProducer {
    // Note: this class must be declared public so Kafka can instantiate it
    public static class HelloPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            String msg = value.toString();
            return msg.contains("hello") ? 0 : 1;
        }

        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> configs) {
        }
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Use the custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, HelloPartitioner.class.getName());
        String[] messages = new String[]{"hello message", "greeting", "hello", "no more"};
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (String message : messages) {
                // Send data
                kafkaProducer.send(new ProducerRecord<>("first", String.format("%s", message)), (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("topic: %s, value: %s, partition: %s%n", metadata.topic(), message, metadata.partition());
                    }
                });
            }
        }
    }
}
The partitioner above routes messages containing "hello" to partition 0 and everything else to partition 1; the output is:
topic: first, value: greeting, partition: 1
topic: first, value: no more, partition: 1
topic: first, value: hello message, partition: 0
topic: first, value: hello, partition: 0
The main parameters that affect Kafka throughput:
- batch.size: the batch size; a batch is sent once its buffer fills up. Increasing it raises throughput but also latency.
- linger.ms: how long to wait, default 0 ms; if the buffer is not full but this time has elapsed, the batch is sent anyway. Raising it lets more messages accumulate per batch, increasing the data per batch, but adds latency; the batch size should be raised along with it.
- compression.type: compressing the data lets each request carry more records.
- RecordAccumulator: the buffer size, which must also match the batch size, otherwise sends cannot satisfy the configured linger time and batch size.
Below is an example that sets these parameters:
class ParameterProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        // Key/value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Performance-related parameters
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024);   // buffer memory, 32 MB -> 64 MB
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);             // batch size, default 16 KB, kept as-is
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);                      // linger time, 0 ms -> 1 ms
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name); // use snappy compression
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 5; i++) {
                // Send data
                kafkaProducer.send(new ProducerRecord<>("first", String.format("value %d", i)), (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("topic: %s, partition: %s%n", metadata.topic(), metadata.partition());
                    }
                });
            }
        }
    }
}
The console prints the send results:
topic: first, partition: 34
topic: first, partition: 34
topic: first, partition: 34
topic: first, partition: 34
topic: first, partition: 34
Kafka's acknowledgements (acks) tell the producer whether data has been persisted, so the acks level controls the reliability of a send:
- acks=0: the producer does not wait for any acknowledgement; no reliability guarantee at all.
- acks=1: the Leader acknowledges once it has received the data. Data can still be lost: the producer treats the ack as "persisted", but if the Leader fails before the data has been replicated to any Follower, the data is gone.
- acks=-1: the Leader only acknowledges after every node in the ISR (a subset of the Followers) has caught up with it. Waiting for all Followers would be problematic: a failed or slow Follower would never respond and the Leader would wait forever. That is why the in-sync replica set (ISR) only keeps the replicas that are syncing normally; a Follower that has not contacted the Leader for too long is removed from the ISR, with the threshold configured by replica.lag.time.max.ms. If the ISR shrinks to the Leader alone, this behaves like acks=1 and data can still be lost.

In practice: acks=0 is almost never used; acks=1 works for logs, where losing the occasional record is acceptable; acks=-1 is used for important data. Below is an example of configuring this in Java code.
class ReliableProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        // Key/value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Reliability-related parameters
        properties.put(ProducerConfig.ACKS_CONFIG, "1"); // acks=1: the send completes once the Leader has persisted the record
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); // retry count, default Integer.MAX_VALUE -> 3
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 5; i++) {
                // Send data
                kafkaProducer.send(new ProducerRecord<>("first", String.format("value %d", i)), (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("topic: %s, partition: %s%n", metadata.topic(), metadata.partition());
                    }
                });
            }
        }
    }
}
Message delivery also comes in several levels: at most once (acks=0), at least once (acks=-1 with replication factor >= 2 and min.insync.replicas >= 2), and exactly once (at least once plus idempotence). Idempotence is configured with enable.idempotence and is enabled by default (true).
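As a minimal sketch (the class name is illustrative; the cluster address is reused from the earlier examples), an idempotent producer only needs the flag below, which in turn implies acks=all and retries:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

class IdempotentProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence is on by default in recent clients; enabling it requires acks=all and retries > 0
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            producer.send(new ProducerRecord<>("first", "exactly-once within a partition"));
        }
    }
}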
Transactions guarantee atomic writes across multiple partitions of a topic; exactly-once on its own does not guarantee atomicity across partitions or topics. The transaction.id determines which partition of the internal transaction-state topic a transaction maps to, and the Leader of that partition acts as the transaction coordinator. When transactions are not enabled:
class TransactionalProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Enabling transactions requires a transactional id
        // properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_01");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        try {
            // kafkaProducer.initTransactions(); // initialize the transaction
            // kafkaProducer.beginTransaction();
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", String.format("value %d", i)));
                if (i == 3) {
                    int a = 1 / 0; // trigger an exception to abandon the "transaction"
                }
            }
            // kafkaProducer.commitTransaction(); // commit the transaction
        } catch (Exception e) {
            // kafkaProducer.abortTransaction(); // abort the transaction on error
            throw new RuntimeException(e);
        } finally {
            kafkaProducer.close();
        }
    }
}
The consumer received part of the data:
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-console-consumer.sh --bootstrap-server replica.passnight.local:20015 --topic first
value 0
value 1
value 2
value 3
With transactions enabled:
class TransactionalProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Enabling transactions requires a transactional id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_01");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        try {
            kafkaProducer.initTransactions(); // initialize the transaction
            kafkaProducer.beginTransaction();
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", String.format("value %d", i)));
                if (i == 3) {
                    int a = 1 / 0; // trigger an exception to abandon the transaction
                }
            }
            kafkaProducer.commitTransaction(); // commit the transaction
        } catch (Exception e) {
            kafkaProducer.abortTransaction(); // abort the transaction on error
            throw new RuntimeException(e);
        } finally {
            kafkaProducer.close();
        }
    }
}
The consumer receives no data at all:
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-console-consumer.sh --bootstrap-server replica.passnight.local:20015 --topic first
Ordering:
- Within a single partition, ordering can be guaranteed (under the conditions below).
- Across partitions, ordering requires windowing on the consumer side: pull all the data, sort it, and only then consume it.
Strict single-partition ordering is controlled through max.in.flight.requests.per.connection, which effectively makes retries synchronous:
- Before Kafka 1.x: set max.in.flight.requests.per.connection=1, so the next message is only sent after the current one has been acknowledged.
- Kafka 1.x and later: set max.in.flight.requests.per.connection to at most 5 and enable idempotence; the brokers cache the last 5 requests, so order is preserved even when retries happen.
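A minimal sketch of the 1.x-and-later settings (the class name is illustrative and the cluster address is reused from the earlier examples):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

class OrderedProducerConfig {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence plus at most 5 in-flight requests keeps a single partition strictly ordered on Kafka >= 1.x
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            producer.send(new ProducerRecord<>("first", "key", "ordered value"));
        }
    }
}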
Kafka stores its metadata in Zookeeper, so it can be inspected there (for example with PrettyZookeeper). The main nodes to know are:
/kafka/brokers/ids: records which brokers exist
// /kafka/brokers/ids/0
{
"listener_security_protocol_map" : {
"PLAINTEXT" : "PLAINTEXT"
},
"endpoints" : [ "PLAINTEXT://server.passnight.local:20015" ],
"jmx_port" : -1,
"features" : { },
"host" : "server.passnight.local",
"timestamp" : "1694323239360",
"port" : 20015,
"version" : 5
}
/kafka/brokers/topics/first/partitions/0/state: records which broker is the Leader and which replicas are in sync:
// information for topic: first, partition: 0
{
"controller_epoch" : 11,
"leader" : 1,
"version" : 1,
"leader_epoch" : 0,
"isr" : [ 1 ]
}
/kafka/consumers: consumer information
/kafka/controller: information that assists leader election:
// /kafka/controller
{
"version" : 2,
"brokerid" : 0,
"timestamp" : "1694323244719",
"kraftControllerEpoch" : -1
}
A Follower that has not synced with the Leader within replica.lag.time.max.ms (default 30 s) is removed from the ISR; when the Leader fails, a new Leader is elected from the ISR. The election is assisted by the controller broker, and whichever broker registers the controller node first takes on that role.

A Topic is a logical concept, while a Partition is a physical one: each partition corresponds to a log file that stores the data produced by producers, and new records are continually appended to the end of that log. To keep lookups efficient as the log grows, Kafka uses sharding and indexing: each partition is split into multiple segments, and each segment consists of
- an .index file: the offset index, named after the segment's base offset (which follows the last offset of the previous segment). It is a sparse index: an entry is written only after every 4 KB of log data, configurable via log.index.interval.bytes;
- a .log file: the log data itself;
- a .timeindex file: the timestamp index; segments are only retained for a limited time and are deleted afterwards.
These files live in one directory per partition, named <topic>-<partition>, e.g. first-0. They can be inspected with kafka-run-class.sh:
# dump the index file
passnight@passnight-s600:/opt/docker/kafka/data$ /usr/local/kafka/kafka_2.13-3.5.1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./logs/first-11/00000000000000000000.index
Dumping ./logs/first-11/00000000000000000000.index
offset: 0 position: 0
# dump the log file
passnight@passnight-s600:/opt/docker/kafka/data$ /usr/local/kafka/kafka_2.13-3.5.1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./logs/first-11/00000000000000000000.log
Dumping ./logs/first-11/00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1694349261255 size: 78 magic: 2 compresscodec: none crc: 2496089419 isvalid: true
Kafka retains messages for a period of time, controlled by:
- log.retention.hours: default 7 days (168), in hours, lowest priority;
- log.retention.minutes: in minutes;
- log.retention.ms: in milliseconds, highest priority;
- log.retention.check.interval.ms: how often retention is checked, default 5 minutes.
These take effect when the cleanup policy is log.cleanup.policy=delete.
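These broker-level settings can also be overridden per topic at runtime. A minimal AdminClient sketch, assuming the topic first and the cluster address used throughout (this is not part of the original walkthrough):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

class TopicRetentionConfig {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015");
        try (Admin admin = Admin.create(properties)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "first");
            // Keep data on this topic for 3 days (retention.ms overrides log.retention.* for the topic)
            AlterConfigOp op = new AlterConfigOp(new ConfigEntry("retention.ms", "259200000"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, Collections.singleton(op))).all().get();
        }
    }
}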
Size-based retention is controlled by log.retention.bytes, default -1, i.e. unlimited. The compact cleanup policy is enabled with log.cleanup.policy=compact:
After compaction the offsets are no longer contiguous: a consumer that asks for the record at offset=6 may find that offset=6 has been compacted away, and can only fetch the data from offset=7.

# Create a new topic `second` with 16 partitions and 3 replicas
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-topics.sh --bootstrap-server replica.passnight.local:20015 --create --topic second --partitions 16 --replication-factor 3
Created topic second.
# Describe all the replicas: the replicas/leaders cycle with a 1,0,2 pattern; Kafka spreads partitions evenly across the brokers
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-topics.sh --bootstrap-server replica.passnight.local:20015 --describe --topic second
Topic: second   TopicId: 6oBdeLc9Qu2yoGnByrBLkg PartitionCount: 16      ReplicationFactor: 3    Configs:
        Topic: second   Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: second   Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: second   Partition: 2    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: second   Partition: 3    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
        Topic: second   Partition: 4    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: second   Partition: 5    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: second   Partition: 6    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: second   Partition: 7    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: second   Partition: 8    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: second   Partition: 9    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
        Topic: second   Partition: 10   Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: second   Partition: 11   Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: second   Partition: 12   Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: second   Partition: 13   Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: second   Partition: 14   Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: second   Partition: 15   Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
Sometimes the servers do not all have the same capacity, so Kafka's default assignment is not optimal and the partition replicas need to be adjusted manually.
# Create a topic with 4 partitions and 2 replicas
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-topics.sh --bootstrap-server replica.passnight.local:20015 --create --topic three --partitions 4 --replication-factor 2
Created topic three.
# Check the topic's distribution
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-topics.sh --bootstrap-server replica.passnight.local:20015 --describe --topic three
Topic: three TopicId: _E1mdzhSTWSOzsN7399tTg PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: three Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: three Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: three Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: three Partition: 3 Leader: 2 Replicas: 2,0 Isr: 2,0
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1$ sudo vim reset-replication-factor.json
Enter the following content:
{
"version": 1,
"partitions": [
{ "topic": "three", "partition": 0, "replicas": [0, 1] },
{ "topic": "three", "partition": 1, "replicas": [0, 1] },
{ "topic": "three", "partition": 2, "replicas": [1, 0] },
{ "topic": "three", "partition": 3, "replicas": [1, 0] }
]
}
Then execute it:
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1$ ./bin/kafka-reassign-partitions.sh --bootstrap-server replica.passnight.local:20015 --reassignment-json-file reset-replication-factor.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"three","partition":1,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"three","partition":2,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"three","partition":3,"replicas":[2,0],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for three-0,three-1,three-2,three-3
# Verify the reassignment: it has completed
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-reassign-partitions.sh --bootstrap-server replica.passnight.local:20015 --reassignment-json-file ../reset-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition three-0 is completed.
Reassignment of partition three-1 is completed.
Reassignment of partition three-2 is completed.
Reassignment of partition three-3 is completed.

Clearing broker-level throttles on brokers 0,1,2
Clearing topic-level throttles on topic three
# Describe again: the partitions now follow the predefined assignment
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-topics.sh --bootstrap-server replica.passnight.local:20015 --describe --topic three
Topic: three    TopicId: _E1mdzhSTWSOzsN7399tTg PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: three    Partition: 0    Leader: 0       Replicas: 0,1   Isr: 1,0
        Topic: three    Partition: 1    Leader: 1       Replicas: 0,1   Isr: 1,0
        Topic: three    Partition: 2    Leader: 0       Replicas: 1,0   Isr: 0,1
        Topic: three    Partition: 3    Leader: 1       Replicas: 1,0   Isr: 0,1
Two settings control automatic leader rebalancing: auto.leader.rebalance.enable turns it on, and the controller rebalances leaders once a broker's imbalance exceeds the threshold (default 10%); leader.imbalance.check.interval.seconds sets how often leader load is checked (default 300).
# Create topic four with 3 partitions and replication factor 1
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-topics.sh --bootstrap-server replica.passnight.local:20015 --create --topic four --partitions 3 --replication-factor 1
Created topic four.
# Change the replication factor to 3 (this cannot be done with a command-line flag, only via a JSON reassignment)
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ sudo vim ../reset-replication-factor.json
Change the JSON file to:
{
"version": 1,
"partitions": [
{ "topic": "four", "partition": 0, "replicas": [0, 1, 2] },
{ "topic": "four", "partition": 1, "replicas": [0, 1, 2] },
{ "topic": "four", "partition": 2, "replicas": [0, 1, 2] }
]
}
Run the verification and check the result:
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-reassign-partitions.sh --bootstrap-server replica.passnight.local:20015 --reassignment-json-file ../reset-replication-factor.json --verify
Status of partition reassignment:
There is no active reassignment of partition four-0, but replica set is 2 rather than 0,1,2.
There is no active reassignment of partition four-1, but replica set is 1 rather than 0,1,2.
There is no active reassignment of partition four-2, but replica set is 0 rather than 0,1,2.
Clearing broker-level throttles on brokers 0,1,2
Clearing topic-level throttles on topic four
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-topics.sh --bootstrap-server replica.passnight.local:20015 --describe --topic four
Topic: four TopicId: OJT1_NUOSv-AkGirZgl4RA PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: four Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: four Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: four Partition: 2 Leader: 0 Replicas: 0 Isr: 0
The --verify output shows that this plan has not been applied yet (the replica sets are still the single brokers 2, 1 and 0); the replication factor only becomes 3 after the plan is run with --execute, as in the previous example.
There are two consumption models, push and pull; Kafka consumers pull data from the brokers. Consumers with the same groupid form a consumer group. A consumer is considered dead if its heartbeats stop for longer than session.timeout.ms=45s, or if the gap between two polls exceeds max.poll.interval.ms=5min. The consumer fetches data with sendFetches requests, for which the minimum bytes per fetch (fetch.min.bytes=1), the timeout to wait when that minimum has not been reached (fetch.max.wait.ms=500), and the maximum bytes per fetch (fetch.max.bytes=50m) can be configured. When a fetch completes, the onSuccess callback puts the data into a queue; the consumer then deserializes the records, passes them through interceptors, and processes them, pulling at most max.poll.records=500 records per poll, as sketched below.
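A minimal sketch that wires these parameters into a consumer (the values simply restate the defaults mentioned above; the class name is illustrative):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

class TunedConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);         // session.timeout.ms
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);      // max.poll.interval.ms
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);                // fetch.min.bytes
        properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);            // fetch.max.wait.ms
        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024); // fetch.max.bytes
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);             // max.poll.records
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singleton("first"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(System.out::println);
        }
    }
}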
Scenario: a topic first with three partitions, all consumed by a single consumer.
package com.passnight.springboot.kafka.consumer;

import lombok.Cleanup;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
}

class CustomerConsumer {
    static boolean stopped = false;

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // group.id is mandatory; the consumer will not start without it
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
        // Create a consumer
        @Cleanup KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to a topic
        consumer.subscribe(Collections.singleton("first"));
        // Consume data
        while (!stopped) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(System.out::println);
        }
    }
}
Create a producer with kafka-console-producer.sh in a console and send some data:
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-console-producer.sh --bootstrap-server server.passnight.local:20015 --topic first
>hello from producer
The consumer then receives the data sent by the producer:
ConsumerRecord(topic = first, partition = 34, leaderEpoch = 0, offset = 6, CreateTime = 1694850051405, serialized key size = -1, serialized value size = 19, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello from producer)
A consumer can also bind itself to specific partitions of a topic with assign and then poll:
class PartitionConsumer {
    static boolean stopped = false;

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // group.id is mandatory; the consumer will not start without it
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
        // Create a consumer
        @Cleanup KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Assign a specific partition of the topic
        consumer.assign(Collections.singleton(new TopicPartition("first", 0)));
        // Consume data
        while (!stopped) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(System.out::println);
        }
    }
}
Send data to that partition from Java:
class PartitionProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        // Key/value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 5; i++) {
                // Send data to partition 0 explicitly
                kafkaProducer.send(new ProducerRecord<>("first", 0, "", String.format("value %d", i)), (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("topic: %s, partition: %s%n", metadata.topic(), metadata.partition());
                    }
                });
            }
        }
    }
}
The data is sent and received successfully:
# Consumer log
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 101, CreateTime = 1694850576525, serialized key size = 0, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = value 0)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 102, CreateTime = 1694850576536, serialized key size = 0, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = value 1)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 103, CreateTime = 1694850576536, serialized key size = 0, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = value 2)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 104, CreateTime = 1694850576537, serialized key size = 0, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = value 3)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 105, CreateTime = 1694850576537, serialized key size = 0, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = value 4)
# Producer log
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
Goal: create a consumer group of three consumers and have it consume the first topic. First send 30 messages to different partitions with the following Java code:
class AsyncProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the cluster (server.passnight.local:20015)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015");
        // Key/value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 30; i++) {
                // Send data
                kafkaProducer.send(new ProducerRecord<>("first", String.format("value %d", i)), (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("topic: %s, partition: %s%n", metadata.topic(), metadata.partition());
                    }
                });
                Thread.sleep(1);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
The messages are sent to several different partitions:
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 0
topic: first, partition: 34
topic: first, partition: 34
topic: first, partition: 34
topic: first, partition: 34
topic: first, partition: 34
topic: first, partition: 34
topic: first, partition: 34
topic: first, partition: 34
topic: first, partition: 34
topic: first, partition: 40
topic: first, partition: 40
topic: first, partition: 40
topic: first, partition: 32
topic: first, partition: 32
topic: first, partition: 32
topic: first, partition: 32
topic: first, partition: 32
topic: first, partition: 32
topic: first, partition: 32
topic: first, partition: 32
topic: first, partition: 36
Then receive them with the following code:
@Log4j2
class GroupedConsumer {
    static volatile boolean stopped = false;
    static Properties properties = new Properties();
    static CountDownLatch initLatch = new CountDownLatch(3);

    static {
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // group.id is mandatory; the consumer will not start without it
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
    }

    private static class ConsumerThread extends Thread {
        @SneakyThrows
        @Override
        public void run() {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            // Assign a specific partition of the topic
            consumer.assign(Collections.singleton(new TopicPartition("first", 0)));
            initLatch.countDown();
            initLatch.await();
            // Consume data
            while (!stopped) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s: %s%n", getName(), record);
                }
            }
            consumer.close();
        }

        ConsumerThread(long n) {
            super();
            setName(String.format("ConsumerThread-%d", n));
        }
    }

    public static void main(String[] args) {
        IntStream.range(0, 3)
                .asLongStream()
                .mapToObj(ConsumerThread::new)
                .forEach(Thread::start);
    }
}
Each consumer then reads from the partition it was assigned. Note that assign bypasses the group coordinator, so all three threads here independently consume the same partition 0 and each sees the same records, as the log below shows.
ConsumerThread-2: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 144, CreateTime = 1694852156029, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 0) ConsumerThread-2: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 145, CreateTime = 1694852156038, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 1) ConsumerThread-2: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 146, CreateTime = 1694852156040, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 2) ConsumerThread-2: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 147, CreateTime = 1694852156041, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 3) ConsumerThread-2: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 148, CreateTime = 1694852156042, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 4) ConsumerThread-2: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 149, CreateTime = 1694852156043, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 5) ConsumerThread-2: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 150, CreateTime = 1694852156044, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 6) ConsumerThread-2: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 151, CreateTime = 1694852156045, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 7) ConsumerThread-2: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 152, CreateTime = 1694852156047, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 8) ConsumerThread-1: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 144, CreateTime = 1694852156029, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 0) ConsumerThread-0: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 144, CreateTime = 1694852156029, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 0) ConsumerThread-1: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 145, CreateTime = 1694852156038, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 1) ConsumerThread-1: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 146, CreateTime = 1694852156040, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 2) ConsumerThread-1: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 147, CreateTime = 1694852156041, serialized 
key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 3) ConsumerThread-1: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 148, CreateTime = 1694852156042, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 4) ConsumerThread-1: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 149, CreateTime = 1694852156043, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 5) ConsumerThread-1: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 150, CreateTime = 1694852156044, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 6) ConsumerThread-1: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 151, CreateTime = 1694852156045, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 7) ConsumerThread-1: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 152, CreateTime = 1694852156047, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 8) ConsumerThread-0: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 145, CreateTime = 1694852156038, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 1) ConsumerThread-0: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 146, CreateTime = 1694852156040, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 2) ConsumerThread-0: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 147, CreateTime = 1694852156041, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 3) ConsumerThread-0: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 148, CreateTime = 1694852156042, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 4) ConsumerThread-0: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 149, CreateTime = 1694852156043, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 5) ConsumerThread-0: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 150, CreateTime = 1694852156044, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 6) ConsumerThread-0: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 151, CreateTime = 1694852156045, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 7) ConsumerThread-0: ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 152, CreateTime = 1694852156047, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 8)
The starting offset of a consumer can be configured with auto.offset.reset=earliest|latest|none (default latest):
- earliest: automatically reset to the earliest offset, equivalent to the console consumer's --from-beginning;
- latest: automatically reset to the latest offset;
- none: throw an exception if no previous offset exists for the consumer group.
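For example, a minimal sketch (the group id is a fresh, illustrative one, so that no committed offset exists and the reset policy actually applies):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

class OffsetResetConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // A group id with no committed offsets, so auto.offset.reset decides where to start
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_reset_demo");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singleton("first"));
            consumer.poll(Duration.ofSeconds(1)).forEach(System.out::println);
        }
    }
}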
Below is an example of consuming from an arbitrary offset:
class CustomerOffsetConsumer {
    static boolean stopped = false;

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
        @Cleanup KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("first"));
        // Specify the offset to start consuming from.
        // Make sure partitions have actually been assigned first; otherwise the assignment may be empty.
        Set<TopicPartition> assignment = consumer.assignment();
        while (assignment.isEmpty()) {
            consumer.poll(Duration.ofSeconds(1));
            assignment = consumer.assignment();
        }
        assignment.forEach(topicPartition -> consumer.seek(topicPartition, 150));
        while (!stopped) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(System.out::println);
        }
    }
}
Messages with offset 150 or greater are consumed again:
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 150, CreateTime = 1694852156044, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 6)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 151, CreateTime = 1694852156045, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 7)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 152, CreateTime = 1694852156047, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 8)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 153, CreateTime = 1694874743206, serialized key size = -1, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello message)
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 2, offset = 154, CreateTime = 1694874743216, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)
A consumer can also seek by timestamp instead of by offset; the following example replays everything from one day ago:
class CustomerTimeConsumer {
    static boolean stopped = false;

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
        @Cleanup KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("first"));
        Set<TopicPartition> assignment = consumer.assignment();
        while (assignment.isEmpty()) {
            consumer.poll(Duration.ofSeconds(1));
            assignment = consumer.assignment();
        }
        // Map each partition to the target timestamp (one day ago) and look up the matching offsets
        Map<TopicPartition, Long> topicPartitionLongMap = assignment.stream()
                .collect(Collectors.toMap(
                        Function.identity(),
                        topicPartition -> System.currentTimeMillis() - 1 * 24 * 3600 * 1000L
                ));
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(topicPartitionLongMap);
        assignment.forEach(topicPartition -> consumer.seek(topicPartition, topicPartitionOffsetAndTimestampMap.get(topicPartition).offset()));
        while (!stopped) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(System.out::println);
        }
    }
}
The console prints every message from the previous day onwards:
ConsumerRecord(topic = first, partition = 35, leaderEpoch = 0, offset = 0, CreateTime = 1694851421184, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 342)
ConsumerRecord(topic = first, partition = 35, leaderEpoch = 0, offset = 1, CreateTime = 1694851421250, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 399)
ConsumerRecord(topic = first, partition = 35, leaderEpoch = 0, offset = 2, CreateTime = 1694851421252, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 401)
# .................... and so on .........................
The partition assignment strategy of a consumer group is configured with partition.assignment.strategy; the default is Range + CooperativeSticky.
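The strategy can be changed per consumer. A minimal sketch that switches a group to the round-robin assignor (assuming the same cluster and group id as the earlier examples):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

class RoundRobinAssignedConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
        // Replace the default Range + CooperativeSticky assignors with RoundRobin
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singleton("first"));
            consumer.poll(Duration.ofSeconds(1)).forEach(System.out::println);
        }
    }
}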
Since version 0.9, consumer offsets have been stored in an internal Kafka topic whose key is groupid+topic+partition and whose value is the offset; before 0.9 they were stored in Zookeeper. The change was made because the internal topic performs better. To read the internal topic, set exclude.internal.topics=false in config/consumer.properties (the default, true, hides internal topics). kafka-console-consumer.sh can then read the __consumer_offsets topic, which contains the offset information:
passnight@passnight-s600:/usr/local/kafka/kafka_2.13-3.5.1/bin$ ./kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server server.passnight.local:20015 --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
[group_test,first,33]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1694849715527, expireTimestamp=None)
[group_test,first,12]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1694849715527, expireTimestamp=None)
# .............. and so on ...................
Kafka can commit consumer offsets automatically; the relevant parameters are enable.auto.commit=true (whether automatic offset commits are enabled) and auto.commit.interval.ms=5000 (the interval between automatic commits). Enable them in Java code:
class AutoOffsetConsumer {
    static boolean stopped = false;

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
        // Enable automatic offset commits (this is already the default)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // 5000 -> 1000
        @Cleanup KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("first"));
        while (!stopped) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(System.out::println);
        }
    }
}
Send some data:
topic: first, partition: 36
topic: first, partition: 36
topic: first, partition: 36
topic: first, partition: 36
topic: first, partition: 36
The messages are consumed normally:
ConsumerRecord(topic = first, partition = 36, leaderEpoch = 0, offset = 27, CreateTime = 1694874800292, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 0)
ConsumerRecord(topic = first, partition = 36, leaderEpoch = 0, offset = 28, CreateTime = 1694874800300, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 1)
ConsumerRecord(topic = first, partition = 36, leaderEpoch = 0, offset = 29, CreateTime = 1694874800300, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 2)
ConsumerRecord(topic = first, partition = 36, leaderEpoch = 0, offset = 30, CreateTime = 1694874800300, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 3)
ConsumerRecord(topic = first, partition = 36, leaderEpoch = 0, offset = 31, CreateTime = 1694874800300, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = value 4)
And the updated offsets can be seen on the __consumer_offsets consumer:
[group_test,first,1]::OffsetAndMetadata(offset=22, leaderEpoch=Optional[0], metadata=, commitTimestamp=1694874804756, expireTimestamp=None)
[group_test,first,15]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1694874804756, expireTimestamp=None)
The code below is a manual-commit example; its output is similar to the automatic-commit case.
class ManualOffsetConsumer {
    static boolean stopped = false;

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server.passnight.local:20015,replica.passnight.local:20015,follower.passnight.local:20015");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
        // Disable automatic offset commits (true -> false)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        @Cleanup KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("first"));
        while (!stopped) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(System.out::println);
            // Commit offsets manually, either synchronously or asynchronously
            // consumer.commitSync();
            consumer.commitAsync();
        }
    }
}
Note that Kafka data that is not consumed in time can be deleted by retention before consumers ever read it.