赞
踩
本文主讲述了 消息队列,Kafka的架构,Kafka的相关使用和常用shell命令,Kafka的Python API的操作;
消息队列:指的数据在一个容器中,从容器中一端传递到另一端的过程
消息(message): 指的是数据,只不过这个数据存在一定流动状态
队列(queue): 指的容器,可以存储数据,只不过这个容器具备FIFO(先进先出)特性
思考: 公共容器需要具备什么特点?
1- 公共性: 各个程序都可以与之对接
2- FIFO特性: 先进先出
3- 具备高效的并发能力: 能够承载海量数据
4- 具备一定的容错能力: 比如支持重新读取消息方案
MQ:message queue消息队列
activeMQ: 出现时期比较早的一款消息队列的中间件产品,在早期使用人群是非常多,目前整个社区活跃度严重下降,使用人群很少了
rabbitMQ: 此款是目前使用人群比较多的一款消息队列的中间件的产品,社区活跃度比较高,主要是应用传统业务领域中
rocketMQ: 是阿里推出的一款消息队列的中间件的产品,目前主要是在阿里系环境中使用,目前支持的客户端比较少,主要是Java中应用较多
Kafka: Apache旗下的顶级开源项目,是一款消息队列的中间件产品项目来源于领英,是大数据体系中目前为止最为常用的一款消息队列的产品
在Java中, 为了能够集成消息队列的产品, 专门提供了一个消息队列的协议: JMS(Java Message Server) java消息服务
消息队列中两个角色: 生产者(producer) 和 消费者(consumer)
生产者: 生产/发送消息到消息队列中
消费者: 从消息队列中获取消息
在JMS规范中, 专门规定了两种消息消费模型:
1- 点对点消费模型: 指的一条消息最终只能被一个消费者所消费。微信聊天的私聊
2- 发布订阅消费模型: 指的一条消息最终被多个消费者所消费。微信聊天的群聊
Kafka是一款消息队列的中间件产品, 来源于领英公司, 后期贡献给了Apache, 目前是Aapche旗下的顶级开源项目, 采用语言是Scala
官方地址: http://kafka.apache.org
kafka的特点:
Kafka需要使用到zookeeper服务!
# 三台都需要启动zookeeper服务
[root@node1 ~]# /export/server/zookeeper/bin/zkServer.sh start
[root@node2 ~]# /export/server/zookeeper/bin/zkServer.sh start
[root@node3 ~]# /export/server/zookeeper/bin/zkServer.sh start
把ZooInspector.rar解压然后进入ZooInspector\build双击zookeeper-dev-ZooInspector.jar(资源已经上传博客)
HDFS写入过程回顾:
Kafka架构:
1- Kafka中集群节点叫broker,节点和节点之间没有主从之分,地位是完全一样 2- Topic:主题/话题,是业务层面对消息进行分类的。 3- 一个Topic可以设置多个Partition分区。 4- 同一个Partition分区可以设置多个副本,但是副本数不能超过(>)集群broker节点的个数 5- 虽然broker节点间没有主从之分,但是同一个Partition分区的不同副本间有主从之分,分为了Leader主副本和Follower从副本 6- 生产者将数据首先发送给到Leader主副本,接着是Leader主副本主动的往Follower从副本上同步消息 7- Zookeeper用来管理集群,以及管理元数据信息 8- ISR同步列表。该列表中存放的是与Leader主副本消息同步程度最接近的Follower从副本,也就是消息最小的一个列表。该列表作用,当Leader主副本无法对外提供服务的时候,会从该ISR列表中选择一个Follower从副本变成Leader主副本,对外提供服务 相关名词: Kafka Cluster: Kafka集群 Topic: 主题/话题 Broker: Kafka中的节点 Producer: 生产者,负责生产/发送消息到Kafka中 Consumer: 消费者,负责从Kafka中获取消息 Partition: 分区。一个Topic可以设置多个分区,没有数量限制
Kafka本质上就是一个消息队列的中间件的产品,主要负责消息数据的传递。也就说学习Kafka 也就是学习如何使用Kafka生产数据,以及如何使用Kafka来消费数据
注意:
创建topic不指定分区数和副本数,默认都是1个
分区数可以后期通过alter增大,但是不能减小
副本数一旦确定,不能修改!
参数如下:
cd /export/server/kafka/bin
./kafka-topics.sh 参数说明:
--bootstrap-server: Kafka集群中broker服务器
--topic: 指定Topic名称
--partitions: 设置Topic的分区数,可以省略不写
--replication-factor: 设置Topic分区的副本数,可以省略不写
--create: 指定操作类型。这里是新建Topic
--delete: 指定操作类型。这里是删除Topic
--alter: 指定操作类型。这里是修改Topic
--list: 指定操作类型。这里是查看所有Topic列表
--describe: 指定操作类型。这里是查看详细且具体的Topic信息
# 创建topic,默认1个分区,1个副本
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itcast
# 注意: 如果副本数超过了集群broker节点个数,就会报错
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itheima --partitions 4 --replication-factor 4
# 把replication-factor改成3以内就能创建成功了
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itheima --partitions 4 --replication-factor 3
# --list查看所有topic
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list
# --describe 可以查看详细Topic信息
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --describe
# --describe 可以查看具体Topic信息
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --describe --topic itheima
当然也可使用zookeeper客户端查看
# 增大topic分区
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 4
# 注意: partitions分区,只能增大,不能减小。而且没有数量限制
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 1
# 注意: 副本既不能增大,也不能减小
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 4 --replication-factor 2
# 再创建一个spark主题
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic spark
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list
# 删除spark主题
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --delete --topic spark
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list
消费者要和生产者指定是同一个topic主题,才能接收到消息
参数如下:
cd /export/server/kafka/bin
./kafka-console-producer.sh 参数说明
--broker-list: Kafka集群中broker服务器
--topic: 指定Topic
./kafka-console-consumer.sh 参数说明
--bootstrap-server: Kafka集群中broker连接信息
--topic: 指定Topic
latest: 消费者(默认)从最新的地方开始消费
--from-beginning: 指定该参数以后,会从最旧的地方开始消费
--max-messages: 最多消费的条数。
# 为了方便演示再创建一个spark
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic spark
# 模拟生产者给spark发送消息
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark
# 模拟消费者从spark获取消息,默认每次拿最新的
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark
# --from-beginning 会从最旧的地方开始消费
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark --from-beginning
# --max-messages x 可以设置从最旧的地方最大消费次数x
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark --from-beginning --max-messages 5
注意:
我们有时候发现消费者打印出来的消息和生产者生产的顺序不一致,是乱序的。原因如下:
topic有多个分区,底层是多线程来读取数据并进行打印输出。因此会存在乱序现象
旧版(<v2.2): kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --create --topic ..
注意: 旧版用--zookeeper参数,主机名(或IP)和端口用ZooKeeper的2181,也就是server.properties文件中zookeeper.connect属性的配置值.
新版(>v2.2): kafka-topics.sh --bootstrap-server node1:9092 --create --topic ..
注意: 新版用--bootstrap-server参数,主机名(或IP)和端口用某个节点的即可,即主机名(或主机IP):9092。9092是Kafka的监听端口
broker-list:broker指的是kafka的服务端,可以是一个服务器也可以是一个集群。producer和consumer都相当于这个服务端的客户端。一般我们再使用console producer的时候,broker-list参数是必备参数,另外一个必备的参数是topic
bootstrap-servers: 指的是kafka集群的服务器地址,这个和broker-list功能是一样的,只不过我们在console producer要求用broker-list,其他地方都采用bootstrap-servers。
可以在可视化的工具通过点击来操作kafka完成主题的创建,分区等操作,资源包已经上传到博客第10天内
注意: 安装完后桌面不会有快捷方式,需要去电脑上搜索,或者去自己选的安装位置找到发送快捷方式到桌面!
修改工具的数据显示类型
纯Python的方式操作Kafka。
准备工作:在node1的节点上安装一个python用于操作Kafka的库
安装kafka-python 模模块 ,模块中提供了操作kafka的方法
在线安装
在node1上安装就可以,需要保证服务器能够连接网络
安装命令: python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple
离线安装
将kafka_python-2.0.2-py2.py3-none-any.whl安装包上传服务器software目录下进行安装
安装命令: pip install kafka_python-2.0.2-py2.py3-none-any.whl
- 1
API使用的参考文档: https://kafka-python.readthedocs.io/en/master/usage.html#kafkaproducer
模块中封装了两个类,
一个是生成者类KafkaProducer,提供了向kafka写数据的方法
另一个是消费者类KafkaConsumer,提供了读取kafka数据的方法
生成者类KafkaProducer,提供了向kafka写数据的方法
send(topic,valu)方法: 发送消息
topic参数:指定向哪个主题发送消息
value参数:指定发送的消息数据 ,数据类型要求是bytes类型
示例:
# 导包 from kafka import KafkaProducer # 编写代码 if __name__ == '__main__': # 创建生产者对象并指定对应服务器 producer = KafkaProducer(bootstrap_servers=['node1:9092']) # 发送消息 for i in range(1,101): future = producer.send('kafka', f'hi_kafka_{i}'.encode()) # 获取元数据 record_metadata = future.get() # 从元数据中获取主题,分区,偏移 print(record_metadata.topic) print(record_metadata.partition) print(record_metadata.offset)
消费者类KafkaConsumer,提供了读取kafka数据的方法
KafkaConsumer(topic,bootstrap_servers)
第一个参数:指定消费者连接的主题,
第二个参数:指定消费者连接的kafka服务器
示例:
# 导包 from kafka import KafkaConsumer # 编写代码 if __name__ == '__main__': # 创建消费者对象 consumer = KafkaConsumer('kafka',bootstrap_servers=['node1:9092']) # 遍历对象 for message in consumer: # 格式化打印,设置相关参数 # 因为value是二进制,需要decode解码 print ("主题:%s,分区:%d,偏移:%d : key=%s value=%s" % (message.topic, message.partition,message.offset, message.key, message.value.decode('utf8')))
可能遇到的错误:
原因: 服务器环境有问题。是因为服务器上既安装了kafka-python的第三方依赖,同时还安装kafka的第三方依赖。可以通过pip list | grep kafka进行确定
解决办法: 先将这两个第三方依赖全部卸载,然后再重新执行如下命令
python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。