当前位置:   article > 正文

通过 docker-compose 部署 Kafka_docker-compose部署kafka

docker-compose部署kafka

部署 docker:linux下安装docker

部署 docker-compose:linux下安装docker compose

创建docker镜像网络环境:

  1. # 创建,注意不能使用hadoop_network,要不然启动hs2服务的时候会有问题!!!
  2. docker network create hadoop-network
  3. # 查看
  4. docker network ls

Kafka 编排部署

下载 Kafka

(#需要java环境支持)

wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz --no-check-certificate
配置
  • config/kafka-node1/server.properties

    1. # 常见配置挂载目录
    2. mkdir config/{kafka-node1,kafka-node2,kafka-node3} -p
    3. # 配置
    4. cat >config/kafka-node1/server.properties<<EOF
    5. #broker的全局唯一编号,不能重复
    6. broker.id=1
    7. #删除topic功能使能
    8. delete.topic.enable=true
    9. #处理网络请求的线程数量
    10. num.network.threads=3
    11. #用来处理磁盘IO的现成数量
    12. num.io.threads=8
    13. #发送套接字的缓冲区大小
    14. socket.send.buffer.bytes=102400
    15. #接收套接字的缓冲区大小
    16. socket.receive.buffer.bytes=102400
    17. #请求套接字的缓冲区大小
    18. socket.request.max.bytes=104857600
    19. #kafka数据的存储位置
    20. log.dirs=/opt/apache/kafka/logs
    21. #指定Topic的分区数量,这里设置为3。默认只有一个分区,设置多分区可以支持并发读写和负载均衡
    22. num.partitions=3
    23. #副本,默认只有一个副本,不会进行数据备份和冗余
    24. replication.factor=3
    25. #用来恢复和清理data下数据的线程数量
    26. num.recovery.threads.per.data.dir=1
    27. #segment文件保留的最长时间,超时将被删除
    28. log.retention.hours=168
    29. #配置连接Zookeeper集群地址
    30. zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
    31. #zookeeper连接超时时间
    32. zookeeper.connection.timeout.ms=60000
    33. EOF
  • config/kafka-node2/server.properties

    1. cat >config/kafka-node2/server.properties<<EOF
    2. #broker的全局唯一编号,不能重复
    3. broker.id=2
    4. #删除topic功能使能
    5. delete.topic.enable=true
    6. #处理网络请求的线程数量
    7. num.network.threads=3
    8. #用来处理磁盘IO的现成数量
    9. num.io.threads=8
    10. #发送套接字的缓冲区大小
    11. socket.send.buffer.bytes=102400
    12. #接收套接字的缓冲区大小
    13. socket.receive.buffer.bytes=102400
    14. #请求套接字的缓冲区大小
    15. socket.request.max.bytes=104857600
    16. #kafka数据的存储位置
    17. log.dirs=/opt/apache/kafka/logs
    18. #指定Topic的分区数量,这里设置为3。默认只有一个分区,设置多分区可以支持并发读写和负载均衡
    19. num.partitions=3
    20. #副本,默认只有一个副本,不会进行数据备份和冗余
    21. replication.factor=3
    22. #用来恢复和清理data下数据的线程数量
    23. num.recovery.threads.per.data.dir=1
    24. #segment文件保留的最长时间,超时将被删除
    25. log.retention.hours=168
    26. #配置连接Zookeeper集群地址
    27. zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
    28. #zookeeper连接超时时间
    29. zookeeper.connection.timeout.ms=60000
    30. EOF
  • config/kafka-node3/server.properties

    1. cat >config/kafka-node3/server.properties<<EOF
    2. #broker的全局唯一编号,不能重复
    3. broker.id=3
    4. #删除topic功能使能
    5. delete.topic.enable=true
    6. #处理网络请求的线程数量
    7. num.network.threads=3
    8. #用来处理磁盘IO的现成数量
    9. num.io.threads=8
    10. #发送套接字的缓冲区大小
    11. socket.send.buffer.bytes=102400
    12. #接收套接字的缓冲区大小
    13. socket.receive.buffer.bytes=102400
    14. #请求套接字的缓冲区大小
    15. socket.request.max.bytes=104857600
    16. #kafka数据的存储位置
    17. log.dirs=/opt/apache/kafka/logs
    18. #指定Topic的分区数量,这里设置为3。默认只有一个分区,设置多分区可以支持并发读写和负载均衡
    19. num.partitions=3
    20. #副本,默认只有一个副本,不会进行数据备份和冗余
    21. replication.factor=3
    22. #用来恢复和清理data下数据的线程数量
    23. num.recovery.threads.per.data.dir=1
    24. #segment文件保留的最长时间,超时将被删除
    25. log.retention.hours=168
    26. #配置连接Zookeeper集群地址
    27. zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
    28. #zookeeper连接超时时间
    29. zookeeper.connection.timeout.ms=60000
    30. EOF
启动脚本 bootstrap.sh
  1. #!/usr/bin/env sh
  2. ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties
构建镜像 Dockerfile
  1. FROM registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/centos:7.7.1908
  2. RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
  3. RUN export LANG=zh_CN.UTF-8
  4. # 创建用户和用户组,跟yaml编排里的user: 10000:10000
  5. RUN groupadd --system --gid=10000 hadoop && useradd --system --home-dir /home/hadoop --uid=10000 --gid=hadoop hadoop -m
  6. # 安装sudo
  7. RUN yum -y install sudo ; chmod 640 /etc/sudoers
  8. # 给hadoop添加sudo权限
  9. RUN echo "hadoop ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
  10. RUN yum -y install install net-tools telnet wget nc less
  11. RUN mkdir /opt/apache/
  12. # 添加配置 JDK
  13. ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
  14. ENV JAVA_HOME /opt/apache/jdk1.8.0_212
  15. ENV PATH $JAVA_HOME/bin:$PATH
  16. # 添加配置 kafka server
  17. ENV KAFKA_VERSION 2.12-3.4.0
  18. ADD kafka_${KAFKA_VERSION}.tgz /opt/apache/
  19. ENV KAFKA_HOME /opt/apache/kafka
  20. RUN ln -s /opt/apache/kafka_${KAFKA_VERSION}-bin $KAFKA_HOME
  21. # 创建数据存储目录
  22. RUN mkdir -p ${KAFKA_HOME}/data/logs
  23. # copy bootstrap.sh
  24. COPY bootstrap.sh /opt/apache/
  25. RUN chmod +x /opt/apache/bootstrap.sh
  26. RUN chown -R hadoop:hadoop /opt/apache
  27. WORKDIR $KAFKA_HOME
开始构建镜像
  1. # 需要查看构建镜像详细过程则需要加上 --progress=plain 选项
  2. docker build -t registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0 . --no-cache --progress=plain
编排 docker-compose.yaml
  1. version: '3'
  2. services:
  3. kafka-node1:
  4. image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
  5. user: "hadoop:hadoop"
  6. container_name: kafka-node1
  7. hostname: kafka-node1
  8. restart: always
  9. privileged: true
  10. env_file:
  11. - .env
  12. volumes:
  13. - ./config/kafka-node1/server.properties:${KAFKA_HOME}/config/server.properties
  14. ports:
  15. - "${KAFKA_NODE1_SERVER_PORT}:9092"
  16. expose:
  17. - 2888
  18. - 3888
  19. command: ["sh","-c","/opt/apache/bootstrap.sh"]
  20. networks:
  21. - hadoop-network
  22. healthcheck:
  23. test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
  24. interval: 10s
  25. timeout: 10s
  26. retries: 5
  27. kafka-node2:
  28. image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
  29. user: "hadoop:hadoop"
  30. container_name: kafka-node2
  31. hostname: kafka-node2
  32. restart: always
  33. privileged: true
  34. env_file:
  35. - .env
  36. volumes:
  37. - ./config/kafka-node2/server.properties:${KAFKA_HOME}/config/server.properties
  38. ports:
  39. - "${KAFKA_NODE2_SERVER_PORT}:9092"
  40. expose:
  41. - 2888
  42. - 3888
  43. command: ["sh","-c","/opt/apache/bootstrap.sh"]
  44. networks:
  45. - hadoop-network
  46. healthcheck:
  47. test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
  48. interval: 10s
  49. timeout: 10s
  50. retries: 5
  51. kafka-node3:
  52. image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
  53. user: "hadoop:hadoop"
  54. container_name: kafka-node3
  55. hostname: kafka-node3
  56. restart: always
  57. privileged: true
  58. env_file:
  59. - .env
  60. volumes:
  61. - ./config/kafka-node3/server.properties:${KAFKA_HOME}/config/server.properties
  62. ports:
  63. - "${KAFKA_NODE3_SERVER_PORT}:9092"
  64. expose:
  65. - 2888
  66. - 3888
  67. command: ["sh","-c","/opt/apache/bootstrap.sh"]
  68. networks:
  69. - hadoop-network
  70. healthcheck:
  71. test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
  72. interval: 10s
  73. timeout: 10s
  74. retries: 5
  75. # 连接外部网络
  76. networks:
  77. hadoop-network:
  78. external: true
.ENV 环境变量文件内容如下: 
  1. # 对外暴露的端口
  2. cat << EOF > .env
  3. KAFKA_HOME=/opt/apache/kafka
  4. KAFKA_NODE1_SERVER_PORT=39092
  5. KAFKA_NODE2_SERVER_PORT=39093
  6. KAFKA_NODE3_SERVER_PORT=39094
  7. EOF
开始部署
  1. docker-compose -f docker-compose.yaml up -d
  2. # 查看
  3. docker-compose -f docker-compose.yaml ps

简单测试验证

  1. # 登录zookeeper,在zookeeper查看brokers
  2. ${ZOOKEEPER_HOME}/bin/zkCli.sh ls /brokers/ids
  3. ${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/1
  4. ${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/2
  5. ${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/3

常用的 Kafka 客户端命令

添加topic

  1. # 随便登录
  2. docker exec -it kafka-node1 bash
  3. # 创建topic,1副本,1分区,设置数据过期时间72小时(-1表示不过期),单位ms,72*3600*1000=259200000
  4. ${KAFKA_HOME}/bin/kafka-topics.sh --create --topic test002 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000

查看topic

  1. # 查看topic列表
  2. ${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --list
  3. # 查看topic列表详情
  4. ${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe
  5. # 指定topic
  6. ${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --topic test002
  7. # 查看消费者组
  8. ${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --list
  9. kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --group test002

修改topic

  1. # 修改分区,扩分区,不能减少分区
  2. ${KAFKA_HOME}/bin/kafka-topics.sh --alter --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --partitions 2
  3. # 修改过期时间,下面两行都可以
  4. ${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --alter --topic test002 --add-config retention.ms=86400000
  5. ${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000
  6. # 修改副本数,将副本数修改成3
  7. $ cat >1.json<<EOF
  8. {"version":1,
  9. "partitions":[
  10. {"topic":"test002","partition":0,"replicas":[0,1,2]},
  11. {"topic":"test002","partition":1,"replicas":[1,2,0]},
  12. {"topic":"test002","partition":2,"replicas":[2,0,1]}
  13. ]}
  14. EOF
  15. ${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --topic test002

扩容分区

  1. #把test002 topic扩容为6个分区。
  2. #注意:目前不支持减少分区,扩容前必须存在这个主题。
  3. ${KAFKA_HOME}/bin/kafka-topics.sh -alter --partitions 6 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002
  4. ${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe

删除topic

${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic test002 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092

生成者和消费者

生产者
  1. ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002
  2. {"id":"1","name":"n1","age":"20"}
  3. {"id":"2","name":"n2","age":"21"}
  4. {"id":"3","name":"n3","age":"22"}
消费者
  1. # 从头开始消费
  2. ${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --from-beginning
  3. # 指定从分区的某个位置开始消费,这里只指定了一个分区,可以多写几行或者遍历对应的所有分区
  4. ${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --partition 0 --offset 100

消费组

在 Kafka 中,消费组(Consumer Group)是一组独立消费者的集合,它们共同消费一个或多个 Topic 中的数据。消费组内的消费者协同工作,通过分摊该 Topic 中的所有分区,以实现消息的消费和处理。

消费组在 Kafka 消息队列中起到了至关重要的作用。它可以提供如下功能:

  • 并发消费:消费组内的每个消费者都可以独立地消费消息,可以实现高并发处理。

  • 自动负载均衡:消费组内的消费者会自动协作,将消费任务均分到所有消费者上,使得每个消费者都能处理相同数量的消息。

  • 提高可用性:当消费组内的一个或多个消费者故障退出时,消息会自动分配到其他消费者上,保证消费任务的不间断执行。

  • 支持多租户:可以通过 Consumer Group 来对不同的租户进行消息隔离,不同的 Consumer Group 可以读取同一个 Topic 的不同副本,或者读取不同 Topic 的不同分区,实现多个实例共享同一 Topic 或分散处理不同 Topic。

示例如下:

${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --group test002

查看数据积压

${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --group test002

kafka 数据积压处理方法

在 Kafka 中,由于消息的生产和消费速度可能不一致,导致消息会积压在 Kafka 的分区中,如果这些积压的消息处理不及时,会导致 Kafka 系统的性能下降和可用性降低等问题。因此,需要采取一些处理方法来解决数据积压问题:

  • 增加消费者:增加消费者可以使消费任务并行执行,加快消息的处理速度。可以通过增加消费者的方式将积压的消息消费掉,提高系统处理速度和效率。

  • 调整消费者组:当一个消费组中的消费者无法处理所有的消息时,可以考虑调整消费者组。可以增加消费者的数量或者更换消费者组,以适应消息处理的速度和大小。

  • 调整消息分区:Kafka 中Topic 的分区数也会影响数据积压的情况。可以调整分区数以改善数据读取和分发的情况,或者对热点 Topic 进行分区处理,以实现更好的性能和可用性。

  • 调整消费 offset:若积压的消息都已经被处理过了,却还在 Kafka 中存在,可能是消费者消费 offset 设置错误导致的。可以通过 Kafka 的 offset 操作,重置消费 offset,跳过已经处理过的消息,减少数据积压的问题。

  • 执行消息清洗:在消费 Kafka 消息时,可以额外执行一些消息清洗处理操作,将无用的数据过滤出去,或者将数据进行清理和格式化处理,减少中间处理环节,提高数据消费的效率和可用性。

以上是一些解决 Kafka 数据积压问题的常用方法,需要视具体情况而定,选择合适的方法来解决。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/651987
推荐阅读
相关标签
  

闽ICP备14008679号