赞
踩
1.kafka版本 3.6.1
2.docker 拉取kafka镜像
2.1启动docker,打开cmd输入以下命令,查看docker界面,已经拉取了镜像
docker pull bitnami/kafka:3.6.1
3.设置网络
3.1 在cmd中输入以下命令,这将使用指定的IP地址范围和网关地址创建一个名为netkafka的自定义网络。这个网络将在后续的Kafka Broker容器配置中使用。
docker network create --subnet=172.23.0.0/25 --gateway=172.23.0.1 netkafka
4.运行kafka.yml文件
4.1 在cmd中进入该文件所在目录,执行以下命令(文件内容附文末,创建应该.txt文件,修改后缀为.yml即可)
docker-compose -f kafka.yml up -d
4.2 查看docker界面,kafka集群已经启动
5.集群消费者生产者测试
5.1 打开一个cmd,执行以下命令,进入kafka1容器
docker exec -it kafka1 bash
5.2 执行以下命令,进入目录
cd /opt/bitnami/kafka/bin
5.3 执行以下命令,创建topic,创建一个副本为3、分区为5的topic
./kafka-topics.sh --create --topic foo --partitions 5 --replication-factor 3 --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
5.4 执行以下命令,查看topic详细信息
kafka-topics.sh --describe --topic foo --bootstrap-server kafka1:9092, kafka2:9092, kafka3:9092
5.5 新打开一个cmd界面,执行以下命令进入kafak1容器
docker exec -it kafka1 bash
5.6 执行以下命令创建生产者
kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
5.7 新打开一个cmd界面,执行以下命令,进入kafka2容器
docker exec -it kafka2 bash
5.8 执行以下命令创建消费者
kafka-console-consumer.sh --bootstrap-server 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
在生产者中输入消息
查看消费者是否收到消息,可以看见消费者中成功接收到消息
6.以下为kafka.yml文件内容
其中,需要修改的内容为
1.修改宿主机ip KAFKA_CFG_ADVERTISED_LISTENERS(自己的主机IP)
2.修改挂载路径
version: "3.6"
services:
kafka1:
container_name: kafka1
image: 'bitnami/kafka:3.6.1'
user: root
ports:
- '19092:9092'
- '19093:9093'
environment:
# 允许使用Kraft
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
#定义外网访问地址(宿主机ip地址和端口,标红处修改为自己主机IP)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://1.1.1.1:19092
- KAFKA_CFG_NODE_ID=1
- KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@172.23.0.11:9093,2@172.23.0.12:9093,3@172.23.0.13:9093
- ALLOW_PLAINTEXT_LISTENER=yes
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
volumes:
#挂载路径,标红处修改为自己的路径
- /E/dockers/volume/kafka/broker01:/bitnami/kafka:rw
networks:
netkafka:
ipv4_address: 172.23.0.11
kafka2:
container_name: kafka2
image: 'bitnami/kafka:3.6.1'
user: root
ports:
- '29092:9092'
- '29093:9093'
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
#标红处修改为自己主机IP
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://1.1.1.1:29092
- KAFKA_CFG_NODE_ID=2
- KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI #哪一,三个节点保持一致
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@172.23.0.11:9093,2@172.23.0.12:9093,3@172.23.0.13:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
volumes:
#挂载路径,标红处修改为自己的路径
- /E/dockers/volume/kafka/broker02:/bitnami/kafka:rw
networks:
netkafka:
ipv4_address: 172.23.0.12
kafka3:
container_name: kafka3
image: 'bitnami/kafka:3.6.1'
user: root
ports:
- '39092:9092'
- '39093:9093'
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 标红处修改为自己主机IP
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://1.1.1.1:39092
- KAFKA_CFG_NODE_ID=3
- KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@172.23.0.11:9093,2@172.23.0.12:9093,3@172.23.0.13:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
volumes:
#挂载路径,标红处修改为自己的路径
- /E/dockers/volume/kafka/broker03:/bitnami/kafka:rw
networks:
netkafka:
ipv4_address: 172.23.0.13
networks:
name:
netkafka:
external: true
driver: bridge
name: netkafka
ipam:
driver: default
config:
- subnet: 172.23.0.0/25
gateway: 172.23.0.1
代码解释:
1. version: "3.6"`:指定了 Docker Compose 文件的版本。
3. `kafka1:`、`kafka2:` 和 `kafka3:`:这些是要创建的 Kafka 服务的配置部分。每个服务都包括了容器的名称、镜像、端口映射、环境变量、卷挂载、网络配置等信息。
4. `container_name`: 指定了容器的名称。
5. `image`: 指定了要使用的 Docker 镜像。
6. `ports`: 指定了要映射的端口,格式为 `<host_port>:<container_port>`。
7. `environment`: 指定了环境变量,用于配置 Kafka 实例。
KAFKA_ENABLE_KRAFT=yes:启用 Kafka 中的 Kafka Raft 强一致性协议 (KRaft)。
8.KAFKA_CFG_PROCESS_ROLES=broker,controller:指定 Kafka 进程的角色,其中包括 “broker” 和 “controller”。
9.KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER:指定控制器监听器的名称为 “CONTROLLER”。
10.KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093:定义 Kafka 监听器的配置。这里使用了两个监听器,一个用于 “PLAINTEXT” 协议,监听 9092 端口,另一个用于 “CONTROLLER” 协议,监听 9093 端口。
11.KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT:定义监听器的安全协议映射。这里将 “CONTROLLER” 协议映射到 “PLAINTEXT” 协议。
12.KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.135:29092:定义 Kafka 在外部可访问的监听器地址。这里使用了 “PLAINTEXT” 协议和宿主机的 IP 地址。
13.KAFKA_CFG_NODE_ID=1:指定当前 Kafka 节点的节点 ID。
14.KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI:指定 Kafka KRaft 集群的 ID。确保三个节点的集群 ID 相同。
15.KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@172.23.0.11:9093,2@172.23.0.12:9093,3@172.23.0.13:9093:指定 KRaft 控制器的选举候选人列表。这里列出了三个节点的 IP 地址和端口。
16.ALLOW_PLAINTEXT_LISTENER=yes:允许使用 “PLAINTEXT” 协议的监听器。
18. `volumes`: 定义了数据卷的挂载方式,用于持久化存储 Kafka 的数据。
19. `networks`: 指定了要连接的网络。
20. `networks -> netkafka`: 定义了名为 `netkafka` 的网络配置,其中包括了 IP 地址分配等信息。
解释:关于网络配置部分,`netkafka` 是一个外部网络,其 IP 地址由 IP 地址管理 (IPAM) 驱动程序提供。它使用了子网 `172.23.0.0/25`,并配置了网关地址为 `172.23.0.1`。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。