赞
踩
.
└── docker_kafka
├── docker-compose.yml
├── kafka_python.py
├── conf
└── data
注意:文件内IP替换成自己本机IP
version: '3.5' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" network_mode: "bridge" kafka: image: wurstmeister/kafka ports: - "9092:9092" network_mode: "bridge" environment: KAFKA_ADVERTISED_HOST_NAME: IP KAFKA_ZOOKEEPER_CONNECT: IP:2181 KAFKA_ADVERTISED_PORT: 9092 KAFKA_LOG_RETENTION_HOURS: 120 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 KAFKA_NUM_PARTITIONS: 3 KAFKA_DELETE_RETENTION_MS: 1000 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://IP:9092 volumes: - ./data:/kafka kafka-manager: image: sheepkiller/kafka-manager environment: ZK_HOSTS: IP ports: - "9001:9000" network_mode: "bridge"
a.安装kafka-python第三方包
pip install kafka-python==2.0.2 -i https://pypi.doubanio.com/simple/
b.kafka_python.py
import json from kafka import KafkaConsumer, TopicPartition k_topic = 'test' consumer = KafkaConsumer(k_topic, bootstrap_servers=['IP:9092'], enable_auto_commit=False, auto_offset_reset="earliest", group_id='test') def get_kafka_left_num(consumer): """ :param consumer: 消费者对象 :return: """ # 查看kafka堆积剩余量 partitions = [TopicPartition(k_topic, p) for p in consumer.partitions_for_topic(k_topic)] # total toff = consumer.end_offsets(partitions) toff = [(key.partition, toff[key]) for key in toff.keys()] toff.sort() print("total offset: {}".format(str(toff))) # current coff = [(x.partition, consumer.committed(x)) for x in partitions] coff.sort() print("current offset: {}".format(str(coff))) # cal sum and left toff_sum = sum([x[1] for x in toff]) cur_sum = sum([x[1] for x in coff if x[1] is not None]) left_sum = toff_sum - cur_sum print("kafka left: {}".format(left_sum)) return left_sum # auto_offset_reset # 1. 从上一次未消费的位置开始读(则该参数设置为earliest); # 2. 从当前时刻开始读之后产生的, 之前产生的数据不再消费(则该参数设置为latest) for message in consumer: print(message) print(message.topic) print(f"receive: \n key: {json.loads(message.key.decode())},\n value: {json.loads(message.value.decode())} \n ") # 手动commit # consumer.commit() kafka_left = get_kafka_left_num(consumer) if kafka_left == 0: break
# 进入docker_kafka目录下
cd /系统目录/docker_kafka
# 启动服务
docker compose up -d
a. http://IP:9001/ 进入kafka manager 界面
b.添加kafka
c.填写名称 和 Zookeeper Host后直接往下面拉 点击Save保存
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。