赞
踩
Apache Kafka 是一个开源的分布式事件流平台,主要用于实时数据传输和流处理。它最初由 LinkedIn 开发,并在 2011 年成为 Apache 基金会的顶级项目。Kafka 设计的目标是处理大规模的数据流,同时提供高吞吐量、低延迟和高容错性
Kafka 的工作原理可以从几个关键方面来理解:
数据流示例
通过这些机制,Kafka 能够实现高吞吐量、低延迟和高可靠性的消息传递和数据流处理。
Kafka 依赖于 Java 运行环境,因此首先需要安装 Java 11 或更高版本
apt install -y openjdk-11-jdk
root@huhy:~# java --version
openjdk 11.0.23 2024-04-16
OpenJDK Runtime Environment (build 11.0.23+9-post-Ubuntu-1ubuntu1)
OpenJDK 64-Bit Server VM (build 11.0.23+9-post-Ubuntu-1ubuntu1, mixed mode, sharing)
官网下载;https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz
tar -xf kafka_2.13-3.7.1.tgz
cd kafka_2.13-3.7.1/
Kafka 的配置文件位于 config 目录中。主要的配置文件包括:
server.properties:Kafka 的服务器配置文件
zookeeper.properties:Zookeeper 的配置文件(如果使用 Zookeeper)
broker.id:描述:Kafka Broker 的唯一标识符。每个 Broker 必须有一个唯一的 broker.id。
默认值:无
示例:broker.id=0
listeners:描述:Kafka Broker 监听的地址和端口。指定了 Kafka 接收客户端请求的地址。
默认值:PLAINTEXT://:9092
示例:listeners=PLAINTEXT://localhost:9092
advertised.listeners:描述:Kafka 向客户端公开的地址。客户端会通过此地址与 Broker 进行通信。
默认值:无
示例:advertised.listeners=PLAINTEXT://your-hostname:9092
log.dirs:
描述:Kafka 存储日志文件的目录。可以设置多个目录,Kafka 会将数据分散存储。
默认值:/tmp/kafka-logs
示例:log.dirs=/var/lib/kafka/logs
log.retention.hours:
描述:日志文件的保留时间,单位是小时。超过这个时间的数据会被删除。
默认值:168(7 天)
示例:log.retention.hours=168
log.segment.bytes:
描述:每个日志段的大小,单位是字节。日志文件会被分段存储。
默认值:1073741824(1 GB)
示例:log.segment.bytes=536870912(512 MB)
num.partitions:
描述:默认创建的主题的分区数量。
默认值:1
示例:num.partitions=3
replication.factor:
描述:主题的副本因子,表示每个分区有多少副本。提高副本因子可以增加数据的可靠性。
默认值:无(主题创建时指定)
示例:replication.factor=2
message.max.bytes:
描述:Kafka 允许的最大消息大小,单位是字节。
默认值:1000012(1 MB)
示例:message.max.bytes=2097152(2 MB)
log.retention.bytes:
描述:每个分区的日志文件最大保留大小,超过这个大小的日志会被删除。
默认值:-1(不限制)
示例:log.retention.bytes=1073741824(1 GB)
log.cleaner.enable:
描述:启用日志清理器,用于压缩日志中的重复数据。
默认值:false
示例:log.cleaner.enable=true
security.inter.broker.protocol:
描述:Kafka Broker 之间的通信协议,支持 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。
默认值:PLAINTEXT
示例:security.inter.broker.protocol=SSL
ssl.keystore.location:
描述:SSL 密钥库的位置,用于 SSL/TLS 加密。
默认值:无
示例:ssl.keystore.location=/path/to/keystore.jks
zookeeper.connect:
描述:Zookeeper 的连接字符串,包括 Zookeeper 的主机名和端口号。
默认值:localhost:2181
示例:zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms:
描述:Zookeeper 连接超时设置,单位是毫秒。
默认值:6000
示例:zookeeper.connection.timeout.ms=10000
auto.create.topics.enable:
描述:是否自动创建主题。如果设置为 true,当客户端向不存在的主题发送消息时,Kafka 会自动创建该主题。
默认值:true
示例:auto.create.topics.enable=false
delete.topic.enable:
描述:是否允许删除主题。如果设置为 true,可以通过 Kafka 提供的脚本删除主题。
默认值:false
示例:delete.topic.enable=true
通常情况下,默认配置就可以开始使用。如果需要自定义配置,可以编辑这些文件
启动 Zookeeper;Kafka 需要 Zookeeper 来管理集群的元数据。Kafka 附带了一个简单的 Zookeeper 实例,开启后另起一个终端
bin/zookeeper-server-start.sh config/zookeeper.properties
启动 Kafka Broker;在另一个终端中,启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties
另起一个终端3;Kafka 使用主题来组织消息。可以使用 Kafka 提供的脚本创建主题。例如,创建一个名为 test-topic 的主题:
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Created topic test-topic.
生产消息:可以使用 Kafka 提供的生产者工具向主题中发送消息。打开一个终端并运行,然后在终端4中输入消息并按回车键发送消息
bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
>huhy
>
消费消息;在另一个终端3中,可以运行消费者工具来读取消息,只有最开始两个终端是不能终端,后两个有交互界面可以直接用
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
huhy
获取信息如下
查看主题:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
test-topic
查看主题详情:
bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
Topic: test-topic TopicId: rXHPQIqJRkO5lQDOsco3NQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
删除主题;
bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092
验证查看
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
停止 Kafka Broker
bin/kafka-server-stop.sh
停止 Zookeeper:
bin/zookeeper-server-stop.sh
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。