赞
踩
目录
Linux系统命令使用、了解如何安装Python库、安装kafka。
熟悉Linux基本操作、Pycharm的安装、Spark安装,Kafka安装
至于如何安装好spark,我这里就不详细介绍了,请点击标题,即可跳转到文章详情页,里面有spark的安装资料和教程。
点击此处下载,下载kafka_2.11-2.4.0.tgz。此安装包内已经附带zookeeper,不需要额外安装zookeeper.按顺序执行如下步骤:
首先将下载好的安装包放在我们虚拟机里面(Ubuntu)
使用命令进行解压
sudo tar -zxf /home/hadoop/kafka/kafka_2.11-2.4.0.tgz -C /home/hadoop/kafka
解压成功之后,需要我们对其进行改名,方便我们后续的操作
cd /home/hadoop/kafka
sudo mv kafka_2.11-2.4.0/ kafka
下面介绍Kafka相关概念,以便运行下面实例的同时,更好地理解Kafka.
1. Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
2. Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
3. Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition.
4. Producer
负责发布消息到Kafka broker
5. Consumer
消息消费者,向Kafka broker读取消息的客户端。
6. Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
接下来在Ubuntu系统环境下测试简单的实例。Mac系统请自己按照安装的位置,切换到相应的指令。按顺序执行如下命令:
进入kafka所在的目录
cd /home/hadoop/kafka/kafka
输入该命令
bin/zookeeper-server-start.sh config/zookeeper.properties
命令执行后不会返回Shell命令输入状态,zookeeper就会按照默认的配置文件启动服务,请千万不要关闭当前终端.启动新的终端,输入如下命令:
cd /home/hadoop/kafka/kafka
bin/kafka-server-start.sh config/server.properties
kafka服务端就启动了,请千万不要关闭当前终端。启动另外一个终端,输入如下命令(测试):
cd /home/hadoop/kafka/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
注意上面的步骤顺序缺一不可:初学者,千万要记住,先启动zookeeper,再启动kafka,这个很重要,不然会出错,切记!!!
topic是发布消息发布的category,以单节点的配置创建了一个叫dblab的topic.可以用list列出所有创建的topics,来查看刚才创建的主题是否存在。
bin/kafka-topics.sh --list --zookeeper localhost:2181
可以在结果中查看到dblab这个topic存在
接下来用producer生产点数据:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
并尝试输入如下信息:
然后再次开启新的终端或者直接按CTRL+C退出。然后使用consumer来接收数据,输入如下命令:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab --from-beginning
便可以看到刚才产生的信息。说明kafka安装成功!!!
本项目主要使用了两个Python库,Flask和Flask-SocketIO,这两个库的安装非常简单,请启动进入Ubuntu系统,打开一个命令行终端(可以使用快捷键Ctrl+Alt+T)。
Python之所以强大,其中一个原因是其丰富的第三方库。pip则是python第三方库的包管理工具。Python3对应的包管理工具是pip3。因此,需要首先在Ubuntu系统中安装pip3,命令如下
sudo apt-get install python3-pip
安装完pip3以后,可以使用如下Shell命令完成Flask和Flask-SocketIO这两个Python第三方库的安装以及与Kafka相关的Python库的安装:
- pip3 install flask
-
- pip3 install flask-socketio
-
- pip3 install kafka-python
这些安装好的库在我们的程序文件的开头可以直接用来引用。比如下面的例子。
- from flask import Flask
- from flask_socketio import SocketIO
- from kafka import KafkaConsumer
from import 跟直接import的区别举个例子来说明。
import socket的话,要用socket.AF_INET,因为AF_INET这个值在socket的名称空间下。
from socket import* 是把socket下的所有名字引入当前名称空间。
但是对于本次项目,我们使用的是pycharm开发工具,所以可以不用这样,我们直接使用anaconda里面的安装命令,这样更加的快捷。
pycharm的详解安装步骤,在之前就已经介绍的非常详细了,这里只需要点击标题即可
搭建成功我们就可以把我们的项目引入进来
首先利用pycharm,我们要安装第三方库
pip --default-timeout=100 install kafka -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
安装其他的第三方库,反正没有的都可以自己安装!
pip install flask_socketio
这里先给出本项目Python工程的目录结构,后续的操作可以根据这个目录进行操作
Python工程目录结构
至此,本项目需要的开发环境及搭建就介绍完毕!
数据集介绍
本项目的数据集压缩包为data数据集,有需要的可以在评论区留言QQ邮箱:456789321@qq.com
该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv. 在这个项目中只是用user_log.csv这个文件,下面列出文件user_log.csv的数据格式定义:
用户行为日志user_log.csv,日志中的字段定义如下:
1. user_id | 买家id
2. item_id | 商品id
3. cat_id | 商品类别id
4. merchant_id | 卖家id
5. brand_id | 品牌id
6. month | 交易时间:月
7. day | 交易事件:日
8. action | 行为,取值范围{0,1,2,3},0表示点击,1表示加入购物车,2表示购买,3表示关注商品
9. age_range | 买家年龄分段:1表示年龄<18,2表示年龄在[18,24],3表示年龄在[25,29],4表示年龄在[30,34],5表示年龄在[35,39],6表示年龄在[40,49],7和8表示年龄>=50,0和NULL则表示未知
10. gender | 性别:0表示女性,1表示男性,2和NULL表示未知
11. province| 收获地址省份
数据具体格式如下:
user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
328862,844400,1271,2882,2661,08,29,0,1,1,山西
这个项目实时统计每秒中男女生购物人数,因此针对每条购物日志,我们只需要获取gender即可,然后发送给Kafka,接下来Spark Streaming再接收gender进行处理。
数据预处理
本项目使用Python对数据进行预处理,并将处理后的数据直接通过Kafka生产者发送给Kafka,这里需要先安装Python操作Kafka的代码库,请在Ubuntu中打开一个命令行终端,执行如下Shell命令来安装Python操作Kafka的代码库(备注:如果之前已经安装过,则这里不需要安装):
注意:
在运行项目之前,首先要保证你的项目代码里面的第三方库是否已经全部安装完毕,如果没有,可以参考上面的步骤完成
其次在开启上述KafkaProducer和KafkaConsumer之前,需要先开启Kafka(分开执行,按照顺序,注意在开启kafka之前)
初学者,千万要记住,先启动zookeeper,再启动kafka,这个很重要,不然会出错,切记!!!
- cd /home/hadoop/kafka/kafka
-
- bin/zookeeper-server-start.sh config/zookeeper.properties
-
- bin/kafka-server-start.sh config/server.properties
producer.py
- # coding: utf-8
- from kafka import KafkaProducer
- import csv
- import time
-
- producer = KafkaProducer(bootstrap_servers='localhost:9092')
- csvfile = open("../data/user_log.csv","r")
- reader = csv.reader(csvfile)
-
- for line in reader:
- gender = line[9]
- if gender == 'gender':
- continue
- print(line[9])
- time.sleep(0.1)
- producer.send('sex',line[9].encode('utf8'))
上述代码很简单,首先是先实例化一个Kafka生产者。然后读取用户日志文件,每次读取一行,接着每隔0.1秒发送给Kafka,这样1秒发送10条购物日志。这里发送给Kafka的topic为’sex’
consumer.py
- from kafka import KafkaConsumer
-
- consumer = KafkaConsumer('result')
- for msg in consumer:
- print((msg.value).decode('utf8'))
-
运行首先要运行producer.py,然后去运行consumer.py才可以正常展示和输出
如果报错:
报错原因:3.8版本中,async已经变成了关键字,所以导致不兼容
解决方案:执行 pip install kafka-python,就可以解决
pip install kafka-python
运行上面这条命令以后,这时,你会看到屏幕上会输出一行又一行的数字,类似下面的样子:
如果有上述的输出,恭喜你,Python操作Kafka运行成功。接下来,第三部分将分析Spark Streaming如何处理Kafka的实时数据。
Spark Streaming实时处理Kafka数据;
将处理后的结果发送给Kafka;
本项目在于实时统计每秒中男女生购物人数,而Spark Streaming接收的数据为1,1,0,2…,其中0代表女性,1代表男性,所以对于2或者null值,则不考虑。其实通过分析,可以发现这个就是典型的wordcount问题,而且是基于Spark流计算。女生的数量,即为0的个数,男生的数量,即为1的个数。
因此利用Spark Streaming接口reduceByKeyAndWindow,设置窗口大小为1,滑动步长为1,这样统计出的0和1的个数即为每秒男生女生的人数。
首先下载Spark连接Kafka的代码库。然后把下载的代码库放到目录
首先将:spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar这个文件直接复制粘贴在:/home/hadoop/spark/jars
然后在/home/hadoop/spark/jars目录下新建kafka目录,把/home/hadoop/kafka/kafka/libs下所有函数库复制到/home/hadoop/spark/jars/kafka目录下,命令如下:
cd /home/hadoop/spark/jars
mkdir kafka
cd kafka
cp /home/hadoop/kafka/kafka/libs/* .
然后,修改 Spark 配置文件,命令如下
cd /home/hadoop/spark/conf
vim spark-env.sh
把 Kafka 相关 jar 包的路径信息增加到 spark-env.sh,修改后的 spark-env.sh 类似如下:
export SPARK_DIST_CLASSPATH=$classpath:/home/hadoop/spark/jars/kafka/*:/home/hadoop/kafka/kafka/libs/*
这就配置好了相关的参数
kafka_test.py
- #!/home/hadoop/anaconda3/bin/python
- from kafka import KafkaProducer
- from pyspark.streaming import StreamingContext
- from pyspark.streaming.kafka import KafkaUtils
- from pyspark import SparkConf, SparkContext
- import json
- import sys
-
-
- def KafkaWordCount(zkQuorum, group, topics, numThreads):
- spark_conf = SparkConf().setAppName("KafkaWordCount").set('spark.io.compresssion.codec', 'snappy')
- sc = SparkContext(conf=spark_conf)
- sc.setLogLevel("ERROR")
- ssc = StreamingContext(sc, 1)
- ssc.checkpoint(".")
- # 这里表示把检查点文件写入分布式文件系统HDFS,所以要启动Hadoop
- # ssc.checkpoint(".")
- topicAry = topics.split(",")
- # 将topic转换为hashmap形式,而python中字典就是一种hashmap
- topicMap = {}
- for topic in topicAry:
- topicMap[topic] = numThreads
- lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(lambda x : x[1])
- words = lines.flatMap(lambda x : x.split(" "))
- wordcount = words.map(lambda x : (x, 1)).reduceByKeyAndWindow((lambda x,y : x+y), (lambda x,y : x-y), 1, 1, 1)
- wordcount.foreachRDD(lambda x : sendmsg(x))
- ssc.start()
- ssc.awaitTermination()
-
-
- # 格式转化,将[["1", 3], ["0", 4], ["2", 3]]变为[{'1': 3}, {'0': 4}, {'2': 3}],这样就不用修改第四个教程的代码了
- def Get_dic(rdd_list):
- res = []
- for elm in rdd_list:
- tmp = {elm[0]: elm[1]}
- res.append(tmp)
- return json.dumps(res)
-
-
- def sendmsg(rdd):
- if rdd.count != 0:
- msg = Get_dic(rdd.collect())
- # 实例化一个KafkaProducer示例,用于向Kafka投递消息
- producer = KafkaProducer(bootstrap_servers='localhost:9092')
- producer.send("result", msg.encode('utf8'))
- # 很重要,不然不会更新
- producer.flush()
-
-
- if __name__ == '__main__':
- # 输入的四个参数分别代表着
- # 1.zkQuorum为zookeeper地址
- # 2.group为消费者所在的组
- # 3.topics该消费者所消费的topics
- # 4.numThreads开启消费topic线程的个数
- if (len(sys.argv) < 5):
- print("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
- exit(1)
- zkQuorum = sys.argv[1]
- group = sys.argv[2]
- topics = sys.argv[3]
- numThreads = int(sys.argv[4])
- print(group, topics)
- KafkaWordCount(zkQuorum, group, topics, numThreads)
上述代码注释已经也很清楚了,下面在简要说明下:
1. 首先按每秒的频率读取Kafka消息;
2. 然后对每秒的数据执行wordcount算法,统计出0的个数,1的个数,2的个数;
3. 最后将上述结果封装成json发送给Kafka。
另外,需要注意,上面代码中有一行如下代码:
ssc.checkpoint(".")
这行代码表示把检查点文件写入分布式文件系统HDFS,所以一定要事先启动Hadoop。如果没有启动Hadoop,则后面运行时会出现“拒绝连接”的错误提示。如果你还没有启动Hadoop,则可以现在在Ubuntu终端中,使用如下Shell命令启动Hadoop:
cd /home/hadoop/hadoop
./sbin/start-dfs.sh
新建一个项目
cd /home/hadoop/spark
mkdir mycode
cp /home/hadoop/PycharmProjects/First/labproject/kafka_test.py /home/hadoop/spark/mycode
把这个加入到我们执行文件里面
/home/hadoop/spark/bin/spark-submit /home/hadoop/spark/mycode/kafka_test.py localhost:2181 1 sex 1
按照我们最初的想法,我们直接使用执行命令就可以执行了
./startup.sh
殊不知,就这样一步一步的走向深渊.......
下面是解决方法
1.首先我们发现执行之后,报错找不到这个文件路径,或者找不到这个文件,不存在这个文件
使用权限加入:chmod 777 startup.sh 或者 chmod +x startup.sh 给我们的执行文件加入可行性权限
2.接下来它依然报错,说无法找到,为什么呢?
注意要给你的Python加上可执行环境,我是使用的anaconda编译环境,anaconda比较的方便,推荐使用
sudo update-alternatives --install /usr/bin/python python /home/hadoop/anaconda3/bin/python 4
3.版本不兼容导致的问题
根据报错的信息我们可以得出,我们的spark里面的有一个文件和我们之前加入的一个文件包有冲突,所以我们的解决方法是在删除这个包(net)
其他报错可以自己参考网络解法,有一个小小的建议,遇到报错之后,很多人都喜欢直接复制报错信息提交给百度君,但是!
不建议这样,因为每一步的过程可能别人和你不一样,或者你们的环境也不同,最正确的解决方法是,你自己阅读报错信息,安装报错来解决,可以参考CSDN里面解决方法。
再次执行
执行OK!到此为止,Spark Streaming程序编写完成,下篇文章将分析如何处理得到的最终结果。
做好了充分的准备工作,直接可以贴代码运行了!
web展示数据
数据是动态的,不断产生,因此利用Flask-SocketIO实时推送数据 socket.io.js实时获取数据 highlights.js展示数据
目录结构:
kafka-exp
├── app.py
├── static
│ └── js
│ ├── exporting.js
│ ├── highcharts.js
│ ├── jquery-3.1.1.min.js
│ ├── socket.io.js
│ └── socket.io.js.map
└── templates
└── index.html
- import json
- from flask import Flask, render_template
- from flask_socketio import SocketIO
- from kafka import KafkaConsumer
- #因为第一步骤安装好了flask,所以这里可以引用
-
- app = Flask(__name__)
- app.config['SECRET_KEY'] = 'secret!'
- socketio = SocketIO(app)
- thread = None
- # 实例化一个consumer,接收topic为result的消息
- consumer = KafkaConsumer('result')
-
- # 一个后台线程,持续接收Kafka消息,并发送给客户端浏览器
- def background_thread():
- girl = 0
- boy = 0
- for msg in consumer:
- data_json = msg.value.decode('utf8')
- data_list = json.loads(data_json)
- for data in data_list:
- if '0' in data.keys():
- girl = data['0']
- elif '1' in data.keys():
- boy = data['1']
- else:
- continue
- result = str(girl) + ',' + str(boy)
- print(result)
- socketio.emit('test_message',{'data':result})
- socketio.sleep(1)
-
-
- # 客户端发送connect事件时的处理函数
- @socketio.on('test_connect')
- def connect(message):
- print(message)
- global thread
- if thread is None:
- # 单独开启一个线程给客户端发送数据
- thread = socketio.start_background_task(target=background_thread)
- socketio.emit('connected', {'data': 'Connected'})
-
- # 通过访问http://127.0.0.1:5000/访问index.html
- @app.route("/")
- def handle_mes():
- return render_template("index.html")
-
- # main函数
- if __name__ == '__main__':
- socketio.run(app,debug=True)
- <!DOCTYPE html>
- <html lang="en">
- <head>
- <meta charset="UTF-8">
- <title>DashBoard</title>
- <script src="static/js/socket.io.js"></script>
- <script src="static/js/jquery-3.1.1.min.js"></script>
- <script src="static/js/highcharts.js"></script>
- <script src="static/js/exporting.js"></script>
- <script type="text/javascript" charset="utf-8">
- var socket = io.connect('http://' + document.domain + ':' + location.port);
- socket.on('connect', function() {
- socket.emit('test_connect', {data: 'I\'m connected!'});
- });
-
- socket.on('test_message',function(message){
- console.log(message);
- var obj = eval(message);
- var result = obj["data"].split(",");
- $('#girl').html(result[0]);
- $('#boy').html(result[1]);
- });
-
- socket.on('connected',function(){
- console.log('connected');
- });
-
- socket.on('disconnect', function () {
- console.log('disconnect');
- });
- </script>
- </head>
- <body>
- <div>
- <b>Girl: </b><b id="girl"></b>
- <b>Boy: </b><b id="boy"></b>
- </div>
- <div id="container" style="width: 600px;height:400px;"></div>
-
- <script type="text/javascript">
- $(document).ready(function () {
- Highcharts.setOptions({
- global: {
- useUTC: false
- }
- });
-
- Highcharts.chart('container', {
- chart: {
- type: 'spline',
- animation: Highcharts.svg, // don't animate in old IE
- marginRight: 10,
- events: {
- load: function () {
-
- // set up the updating of the chart each second
- var series1 = this.series[0];
- var series2 = this.series[1];
- setInterval(function () {
- var x = (new Date()).getTime(), // current time
- count1 = $('#girl').text();
- y = parseInt(count1);
- series1.addPoint([x, y], true, true);
-
- count2 = $('#boy').text();
- z = parseInt(count2);
- series2.addPoint([x, z], true, true);
- }, 1000);
- }
- }
- },
- title: {
- text: '男女生购物人数实时分析'
- },
- xAxis: {
- type: 'datetime',
- tickPixelInterval: 50
- },
- yAxis: {
- title: {
- text: '数量'
- },
- plotLines: [{
- value: 0,
- width: 1,
- color: '#808080'
- }]
- },
- tooltip: {
- formatter: function () {
- return '<b>' + this.series.name + '</b><br/>' +
- Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '<br/>' +
- Highcharts.numberFormat(this.y, 2);
- }
- },
- legend: {
- enabled: true
- },
- exporting: {
- enabled: true
- },
- series: [{
- name: '女生购物人数',
- data: (function () {
- // generate an array of random data
- var data = [],
- time = (new Date()).getTime(),
- i;
-
- for (i = -19; i <= 0; i += 1) {
- data.push({
- x: time + i * 1000,
- y: Math.random()
- });
- }
- return data;
- }())
- },
- {
- name: '男生购物人数',
- data: (function () {
- // generate an array of random data
- var data = [],
- time = (new Date()).getTime(),
- i;
-
- for (i = -19; i <= 0; i += 1) {
- data.push({
- x: time + i * 1000,
- y: Math.random()
- });
- }
- return data;
- }())
- }]
- });
- });
- </script>
- </body>
- </html>
依次运行(保证之前的服务全部开启)
在spark里面使用Python对大数据进行实时展示,是当今互联网技术的革新和必然发展,无论是淘宝、京东、拼多多还是其他各类的电商,他们都会使用这项技术,未来Python和hadoop/spark将会在大数据的时代,创造出更多未知的惊喜和迎接新的挑战!
眼下即最好,未来方可期!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。