
Deploying Kafka with Docker

Working notes

Contents

1. Install ZooKeeper

2. Install Kafka

2.1 Pull the Kafka image

2.2 List local Docker images

2.3 Inspect local containers (docker container)

2.3.1 List running containers

2.3.2 List all containers, including stopped ones

2.3.3 Stop a running container

2.3.4 Start a container

2.4 Remove a specific container

2.5 Run the Kafka image

2.5.0 Mount a custom configuration file

2.5.1 Start the Kafka container

2.5.2 Verify the Kafka container is running

2.6 Create a test topic

2.6.1 Enter the Kafka container

2.6.2 Create a topic

2.6.3 List created topics

2.6.4 Produce messages to the topic

2.6.5 Consume messages with a Kafka consumer

3. Modify the Kafka container's configuration file after installation

3.1 Enter the Kafka container

3.2 Edit the configuration file

3.2.1 Install vim

3.2.2 Edit the configuration file

3.3 Recreate the container

3.3.1 Stop the running Kafka container

3.3.2 Remove the Kafka container

3.3.3 Delete the Kafka data directory

3.3.4 Recreate the Kafka container

4. Keep the container's time in sync with the host after installing the Kafka container

5. Troubleshooting remote connection failures to the Kafka container

5.1 Remove the ZooKeeper container

5.2 Remove the Kafka container

5.3 Recreate the ZooKeeper container

5.4 Create the Kafka container

5.5 Create a topic

5.6 Connect to the Kafka container remotely


1. Install ZooKeeper

Docker pulls the image automatically if it is not available locally.

# docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime -e TZ=Asia/Shanghai wurstmeister/zookeeper
[root@localhost Docker-Compose-Master]# mkdir zookeeper
[root@localhost Docker-Compose-Master]# ls
docker-compose.yml kafka zookeeper
[root@localhost Docker-Compose-Master]# cd zookeeper/
[root@localhost zookeeper]# ls
[root@localhost zookeeper]#
[root@localhost zookeeper]# docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime -e TZ=Asia/Shanghai wurstmeister/zookeeper
Unable to find image 'wurstmeister/zookeeper:latest' locally
latest: Pulling from wurstmeister/zookeeper
a3ed95caeb02: Pull complete
ef38b711a50f: Pull complete
e057c74597c7: Pull complete
666c214f6385: Pull complete
c3d6a96f1ffc: Pull complete
3fe26a83e0ca: Pull complete
3d3a7dd3a3b1: Pull complete
f8cc938abe5f: Pull complete
9978b75f7a58: Pull complete
4d4dbcc8f8cc: Pull complete
8b130a9baa49: Pull complete
6b9611650a73: Pull complete
5df5aac51927: Pull complete
76eea4448d9b: Pull complete
8b66990876c6: Pull complete
f0dd38204b6f: Pull complete
Digest: sha256:7a7fd44a72104bfbd24a77844bad5fabc86485b036f988ea927d1780782a6680
Status: Downloaded newer image for wurstmeister/zookeeper:latest
8dbbc5f4768e37b6049e7830e2c233476b629bdf3bafdf2eef9b0d2eb127b6c2
[root@localhost zookeeper]#
======================================================================================
# Explanation of the command and its options:
docker run: create and start a new container
--name zookeeper: name the container "zookeeper"
-p 2181:2181: map host port 2181 to container port 2181 so other applications on the host can reach the ZooKeeper service
-v /etc/localtime:/etc/localtime: mount the host's time configuration file into the container so the container's clock stays in sync with the host
-e TZ=Asia/Shanghai: set the container's time zone
wurstmeister/zookeeper: the image to use
Summary:
This command makes Docker create a container named "zookeeper" from the wurstmeister/zookeeper image. ZooKeeper is an open-source distributed coordination service, and the image provides the environment needed to run a ZooKeeper server.
Mapping host port 2181 to container port 2181 makes the ZooKeeper service running in the container easy to reach, and mounting the host's time configuration file with -v keeps the container's time consistent with the host.
After this command runs, the ZooKeeper container runs in the background; use docker ps to list the running containers.
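The directory listing above shows a docker-compose.yml next to the kafka and zookeeper directories. For reference, the two containers started by hand in this article could be described in a Compose file roughly as follows. This is a sketch, not the author's actual file; the service layout is an assumption, and the IP 192.168.2.247 is taken from the docker run commands used later:

```yaml
version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    volumes:
      - /etc/localtime:/etc/localtime:ro
    environment:
      TZ: Asia/Shanghai
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    volumes:
      - /etc/localtime:/etc/localtime:ro
    environment:
      TZ: Asia/Shanghai
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.2.247:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

# Start both with: docker-compose up -d
```

With Compose, recreating the Kafka container after a configuration change (the problem discussed in section 3 below) becomes a single `docker-compose up -d` after editing this file.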

2. Install Kafka

2.1 Pull the Kafka image
# Pull the Kafka image
# docker pull wurstmeister/kafka
[root@localhost kafka]# pwd
/home/magx/Docker-Compose-Master/kafka
[root@localhost kafka]#
[root@localhost kafka]# ll
total 8
-rw-r--r--. 1 root root 3112 Dec 4 17:48 docker-compose-kafka.yml
drwxr-xr-x. 5 root root 4096 Dec 4 16:40 kafka-docker
[root@localhost kafka]#
[root@localhost kafka]# docker pull wurstmeister/kafka
Using default tag: latest
latest: Pulling from wurstmeister/kafka
42c077c10790: Pull complete
44b062e78fd7: Pull complete
b3ba9647f279: Pull complete
10c9a58bd495: Pull complete
ed9bd501c190: Pull complete
03346d650161: Pull complete
539ec416bc55: Pull complete
Digest: sha256:2d4bbf9cc83d9854d36582987da5f939fb9255fb128d18e3cf2c6ad825a32751
Status: Downloaded newer image for wurstmeister/kafka:latest
docker.io/wurstmeister/kafka:latest
[root@localhost kafka]#
======================================================================================
# Explanation of the command:
docker pull: download an image from a Docker registry
wurstmeister/kafka: the name of the image to pull
Summary:
This command makes Docker download the wurstmeister/kafka image from the registry. It is a Kafka image maintained by wurstmeister; Kafka is a popular distributed streaming platform.
Note:
The command needs a working network connection from Docker to the registry. Once the download finishes, run docker images to confirm that the wurstmeister/kafka image is present.
2.2 List local Docker images
# List the local Docker images
# docker images
[root@localhost kafka]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
hello-world latest 9c7a54a9a43c 7 months ago 13.3kB
wurstmeister/kafka latest a692873757c0 19 months ago 468MB
wurstmeister/zookeeper latest 3f43f72cb283 4 years ago 510MB
[root@localhost kafka]#
[root@localhost kafka]#
2.3 Inspect local containers (docker container)
2.3.1 List running containers
# List the running Docker containers
# docker ps
[root@localhost kafka]#
[root@localhost kafka]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost kafka]#
2.3.2 List all containers, including stopped ones
# List all local Docker containers
# docker ps -a lists every container, including stopped ones.
It shows each container's ID, status, creation time, and other details.
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
913b2a1d7f07 wurstmeister/kafka "start-kafka.sh" 11 minutes ago Exited (143) 8 minutes ago kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost kafka]#
2.3.3 Stop a running container
# Stop a container
# docker stop <container_id>
[root@localhost ~]# docker ps # before stopping: list the running containers
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 29 hours ago Up 29 hours 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
[root@localhost ~]# docker stop b03ba55d79cb # stop the kafka container
b03ba55d79cb
[root@localhost ~]#
# after stopping: list the running containers
[root@localhost ~]#
[root@localhost ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
# after stopping: list all containers
[root@localhost ~]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 29 hours ago Exited (143) 8 seconds ago kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost ~]#
2.3.4 Start a container
# Start a container
# docker start <container_id or container_name>
[root@localhost ~]# docker start kafka # start the container by name
kafka
[root@localhost ~]#
[root@localhost ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 29 hours ago Up 3 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
2.4 Remove a specific container
# To remove a Docker container, use docker rm with the container's ID or name as the argument
# docker rm <CONTAINER_ID> # by container ID
or
# docker rm <CONTAINER_NAME> # by container name
# before removal: list all local Docker containers
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
913b2a1d7f07 wurstmeister/kafka "start-kafka.sh" 19 minutes ago Exited (143) 16 minutes ago kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost kafka]#
[root@localhost kafka]#
# remove the specified Docker container
[root@localhost kafka]#
[root@localhost kafka]# docker rm 913b2a1d7f07 # docker rm <container_ID>
913b2a1d7f07
[root@localhost kafka]#
# after removal, listing all containers no longer shows the removed one
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost kafka]#

2.5 Run the Kafka image
2.5.0 Mount a custom configuration file
# To avoid editing the Kafka container's configuration later, mount the configuration file when the container is created:
1) Create a local file with the same name, consumer.properties
2) In that file, set bootstrap.servers=<IP of the container's host>:9092
3) Run docker run -v <local config file path>:<config file path inside the container>
[root@localhost kafka]# pwd
/home/magx/Docker-Compose-Master/kafka
[root@localhost kafka]#
[root@localhost kafka]# ll
total 12
-rw-r--r--. 1 root root 1224 Dec 25 14:40 consumer.properties # local config file with the same name
-rw-r--r--. 1 root root 3112 Dec 4 17:48 docker-compose-kafka.yml
drwxr-xr-x. 5 root root 4096 Dec 4 16:40 kafka-docker
[root@localhost kafka]#
[root@localhost kafka]#
2.5.1 Start the Kafka container
# Start the Kafka container
[root@localhost kafka]#
[root@localhost kafka]# docker run -d --name kafka -v /etc/localtime:/etc/localtime:ro -p 9092:9092 -v /home/magx/Docker-Compose-Master/kafka/consumer.properties:/opt/kafka/config/consumer.properties -e TZ=Asia/Shanghai --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.247:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
caac221a5a61b11a524f4c2601e02f300cb7229b69f4667d600c3827443be312
[root@localhost kafka]#
[root@localhost kafka]#
Or:
docker run -d --name kafka -v /etc/localtime:/etc/localtime:ro -p 9092:9092 -e TZ=Asia/Shanghai --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
======================================================================================
This command links the ZooKeeper container and sets several environment variables to configure Kafka.
In this command:
--name kafka: name the container "kafka".
-v /etc/localtime:/etc/localtime:ro: mount the host's time file into the container so the container's time matches the host system time;
the ro option mounts /etc/localtime read-only, preventing the container from accidentally modifying the host's time settings
-v /home/magx/Docker-Compose-Master/kafka/consumer.properties:/opt/kafka/config/consumer.properties
<local file with the same name> : <mount path inside the container>
-p 9092:9092: map container port 9092 to host port 9092.
-e TZ=Asia/Shanghai: set the container's time zone
--link zookeeper:zookeeper: link to the Docker container named "zookeeper", reachable from this container under the alias zookeeper.
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181: environment variable with the ZooKeeper connection string.
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092: environment variable with Kafka's advertised listeners.
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: environment variable with Kafka's listeners.
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1: environment variable with the replication factor of the offsets topic.
wurstmeister/kafka: the Docker image to use
Make sure the ZooKeeper container is already running and reachable at zookeeper:2181 before running this command.
If your ZooKeeper container has a different name, or you use a different network setup, adjust --link and KAFKA_ZOOKEEPER_CONNECT accordingly.
2.5.2 Verify the Kafka container is running
# docker ps # list the running containers
# docker ps -a # list all containers
[root@localhost kafka]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 27 seconds ago Up 26 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost kafka]#
[root@localhost kafka]#
[root@localhost kafka]# docker ps -a # list all containers
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 41 seconds ago Up 40 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost kafka]#
2.6 Create a test topic
2.6.1 Enter the Kafka container
# Enter the Kafka container
# docker exec -it kafka /bin/bash
[root@localhost kafka]#
[root@localhost kafka]# docker exec -it kafka /bin/bash
root@b03ba55d79cb:/#
root@b03ba55d79cb:/#
2.6.2 Create a topic
# Inside the Kafka container, create a test topic with the following command
# after entering the kafka container, create the topic:
# kafka-topics.sh --create --topic <topic_name> --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181
[root@localhost kafka]# docker exec -it kafka /bin/bash # enter the kafka container
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# kafka-topics.sh --create --topic test1221 --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181 # create the topic
Created topic test1221.
root@b03ba55d79cb:/#
Note: if the topic name contains . or _, topic creation prints a warning about the restrictions on periods and underscores in topic names. The command still completes and reports the created topic.
root@b03ba55d79cb:/# kafka-topics.sh --create --topic "alarm_warning" --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic alarm_warning.
root@b03ba55d79cb:/#
Note:
The warning means that, because of limitations in metric names, topic names containing a period ('.') or an underscore ('_') may collide. To avoid problems, it is best to use only one of the two characters, not both.
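The warning above exists because Kafka collapses periods and underscores when building metric names, so two distinct topic names can map to the same metric name. A small illustration of the collision (a sketch; metric_name and collides are hypothetical helpers for this note, not Kafka's actual code):

```python
def metric_name(topic: str) -> str:
    """Mimic how topic names collapse in metric names:
    periods and underscores both end up as underscores."""
    return topic.replace(".", "_")

def collides(a: str, b: str) -> bool:
    """True if two distinct topic names would share a metric name."""
    return a != b and metric_name(a) == metric_name(b)

# "alarm.warning" and "alarm_warning" collide, which is why
# kafka-topics.sh warns when topic names use these characters.
print(collides("alarm.warning", "alarm_warning"))  # True
print(collides("alarm.warning", "alarmwarning"))   # False
```

Sticking to one separator style (for example only underscores, as with alarm_warning above) avoids the ambiguity entirely.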
2.6.3 List created topics
# List all created topics
[root@localhost grafana_loki_vector]# docker exec -it kafka /bin/bash # enter the container
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# kafka-topics.sh --list --zookeeper zookeeper:2181 # list the topics
__consumer_offsets
alarm_warning
mag_test
test
test1221
test2013
test2023
test20231221
root@b03ba55d79cb:/#

2.6.4 Produce messages to the topic
# Produce Kafka messages to the topic just created
# kafka-console-producer.sh --broker-list localhost:9092 --topic <topic_name>
[root@localhost kafka]# docker exec -it kafka /bin/bash
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# kafka-topics.sh --create --topic test1221 --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181
Created topic test1221.
root@b03ba55d79cb:/#
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# kafka-console-producer.sh --broker-list localhost:9092 --topic test1221
>hell ka^H^H
>hello kafka-122^H^H
>
>hello kafka-20131221
>
>topci test2023? y/n
>topci test1221!
>e\^H
>e
>
>^Croot@b03ba55d79cb:/# # press Ctrl+C to exit the Kafka producer
root@b03ba55d79cb:/#
2.6.5 Consume messages with a Kafka consumer
In another terminal window:
1) First enter the kafka container:
# docker exec -it kafka /bin/bash
2) Start a consumer to read messages from the test topic:
# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic_name> --from-beginning
--from-beginning is optional: with it, every run consumes all messages in the topic from the beginning
without --from-beginning: the consumer only reads messages produced after it was started
ex: kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic_name>
Note: enter the kafka container before consuming messages
======================================================================================
[root@localhost ~]#
[root@localhost ~]# docker exec -it kafka /bin/bash # enter the kafka container
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1221 --from-beginning
hell ka
hello kafka-122
hello kafka-20131221
topci test2023? y/n
topci test1221!
e\
e
^CProcessed a total of 11 messages # press Ctrl+C to exit the Kafka consumer
root@b03ba55d79cb:/#

This completes the basic workflow of running ZooKeeper and Kafka with Docker and verifying that they work.

3. Modify the Kafka container's configuration file after installation

3.1 Enter the Kafka container
# Enter the kafka container
# docker exec -it <container_name_or_id> /bin/bash
[root@localhost ~]# docker exec -it kafka /bin/bash
root@b03ba55d79cb:/#
3.2 Edit the configuration file
3.2.1 Install vim
# Some Docker images ship without the vi editor preinstalled; install another editor to modify the Kafka configuration file
# apt-get update
# apt-get install vim
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# vi /opt/kafka/config/server.properties
bash: vi: command not found
root@b03ba55d79cb:/# vim /opt/kafka/config/server.properties
bash: vim: command not found
root@b03ba55d79cb:/#
# vim: a full-featured terminal text editor; install and use it with the following commands
root@b03ba55d79cb:/# apt-get update
Get:1 http://deb.debian.org/debian bullseye InRelease [116 kB]
Get:2 http://security.debian.org/debian-security bullseye-security InRelease [48.4 kB]
Get:3 https://download.docker.com/linux/debian bullseye InRelease [43.3 kB]
Get:4 http://deb.debian.org/debian bullseye-updates InRelease [44.1 kB]
Get:5 http://security.debian.org/debian-security bullseye-security/main amd64 Packages [261 kB]
Get:6 https://download.docker.com/linux/debian bullseye/stable amd64 Packages [28.1 kB]
Get:7 http://deb.debian.org/debian bullseye/main amd64 Packages [8062 kB]
Get:8 http://deb.debian.org/debian bullseye-updates/main amd64 Packages [17.7 kB]
Fetched 8621 kB in 1min 57s (74.0 kB/s)
Reading package lists... Done
root@b03ba55d79cb:/#
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# apt-get install vim
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
libgpm2 vim-common vim-runtime xxd
Suggested packages:
gpm ctags vim-doc vim-scripts
The following NEW packages will be installed:
libgpm2 vim vim-common vim-runtime xxd
0 upgraded, 5 newly installed, 0 to remove and 32 not upgraded.
Need to get 8174 kB of archives.
After this operation, 36.9 MB of additional disk space will be used.
Do you want to continue? [Y/n] y
Get:1 http://deb.debian.org/debian bullseye/main amd64 xxd amd64 2:8.2.2434-3+deb11u1 [192 kB]
Get:2 http://deb.debian.org/debian bullseye/main amd64 vim-common all 2:8.2.2434-3+deb11u1 [226 kB]
Get:3 http://deb.debian.org/debian bullseye/main amd64 libgpm2 amd64 1.20.7-8 [35.6 kB]
Get:4 http://deb.debian.org/debian bullseye/main amd64 vim-runtime all 2:8.2.2434-3+deb11u1 [6226 kB]
......
......
......
3.2.2 Edit the configuration file
# Change the following settings in the configuration file:
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://your_ip_address:9092
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# vim /opt/kafka/config/server.properties
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# cat /opt/kafka/config/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=-1
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# listeners=PLAINTEXT://0.0.0.0:9092
listeners=PLAINTEXT://192.168.2.247:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://192.168.2.247:9092 #//your_ip_address:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/kafka/kafka-logs-b03ba55d79cb
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zookeeper:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
port=9092
root@b03ba55d79cb:/#

Note: after editing the Kafka configuration file, restart the Kafka service for the changes to take effect.

However, restarting the Kafka container (e.g. docker start/restart <kafka container_ID or container_Name>) re-applies the command the container was originally created with (docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka).

So if the configuration was wrong when the container was first created, editing the configuration file inside the container does not help: as soon as docker start / restart <container_ID or container_Name> runs, the container's configuration is regenerated and the edits are lost.

Restarting only the Kafka service inside the container does apply the changes, but to avoid trouble later the reliable fix is to delete the container, correct the parameters, and recreate it; see 3.3 below for the steps.

3.3 Recreate the container
3.3.1 Stop the running Kafka container
# After editing the configuration file and restarting the kafka container, the changes were not preserved; so clear the Kafka container state:
1. Stop the running Kafka container
2. Remove the Kafka container
3. Create a new Kafka container
[root@localhost ~]# docker ps # list the running containers
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 31 hours ago Up 5 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
[root@localhost ~]#
[root@localhost ~]# docker stop kafka # stop the kafka container
kafka
[root@localhost ~]#
[root@localhost ~]#
Note:
To restart a container: docker restart <container_id or container_name>
# docker restart <container_id>
[root@localhost ~]#
[root@localhost ~]# docker ps # before the restart
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 29 hours ago Up 28 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
[root@localhost ~]#
[root@localhost ~]# docker restart b03ba55d79cb # restart the kafka container
b03ba55d79cb
[root@localhost ~]#
[root@localhost ~]#
[root@localhost ~]# docker ps # after the restart
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 30 hours ago Up 3 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
3.3.2 Remove the Kafka container
[root@localhost ~]#
[root@localhost ~]# docker rm kafka # remove the kafka container
kafka
[root@localhost ~]#
[root@localhost ~]# docker ps -a # list all containers
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost ~]#
3.3.3 Delete the Kafka data directory
Delete the Kafka data directory: Kafka's data directory usually lives at /var/lib/kafka inside the container. Deleting it clears Kafka's cached data. Note that deleting the data directory destroys all Kafka data, including topics and consumer offsets.
[root@localhost ~]#
[root@localhost ~]# rm -rf /var/lib/kafka # delete the Kafka data directory
[root@localhost ~]#
[root@localhost ~]#
3.3.4 Recreate the Kafka container
# Recreate the kafka container
[root@localhost ~]#
[root@localhost ~]# docker run -d --name kafka -v /etc/localtime:/etc/localtime:ro -p 9092:9092 --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.247:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
563f64beaba4621a714fc44d6a4f81f9464fab682330eddb51a737bdeb001934
[root@localhost ~]#
[root@localhost ~]#
[root@localhost ~]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
563f64beaba4 wurstmeister/kafka "start-kafka.sh" 8 seconds ago Up 7 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost ~]#

4. 安装完kafka 容器后,设置容器与主机时间保持一致 

[root@localhost ~]# date    # host time, before the fix
Fri Dec 22 17:06:37 CST 2023
[root@localhost ~]#
[root@localhost ~]#
[root@localhost ~]# docker exec -it kafka /bin/bash    # enter the kafka container
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# date    # kafka container time (UTC, 8 hours behind)
Fri Dec 22 09:06:50 UTC 2023
root@b03ba55d79cb:/#
# inside the kafka container, sync the timezone with the host as follows
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# date    # kafka container time, after the fix
Fri Dec 22 17:08:13 CST 2023
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# exit
exit
[root@localhost ~]#
[root@localhost ~]# date    # host time, after the fix
Fri Dec 22 17:08:18 CST 2023
[root@localhost ~]#
Note:
To avoid having to fix time synchronization later, pass the following option when creating the container so the container and the host keep the same time:
-v /etc/localtime:/etc/localtime:ro
The ro option mounts /etc/localtime read-only, preventing the container from accidentally modifying the host's time settings.
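The 8-hour gap seen above is a timezone-rendering difference rather than real clock drift: host and container agree on the epoch second, but the container renders it as UTC. A quick illustration with plain GNU `date` (no Docker needed), reusing the timestamp from the session above:

```shell
# One epoch second, rendered in the container's old zone (UTC) and the host's zone (CST).
epoch=1703236010   # 2023-12-22 09:06:50 UTC, the moment captured above
TZ=UTC date -d "@$epoch" '+UTC: %Y-%m-%d %H:%M:%S'            # prints UTC: 2023-12-22 09:06:50
TZ=Asia/Shanghai date -d "@$epoch" '+CST: %Y-%m-%d %H:%M:%S'  # prints CST: 2023-12-22 17:06:50
```

This is why mounting `/etc/localtime` (or setting `TZ`) is enough: only the rendering zone changes, not the underlying clock.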

At this point, the Kafka deployment is complete.

5. Troubleshooting failed remote connections to the kafka container

Because remote connections to the kafka container kept failing, the container was deleted and recreated many times; as a result, consumers could no longer receive the messages that producers had already written inside the old container.

5.1 Delete the zookeeper container
[root@localhost kafka]# docker stop zookeeper
zookeeper
[root@localhost kafka]#
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 3 weeks ago Exited (137) 6 seconds ago zookeeper
b38e9b5b6a2e hello-world "/hello" 3 weeks ago Exited (0) 3 weeks ago infallible_rosalind
[root@localhost kafka]#
[root@localhost kafka]# docker rm zookeeper
zookeeper
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b38e9b5b6a2e hello-world "/hello" 3 weeks ago Exited (0) 3 weeks ago infallible_rosalind
[root@localhost kafka]#
5.2 Delete the kafka container (same procedure as sections 3.3.1 and 3.3.2: docker stop kafka, then docker rm kafka; this transcript was not captured)
5.3 Recreate the zookeeper container
# create the zookeeper container
[root@localhost kafka]# docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime -e TZ=Asia/Shanghai wurstmeister/zookeeper
a893cce0d4652933da78da1da3a64ade5e530d42fc4806eb5c8448a13305867f
[root@localhost kafka]#
[root@localhost kafka]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a893cce0d465 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 4 seconds ago Up 3 seconds 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a893cce0d465 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 13 seconds ago Up 12 seconds 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 3 weeks ago Exited (0) 3 weeks ago infallible_rosalind
[root@localhost kafka]#
[root@localhost kafka]# docker ps    # list running containers
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a893cce0d465 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 18 seconds ago Up 17 seconds 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost kafka]#
5.4 Create the kafka container
# create the kafka container
[root@localhost kafka]#
[root@localhost kafka]# docker run -d --name kafka -v /etc/localtime:/etc/localtime:ro -p 9092:9092 -v /home/magx/Docker-Compose-Master/kafka/consumer.properties:/opt/kafka/config/consumer.properties -e TZ=Asia/Shanghai --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.247:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
c4f7b69648a8e6fe6e3916d14c75c2a9d402d07bbff8e8eb53910834000647e6
[root@localhost kafka]#
[root@localhost kafka]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
c4f7b69648a8 wurstmeister/kafka "start-kafka.sh" 4 seconds ago Up 3 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
a893cce0d465 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" About a minute ago Up About a minute 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost kafka]#
[root@localhost kafka]# docker exec -it kafka /bin/bash
root@c4f7b69648a8:/#
root@c4f7b69648a8:/# kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
alarm_warning
root@c4f7b69648a8:/#
5.5 Create topics
# create topics
root@c4f7b69648a8:/#
root@c4f7b69648a8:/# kafka-topics.sh --create --topic kafka_consumer --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic kafka_consumer.
root@c4f7b69648a8:/#
root@c4f7b69648a8:/# kafka-topics.sh --create --topic test1 --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181
Created topic test1.
root@c4f7b69648a8:/#
root@c4f7b69648a8:/# kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
alarm_warning
kafka_consumer
test1
root@c4f7b69648a8:/#
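The WARNING above appears because Kafka replaces '.' with '_' when deriving metric names, so topics named `kafka.consumer` and `kafka_consumer` would map to the same metrics. A minimal sketch of that sanitization rule (illustrative only, not Kafka's actual code):

```shell
# Kafka-style metric-name sanitization: '.' and '_' collapse to the same name.
to_metric_name() { printf '%s\n' "$1" | tr '.' '_'; }
to_metric_name "kafka.consumer"   # prints kafka_consumer
to_metric_name "kafka_consumer"   # prints kafka_consumer -- a collision
```

Picking one separator style for all topic names (as the warning suggests) avoids the ambiguity.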
5.6 Connect to the kafka container remotely
# from a remote client, connect to <IP-of-the-host-running-kafka>:9092
# consume messages with the kafka console consumer:
# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic_name>
[root@localhost kafka]# docker exec -it kafka /bin/bash    # enter the container
root@c4f7b69648a8:/#
root@c4f7b69648a8:/# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic alarm_warning
{"currentIndex":0,"header":{"headerMap":{"35":"381166"}},"mapList":[{"10007":"1"},{"43220":"金额:[3000000,5205900],数量:[300000,80000],价格涨跌幅:[0.02,0.0398387],市场成交量占比:[0.3,0.444444],区间内最低买入成交价: 63.53,昨收盘价: 62,下单数量: 50000,当前交易价格: 66,挂单数量: 0,挂单金额: 0,成交数量: 30000,成交金额: 1905900,市场成交数量: 130000","66005":"2","28005":"2","66004":"0","44":"660000","38":"50000","28000":"20231225","28001":"1703496682550","11001":"四川路桥","64103":"2408-4096个账户-4096个证券","93020":"1","64101":"2410","28101":"1565740725","91001":"830035","64102":"1101","11436":"20231225","37":"2","48":"600038","65098":"1","1301":"1","66003":"1","66002":"5"}]}
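The usual root cause of the remote-connection failures investigated in this section is the listener configuration: a client first contacts the bootstrap address, then reconnects to whatever address the broker advertises in its metadata. If the advertised listener points at `localhost`, remote clients try to reach themselves and fail. The two env vars passed to `docker run` above map to these broker settings (shown here in `server.properties` form, with the host IP 192.168.2.247 assumed as before):

```properties
# What the broker binds to inside the container (all interfaces):
listeners=PLAINTEXT://0.0.0.0:9092
# What the broker tells clients to reconnect to; this must be reachable
# from the remote client, i.e. the Docker host's IP, never localhost:
advertised.listeners=PLAINTEXT://192.168.2.247:9092
```

With this in place, a remote client can bootstrap against 192.168.2.247:9092 and consume normally.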
