Open cmd and run the zkserver command to start ZooKeeper.
Open a Command Prompt window (Win+R, type cmd, press Enter), change into the E:\kafka\kafka37 directory, and run the following command to start Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
Open another Command Prompt window and change into the E:\kafka\kafka37\bin\windows directory.
Create a topic. A topic can be thought of as a folder, and a partition as a subfolder under the topic; logs live inside partitions, and messages are stored in the logs.
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test1
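With 3 partitions created above, each message key deterministically maps to one partition. Kafka's real default partitioner uses a murmur2 hash; the sketch below is only a simplified stdlib illustration of the modulo idea, not Kafka's actual algorithm.

```python
import zlib

# Simplified sketch of key-based partitioning. Kafka's default
# partitioner uses murmur2, not crc32; this only illustrates that a
# key deterministically maps to one of the topic's partitions.
def pick_partition(key: bytes, num_partitions: int) -> int:
    return zlib.crc32(key) % num_partitions

# The same key always lands in the same partition:
assert pick_partition(b"95001", 3) == pick_partition(b"95001", 3)
assert 0 <= pick_partition(b"95001", 3) < 3
```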
List the topics to check that the creation succeeded:
kafka-topics.bat --list --bootstrap-server localhost:9092
View the details of a topic, such as its partition count and replica count:
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test1
Change the partition count of an existing topic (the count can only be increased; decreasing it raises an error):
kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic test1 --partitions 4
Check the partitions after the change:
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test1
View consumer groups and their offsets:
kafka-consumer-groups.bat --describe --bootstrap-server localhost:9092 --all-groups
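In the output of this command, the LAG column is simply the difference between a partition's log-end-offset and the group's current (committed) offset, i.e. how many messages the group has not yet consumed:

```python
# LAG as reported by kafka-consumer-groups --describe:
# LAG = LOG-END-OFFSET - CURRENT-OFFSET, per partition.
def lag(log_end_offset: int, current_offset: int) -> int:
    return log_end_offset - current_offset

assert lag(100, 75) == 25   # 25 messages not yet consumed
assert lag(50, 50) == 0     # the group is fully caught up
```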
Goal: read the contents of the student table, convert them to JSON, and send them to Kafka.
The SQL statements to insert two records into the student table are as follows:
insert into student values('95002','Fopn','M',22);
insert into student values('95003','Tom','M',23);
Write a producer program mysql_producer.py:
from kafka import KafkaProducer
import json
import pymysql.cursors

# Serialize each message as UTF-8 encoded JSON
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
connect = pymysql.Connect(
    host='localhost',
    port=3306,
    user='root',
    passwd='123456',
    db='test',
    charset='utf8'
)
cursor = connect.cursor()
sql = "select sno,sname,ssex,sage from student;"
cursor.execute(sql)
data = cursor.fetchall()
connect.commit()
# Send each row to the mysql_topic topic as a JSON object
for msg in data:
    res = {}
    res['sno'] = msg[0]
    res['name'] = msg[1]
    res['sex'] = msg[2]
    res['age'] = msg[3]
    producer.send('mysql_topic', res)
connect.close()
producer.close()
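The value_serializer passed to KafkaProducer can be checked in isolation, without a broker: it turns a Python dict into UTF-8 encoded JSON bytes, which the consumer side can decode back into the original dict.

```python
import json

# The serializer used by mysql_producer.py, tested standalone:
# dict -> JSON text -> UTF-8 bytes.
value_serializer = lambda v: json.dumps(v).encode('utf-8')

row = {'sno': '95002', 'name': 'Fopn', 'sex': 'M', 'age': 22}
payload = value_serializer(row)
assert isinstance(payload, bytes)
# The consumer side recovers the original dict:
assert json.loads(payload.decode('utf-8')) == row
```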
Then fetch the JSON-formatted data from Kafka and print it.
Write a consumer program mysql_consumer.py:
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('mysql_topic',
                         bootstrap_servers=['localhost:9092'],
                         group_id=None,
                         auto_offset_reset='earliest')
for msg in consumer:
    # Message values arrive as bytes; decode, then parse the JSON
    msg1 = str(msg.value, encoding="utf-8")
    data = json.loads(msg1)
    print(data)
The data.json file:
[{"sno": "95001", "name": "John1", "sex": "M", "age": 23},
{"sno": "95002", "name": "John2", "sex": "M", "age": 23},
{"sno": "95003", "name": "John3", "sex": "M", "age": 23}]
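Parsing this file yields a list of dicts, and the producer program sends that whole list as a single Kafka message. The parsing behavior can be sanity-checked with the stdlib alone (a reduced one-record sample used here for brevity):

```python
import json

# json.load on data.json yields a list of dicts; here json.loads on
# an equivalent in-memory string stands in for reading the file.
text = '[{"sno": "95001", "name": "John1", "sex": "M", "age": 23}]'
records = json.loads(text)
assert isinstance(records, list)
assert records[0]['sno'] == '95001'
assert records[0]['age'] == 23
```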
Using the data.json file above, perform the following steps:
Write a producer program that sends the JSON file's data to Kafka.
Write a producer program commit_producer.py:
from kafka import KafkaProducer
import json

# Open the json file
data = open("./data.json")
# Convert it to a Python object
strJson = json.load(data)
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('json_topic', strJson)
producer.flush()  # make sure the message is sent before closing
producer.close()
Write a consumer program that reads the data sent above and commits the offset manually.
Write a consumer program commit_consumer.py:
from kafka import KafkaConsumer
import json

class Consumer():
    def __init__(self):
        self.server = 'localhost:9092'
        self.topic = 'json_topic'
        self.consumer = None
        self.consumer_timeout_ms = 5000  # stop iterating after 5 s of silence
        self.group_id = 'test1'

    def get_connect(self):
        self.consumer = KafkaConsumer(self.topic,
                                      group_id=self.group_id,
                                      auto_offset_reset='earliest',
                                      bootstrap_servers=self.server,
                                      enable_auto_commit=False,
                                      consumer_timeout_ms=self.consumer_timeout_ms)

    def beginConsumer(self):
        for message in self.consumer:
            data = json.loads(message.value.decode('utf-8'))
            print(data)
            self.consumer.commit()  # manually commit the offset
        self.consumer.close()

c = Consumer()
c.get_connect()
c.beginConsumer()
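The manual-commit bookkeeping that commit_consumer.py performs can be illustrated without a broker: after processing a message at offset N, the position recorded for the group is N + 1, the next offset to read, so a restarted consumer resumes exactly where it left off.

```python
# Broker-free sketch of manual offset commits: the committed value
# for a partition is last processed offset + 1 (next offset to read).
committed = {}  # partition -> next offset to read on restart

def process(partition: int, offset: int) -> None:
    # ... handle the message here, then record the commit position
    committed[partition] = offset + 1

for off in range(3):          # messages at offsets 0, 1, 2
    process(0, off)
assert committed == {0: 3}    # a restarted consumer resumes at offset 3
```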
Manually create a topic named "assign_topic" with 2 partitions:
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic assign_topic
(1) Write a producer program that sends universally unique identifiers (UUIDs) as messages to the topic assign_topic.
Write a producer program assign_producer.py:
from kafka import KafkaProducer
import time
import uuid

producer = KafkaProducer(bootstrap_servers='localhost:9092')
display_interval = 5

print('Producing messages to topic assign_topic. Press Ctrl-C to interrupt.')
display_iteration = 0
message_count = 0
start_time = time.time()
while True:
    # Each message is a UUID string encoded as UTF-8 bytes
    identifier = str(uuid.uuid4())
    producer.send('assign_topic', identifier.encode('utf-8'))
    message_count += 1
    now = time.time()
    if now - start_time > display_interval:
        print('No.%i iter %i messages produced at %.0f messages / second' % (
            display_iteration, message_count, message_count / (now - start_time)))
        display_iteration += 1
        message_count = 0
        start_time = time.time()
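The message bodies produced above are UUID4 strings, which the stdlib can generate and check without a broker: each is 36 characters (32 hex digits plus 4 hyphens) and effectively unique per call.

```python
import uuid

# Each message body in assign_producer.py is a UUID4 string.
a = str(uuid.uuid4())
b = str(uuid.uuid4())
assert len(a) == 36 and a.count('-') == 4   # canonical UUID format
assert a != b                               # fresh value per call
```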
(2) Write consumer program 1, which is assigned partition 0 of the topic and consumes only partition 0's data.
Write the first consumer program assgin_consumer1.py:
from kafka import KafkaConsumer, TopicPartition
import time

display_interval = 5
consumer1 = KafkaConsumer(bootstrap_servers='localhost:9092',
                          auto_offset_reset='earliest')
# Explicitly assign partition 0 only (no consumer-group subscription)
consumer1.assign([TopicPartition('assign_topic', 0)])

print('Consuming messages from topic assign_topic. Press Ctrl-C to interrupt.')
display_iteration = 0
message_count = 0
partitions = set()
start_time = time.time()
while True:
    message = next(consumer1)
    identifier = str(message.value, encoding="utf-8")
    message_count += 1
    partitions.add(message.partition)
    now = time.time()
    if now - start_time > display_interval:
        print('No.%i %i messages consumed at %.0f messages / second - from partitions %r' % (
            display_iteration, message_count, message_count / (now - start_time), sorted(partitions)))
        display_iteration += 1
        message_count = 0
        partitions = set()
        start_time = time.time()
(3) Write consumer program 2, which is assigned partition 1 of the topic and consumes only partition 1's data.
Write the second consumer program assgin_consumer2.py:
from kafka import KafkaConsumer, TopicPartition
import time

display_interval = 5
consumer2 = KafkaConsumer(bootstrap_servers='localhost:9092',
                          auto_offset_reset='earliest')
# Explicitly assign partition 1 only (no consumer-group subscription)
consumer2.assign([TopicPartition('assign_topic', 1)])

print('Consuming messages from topic assign_topic. Press Ctrl-C to interrupt.')
display_iteration = 0
message_count = 0
partitions = set()
start_time = time.time()
while True:
    message = next(consumer2)
    identifier = str(message.value, encoding="utf-8")
    message_count += 1
    partitions.add(message.partition)
    now = time.time()
    if now - start_time > display_interval:
        print('No.%i %i messages consumed at %.0f messages / second - from partitions %r' % (
            display_iteration, message_count, message_count / (now - start_time), sorted(partitions)))
        display_iteration += 1
        message_count = 0
        partitions = set()
        start_time = time.time()
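What assign() achieves, in contrast to a group subscription, can be sketched without a broker: each consumer sees exactly the partitions it was explicitly given, with no rebalancing, so together the two consumers cover the topic with no message seen twice.

```python
# Broker-free sketch of assign(): simulated messages are
# (partition, value) pairs spread across partitions 0 and 1.
messages = [(i % 2, 'p%d-msg%d' % (i % 2, i)) for i in range(6)]

def consume(assigned_partition):
    # A consumer assigned one partition sees only that partition.
    return [v for (p, v) in messages if p == assigned_partition]

c1, c2 = consume(0), consume(1)
assert len(c1) + len(c2) == len(messages)   # nothing lost
assert set(c1).isdisjoint(c2)               # nothing seen twice
```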
To shut down, stop the Kafka server first, then ZooKeeper:
.\bin\windows\kafka-server-stop.bat
.\bin\windows\zookeeper-server-stop.bat
Note: while the experiment is running, do not close the windows opened in steps 1 and 2.