赞
踩
不要访问中文官网,信息落后很多。英文官网:https://kafka.apache.org/
目前最新版本3.0.0,不支持Java 8 和 Scala 2.12。
Kafka2.8 中弃用Zookeeper,使用Raft协议进行选举。
Kafka2.2.2 以下版本有Jackson安全问题。
Kafka2.2 开始将 kafka-topic.sh 中 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
确保集群所有broker启动,再进行消费,保证__consumer_offsets
主题的副本均匀分布到broker。
使用宿主机IP。
# broker节点ID
-e KAFKA_BROKER_ID=0
# 明文:实际服务端口
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094
# 明文:对外端口(宿主机IP)
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.25.18.45:9094
# zookeeper(宿主机IP)
-e KAFKA_ZOOKEEPER_CONNECT=172.25.18.45:2181
# 重要:默认副本数(内置topic:__consumer_offsets=1不能高可用,已经创建为1的topic,需要动态修改)
-e KAFKA_DEFAULT_REPLICATION_FACTOR=3
# 副本数
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3
# 事务主题的副本数
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
# ISR = In Sync Replicas(当前活跃副本)
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=3
先启动Zookeeper,再启动Kafka,关闭则相反。
Docker部署Kafka时,需要 advertised_listeners 进行广播,内网可以不需要。
# 启动Zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
# 启动Kafka (斜杠后面不要有空格)
docker run -d --name kafka01 \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.25.18.45:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=172.25.18.45:2181 \
-e KAFKA_DEFAULT_REPLICATION_FACTOR=1 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
-t wurstmeister/kafka
# 检查服务
telnet 172.25.18.45 2181
telnet 172.25.18.45 9092
最新版本已经改名叫CMAK,最新版本需要Java11,选2.x版本。
kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源。
雅虎开源kafka-manager:https://github.com/yahoo/kafka-manager
滴滴开源kafka-manager : https://github.com/didi/kafka-manager
使用文档:https://github.com/didi/kafka-manager/blob/master/doc/user_cn_guide.md
# 下载
docker pull sheepkiller/kafka-manager
# 启动
docker run -itd --name kafka-manager \
-p 9000:9000 \
--restart=always \
-e ZK_HOSTS=172.25.18.45:2181 \
sheepkiller/kafka-manager
访问:http://localhost:9000
没必要不用开启,影响性能,可以监测每秒消息数。
在启动 kafka 服务 之前设置 JMX_PORT 环境变量。
# 进入Kafka容器
docker exec -it kafka01 /bin/bash
# 进入该镜像默认脚本目录
cd /opt/kafka/bin
# 编辑脚本
vi kafka-server-start.sh
#######################
export JMX_PORT=9999
#######################
# 重启Kafka
docker restart kafka01
Please enable JMX polling here.
大量消费者不建议使用。
勾选 : Poll consumer information (Not recommended for large # of consumers)
# 进入容器
docker exec -it kafka01 /bin/bash
# 该镜像Kafka目录
cd /opt/kafka
# 默认数据存储目录
# cd /kafka
# 查看topic列表
./bin/kafka-topics.sh --list --bootstrap-server 172.25.18.45:9092
# 创建topic (3副本、6分区)
./bin/kafka-topics.sh --create --bootstrap-server 172.25.18.45:9092 --replication-factor 3 --partitions 6 --topic test-topic
# 删除topic
./bin/kafka-topics.sh --delete --bootstrap-server 172.25.18.45:9092 --topic test-topic
# 查看topic消息总数 --time-1:最大位移,--time-2:最早位移。
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.25.18.45:9092 --topic wangfugui-topic --time -1
# 查看topic的描述信息
./bin/kafka-topics.sh --describe --bootstrap-server 172.25.18.45:9092 --topic wangfugui-topic
# 第一行是对 topic 的概述(6个分区,3个副本),下面的是对每个分区的描述
Topic: adam-topic1 PartitionCount: 6 ReplicationFactor: 3 Configs:
Topic: adam-topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: adam-topic1 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: adam-topic1 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: adam-topic1 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: adam-topic1 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: adam-topic1 Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
# Partition: 0 --> 分区0
# Leader: 2 --> 读写Leader在broker2上
# Replicas: 2,0,1 --> 三个副本分别在broker2,broker0,broker1上,其中第一个是Leader(负责读写),后面两个是follower(负责复制副本)
# Isr: 2,0,1 --> (In Sync Replicas)当前活跃副本
# 查看消费者列表
./bin/kafka-consumer-groups.sh --bootstrap-server 172.25.18.45:9092 --list
# 查看指定消费组消费情况
./bin/kafka-consumer-groups.sh --bootstrap-server 172.25.18.45:9092 --describe --group Group-wangfugui-topic
# 从头开始消费
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.18.45:9092 --topic wangfugui-topic --group wangfugui_group --from-beginning
# 开始订阅并消费 : offset=latest
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.18.45:9092 --topic wangfugui-topic --group wangfugui_group
# 从头开始消费并订阅 : --from-beginning 从头开始消费
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.18.45:9092 --topic wangfugui-topic --group wangfugui_group --from-beginning
# 消费时:打印出消息体的 key 和 value
./bin/kafka-console-consumer.sh --bootstrap-server 172.25.18.45:9092 --property print.key=true --topic wangfugui-topic
# 打开生产数据窗口
./bin/kafka-console-producer.sh --broker-list 172.17.0.3:9092 --topic wangfugui-topic
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。