赞
踩
Kafka标准软件基于Bitnami Kafka 构建。当前版本为3.6.1
你可以通过Qinghub Studio 在线体验 部署工具直接安装部署,也可以手动按如下文档操作
部署配置文件获取地址: https://gitee.com/qingplus/qingcloud-platform
可以使用以下环境变量通过 Bitnami Apache Kafka Docker 设置配置:
docker run --name kafka -e KAFKA_CFG_PROCESS_ROLES ... -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true registry.cn-hangzhou.aliyuncs.com/qingcloudtech/kafka:latest
或者通过修改docker-compose.yml此存储库中存在的文件:
kafka:
...
environment:
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
...
要在开发设置中使用 Apache Kafka,请创建以下docker-compose.yml文件:
version: "3"
services:
kafka:
image: 'registry.cn-hangzhou.aliyuncs.com/qingcloudtech/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
要部署它,请在文件所在目录中运行以下命令docker-compose.yml:
docker-compose up -d
Apache Kafka Raft (KRaft) 在 Kafka 中使用了新的仲裁控制器服务,该服务取代了之前的控制器,并使用了 Raft 共识协议的基于事件的变体。这极大地简化了 Kafka 的架构,将元数据整合到 Kafka 本身,而不是将其拆分到两个不同的系统:ZooKeeper 和 Kafka。
如果您想继续使用ZooKeeper,可以使用以下配置:
version: "3.9" services: zookeeper: image: registry.cn-hangzhou.aliyuncs.com/qingcloudtech/zookeeper:3.9 ports: - "2181:2181" volumes: - "zookeeper_data:/bitnami" environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: registry.cn-hangzhou.aliyuncs.com/qingcloudtech/kafka:3.6.1 ports: - "9092:9092" volumes: - "kafka_data:/bitnami" environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 depends_on: - zookeeper volumes: zookeeper_data: driver: local kafka_data: driver: local
为了使用内部和外部客户端访问 Apache Kafka 代理,您需要为每个客户端配置一个侦听器。
为此,请将以下环境变量添加到 docker-compose 中:
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@<your_host>:9093
+ - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
+ - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
+ - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
并暴露外部端口:
内部客户端仍然可以在docker网络内使用
ports:
- - '9092:9092'
+ - '9094:9094'
注意:要从外部进行连接,请将localhost上面的内容更改为主机的外网IP/主机名并包含EXTERNAL://0.0.0.0:9093在内KAFKA_CFG_LISTENERS以允许远程连接。
这些来自同一主机的客户端将用于localhost连接到 Apache Kafka。
kafka-console-producer.sh --producer.config /opt/bitnami/kafka/config/producer.properties --bootstrap-server 127.0.0.1:9094 --topic test
kafka-console-consumer.sh --consumer.config /opt/bitnami/kafka/config/consumer.properties --bootstrap-server 127.0.0.1:9094 --topic test --from-beginning
如果从另一台计算机运行这些命令,请相应地更改地址。
这些来自同一 Docker 网络上其他容器的客户端将使用 kafka 容器服务主机名连接到 Apache Kafka。
kafka-console-producer.sh --producer.config /opt/bitnami/kafka/config/producer.properties --bootstrap-server kafka:9092 --topic test
kafka-console-consumer.sh --consumer.config /opt/bitnami/kafka/config/consumer.properties --bootstrap-server kafka:9092 --topic test --from-beginning
同样,应用程序代码需要使用bootstrap.servers=kafka:9092
为了配置身份验证,您必须正确配置 Apache Kafka 侦听器。让我们看一个示例,为 Apache Kafka 配置SASL_SSL与客户端通信的身份验证以及SASL与控制器相关的通信的身份验证。
定义以下环境变量,以及用于客户端通信的 SASL 凭据:
KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://localhost:9092
KAFKA_CLIENT_USERS=user
KAFKA_CLIENT_PASSWORDS=password
KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
KAFKA_CONTROLLER_USER=controller_user
KAFKA_CONTROLLER_PASSWORD=controller_password
必须使用自己的 SSL 证书。您可以将 Java 密钥存储或 PEM 文件放入/opt/bitnami/kafka/config/certs. 如果 JKS 或 PEM 证书受密码保护(推荐),您将需要提供它才能访问密钥库:
KAFKA_CERTIFICATE_PASSWORD=myCertificatePassword
如果相关证书安装在/opt/bitnami/kafka/config/certs/kafka.truststore.jks、/opt/bitnami/kafka/config/certs/kafka.truststore.pem,/bitnami/kafka/config/certs/kafka.truststore.jks,/bitnami/kafka/config/certs/kafka.truststore.pem,请设置该KAFKA_TLS_TRUSTSTORE_FILE变量。
以下脚本可以帮助您创建 JKS 和证书:
请记住以下注意事项:
当提示输入密码时,请为所有节点使用相同的密码。
version: '2' services: kafka: image: 'registry.cn-hangzhou.aliyuncs.com/qingcloudtech/kafka:latest' hostname: kafka.example.com ports: - '9092' environment: - KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://:9092 - KAFKA_CLIENT_USERS=user - KAFKA_CLIENT_PASSWORDS=password - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN - KAFKA_CONTROLLER_USER=controller_user - KAFKA_CONTROLLER_PASSWORD=controller_password - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN - KAFKA_INTER_BROKER_USER=controller_user - KAFKA_INTER_BROKER_PASSWORD=controller_password - KAFKA_CERTIFICATE_PASSWORD=certificatePassword123 - KAFKA_TLS_TYPE=JKS # or PEM volumes: # Both .jks and .pem files are supported # - './kafka.keystore.pem:/opt/bitnami/kafka/config/certs/kafka.keystore.pem:ro' # - './kafka.keystore.key:/opt/bitnami/kafka/config/certs/kafka.keystore.key:ro' # - './kafka.truststore.pem:/opt/bitnami/kafka/config/certs/kafka.truststore.pem:ro' - './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro' - './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'
为了获得使用和生成消息所需的凭据,您需要在客户端中提供凭据。如果您的 Apache Kafka 客户端允许,请使用您提供的凭据。
在使用bitnami/kafka image生成和使用消息时,您需要指向consumer.properties和/或producer.properties文件,其中包含工作所需的配置。您可以在该目录中找到该文件/opt/bitnami/kafka/config。
生成消息:
kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties
消费消息:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --consumer.config /opt/bitnami/kafka/config/consumer.properties
使用其他工具来使用 Apache Kafka 集群,则需要提供所需的信息。您可以在位于目录的文件中找到所需的信息/opt/bitnami/kafka/config。
当部署具有多个代理的 Apache Kafka 集群时,可以使用以下变量SASL配置代理间通信:SASL_SSL
在 KRaft 模式下部署具有多个控制器的 Apache Kafka 集群时,可以使用以下变量SASL配置控制器通信:SASL_SSL
配置 Apache Kafka 侦听器SASL或SASL_SSL与客户端通信时,您可以使用以下环境变量提供 SASL 凭据:
可以通过提供以下值来启用 KRaft 模式:
KAFKA_CFG_PROCESS_ROLES:以逗号分隔的 Kafka KRaft 角色列表。允许值:‘controller,broker’, ‘controller’, ’broker‘.
KAFKA_CFG_NODE_ID:Kafka节点的唯一id。
KAFKA_CFG_LISTENERS:Kafka 监听器列表。如果节点设置了角色包含controller,则必须包含CONTROLLER监听器。
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:使用 Apache Kafka 安全协议映射每个侦听器。如果节点设置了controller角色,则需要此设置环境变量. 例如:PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT。
为了在没有身份验证的情况下配置控制器通信,应该提供以下环境变量:
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 应包括CONTROLLER:PLAINTEXT.
为了配置 Apache Kafka 控制器通信带SASL,应该提供以下环境变量:
为了配置 Apache Kafka 控制器通信带SSL,应该提供以下环境变量:
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 应包括CONTROLLER:SSL.
KAFKA_TLS_<uppercase_controller_listener_name>_CLIENT_AUTH:配置 kafka 控制平面通信的 mTLS 身份验证方法。允许值:required, requested, none.
KAFKA_TLS_TYPE:选择要使用的 TLS 证书格式。允许值:JKS, PEM. 默认值:JKS。
有效的密钥库和信任库安装在/opt/bitnami/kafka/config/certs/kafka.keystore.jks和 处/opt/bitnami/kafka/config/certs/kafka.truststore.jks。
为了使用 Zookeeper 服务器对 Apache Kafka 进行身份验证SASL_SSL,应该提供以下环境变量:
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 应包括CONTROLLER:SASL_SSL.
KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL:用于控制器通信的 SASL 机制。注意:KRaft 模式尚不支持 SCRAM 机制,因此 KRaft 模式中唯一支持的 SASL 机制是PLAIN.
KAFKA_CONTROLLER_USER:Apache Kafka 控制器通信用户。
KAFKA_CONTROLLER_PASSWORD:Apache Kafka控制器通信密码。
KAFKA_TLS_<uppercase_controller_listener_name>_CLIENT_AUTH:配置 kafka 控制平面通信的 mTLS 身份验证方法。允许值:required, requested, none.
KAFKA_TLS_TYPE:选择要使用的 TLS 证书格式。允许值:JKS, PEM. 默认值:JKS。
有效的密钥库和信任库安装在/opt/bitnami/kafka/config/certs/kafka.keystore.jks和 处/opt/bitnami/kafka/config/certs/kafka.truststore.jks。
SSL注意:SSL 设置由使用或协议配置的所有侦听器共享SASL_SSL。尚不支持为每个侦听器设置不同的证书。
可以通过提供以下值来启用 Zookeeper 模式:
为了无需身份验证即可连接 Zookeeper 服务器,您应该提供以下环境变量:
为了使用 Zookeeper 服务器对 Apache Kafka 进行身份验证SASL,您应该提供以下环境变量:
为了使用 Zookeeper 服务器对 Apache Kafka 进行身份验证SSL,您应该提供以下环境变量:
为了使用 Zookeeper 服务器对 Apache Kafka 进行身份验证SASL_SSL,您应该提供以下环境变量:
可以使用以下环境变量通过设置 Apache Kafka 集群:
第一步是创建一个 Apache Kafka 实例。
docker run --name kafka-0 \
--network app-tier \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093 \
-e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
-e KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
-e KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2 \
-e KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv \
-p :9092 \
-p :9093 \
registry.cn-hangzhou.aliyuncs.com/qingcloudtech/kafka:latest
接下来我们启动一个新的 Apache Kafka 容器。
docker run --name kafka-1 \
--network app-tier \
-e KAFKA_CFG_NODE_ID=1 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093 \
-e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
-e KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
-e KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2 \
-e KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv \
-p :9092 \
-p :9093 \
registry.cn-hangzhou.aliyuncs.com/qingcloudtech/kafka:latest
接下来我们启动另一个新的 Apache Kafka 容器。
docker run --name kafka-3 \
--network app-tier \
-e KAFKA_CFG_NODE_ID=3 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093 \
-e KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
-e KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
-e KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2 \
-e KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv \
-p :9092 \
-p :9093 \
registry.cn-hangzhou.aliyuncs.com/qingcloudtech/kafka:latest
一个 Apache Kafka 集群配置完成。可以通过添加/删除从属服务器来扩展集群,而不会导致任何停机。
可以在文件中找到此部署的 docker-compose 版本docker-compose-cluster.yml。
可以使用以下命令创建:
root@kafka-0:/# /opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic mytopic --partitions 3 --replication-factor 3
Created topic "mytopic".
root@kafka-0:/# /opt/bitnami/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic mytopic
Topic:mytopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: mytopic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: mytopic Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: mytopic Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
该镜像中所有配置可以在/bitnami/kafka/config/中查找,可以通过设置 KAFKA_MOUNTED_CONF_DIR 环境变量来更改该位置。
docker run --name kafka -v /path/to/server.properties:/bitnami/kafka/config/server.properties registry.cn-hangzhou.aliyuncs.com/qingcloudtech/kafka:latest
之后,更改将被同步到容器中。
运行 Apache Kafka 映像,从主机安装目录。
修改docker-compose.yml文件:
...
services:
kafka:
...
volumes:
- 'kafka_data:/bitnami'
+ - /path/to/server.properties:/bitnami/kafka/config/server.properties
使用您喜欢的编辑器编辑主机上的配置。
vi /path/to/server.properties
第3步:重启Apache Kafka
更改配置后,重新启动 Apache Kafka 容器以使更改生效。
docker restart kafka
或者使用 Docker Compose:
docker-compose restart kafka
日志
容器日志发送到stdout. 要查看日志:
docker logs kafka
或者使用 Docker Compose:
docker-compose logs kafka
要备份数据、配置和日志,请按照以下简单步骤操作:
docker stop kafka
或者使用 Docker Compose:
docker-compose stop kafka
我们需要在用于创建备份的容器中安装两个卷:主机上用于存储备份的目录,以及我们刚刚停止的容器中的卷,以便我们可以访问数据。
docker run --rm -v /path/to/kafka-backups:/backups --volumes-from kafka busybox \
cp -a /bitnami/kafka /backups/latest
或者使用 Docker Compose:
docker run --rm -v /path/to/kafka-backups:/backups --volumes-from `docker-compose ps -q kafka` busybox \
cp -a /bitnami/kafka /backups/latest
恢复备份就像将备份作为卷安装在容器中一样简单。
docker run -v /path/to/kafka-backups/latest:/bitnami/kafka registry.cn-hangzhou.aliyuncs.com/qingcloudtech/kafka:latest
修改docker-compose.yml文件:
kafka:
volumes:
- /path/to/kafka-backups/latest:/bitnami/kafka
本节介绍 Kafka 从 Zookeeper 模式到 KRaft 模式的迁移。
1.从 Zookeeper 获取cluster ID
2.配置controller节点,添加以下环境变量
KAFKA_CFG_PROCESS_ROLES=controller
KAFKA_CFG_NODE_ID=<unique_id>
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=<controller1_node_id>@<controller1_host>:9093,<controller2_node_id>@<controller2_host>:9093,...
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CFG_LISTENERS=CONTROLLER://:9093
KAFKA_CFG_ZOOKEEPER_METADATA_MIGRATION_ENABLE=true
KAFKA_CFG_ZOOKEEPER_CONNECT=<zk_host>:<zk_port>
KAFKA_KRAFT_CLUSTER_ID=<cluster_id_step1>
3.使用迁移设置配置代理:
KAFKA_CFG_BROKER_ID=<current_broker_id>
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=<controller1_node_id>@<controller1_host>:9093,<controller2_node_id>@<controller2_host>:9093,...
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CFG_INTER_BROKER_PROTOCOL_VERSION=3.4
KAFKA_CFG_ZOOKEEPER_METADATA_MIGRATION_ENABLE=true
KAFKA_CFG_ZOOKEEPER_CONNECT=<zk_host>:<zk_port>
4.迁移代理:
KAFKA_CFG_PROCESS_ROLES=broker
KAFKA_CFG_NODE_ID=<unique_id>
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=<controller1_node_id>@<controller1_host>:9093,<controller2_node_id>@<controller2_host>:9093,...
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
5.禁用控制器上的迁移模式:
KAFKA_CFG_PROCESS_ROLES=controller
KAFKA_CFG_NODE_ID=<unique_id>
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=<controller1_node_id>@<controller1_host>:9093,<controller2_node_id>@<controller2_host>:9093,...
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CFG_LISTENERS=CONTROLLER://:9093
KAFKA_KRAFT_CLUSTER_ID=<cluster_id_step1>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。