赞
踩
一、实验环境
(1)Linux: Ubuntu 16.04
(2)Python: 3.5
(3)Hadoop:3.1.3(4)Spark: 2.4.0(5)Web框架:flask 1.0.3
(6)可视化工具:Echarts
(7)开发工具:Visual Studio Code
二、小组成员及分工
(1)成员:林海滢,王惠玲,陈嘉怡,郭诗念
(2)分工:xxx负责xxxx部分,xxx负责xxxx部分,xxx负责xxxx部分。
三、数据采集
3.1数据集说明
爬取网站:http://search.dangdang.com/?key=java,是当当网的java图书的信息网站。数据文件:java_books.xlsx。其中包含了1661条与java图书信息有关的数据。
数据格式为:
图 3. 1 采集数据格式
数据中包含的内容如下:
(1)book_name: 图书的标题
(2)introduction:图书的简介
(3)author: 图书的作者
(4)price: 图书的价格(元/本)
(5)press: 图书出版社
(6)comment: 图书的评论
3.2.爬取数据集以及将其保存到本地D盘文件中的流程
(1)选取所需要爬取的页面进行遍历爬取
(2)通过正则表达式抓取所需要的数据
(3)将爬取出的数据转化为dataframe格式并保存为xlsx文件存放在D盘
四、数据清洗与预处理
4.1预处理中提取的数据
图 4.1 数据处理前的数据格式及存在问题的特征列
4.2 清洗预处理后的数据格式
图 4.2 数据处理后的数据格式及特征列
4.3 清洗与预处理的流程
(1)首先检查数据的结构以及是否有数据缺失。
(2)发现book_name特征列的数据格式不对,于是处理转换为了整数类型。
(3)发现price特征列的数据格式不对,于是处理转换为了浮点类型。
(4)内容简介列数据清洗 删除异常值。
(5)保存清洗与预处理后的数据集。
五、spark数据分析
5.1 数据分析目标
(1)图书的售价分布情况(观察图书价格大体集中在哪个分段得出图书价格趋势)
(2)部分图书出版社的出书数量统计
(3)图书的作者出书(观察哪个作者出的书最多)
(4)图书的评论分布情况(观察图书评论大体集中在哪个分段得出图书评论趋势)
(5)图书的部分作者数量统计
(6)分析价格的最大值、最小值、均值、方差和中位数
六、数据可视化
本实验的可视化基于mutplotlib实现。
6.1.可视化环境
利用和anaconda里面的jubiter和vscode进行可视化操作,最后的代码结构如下。
6.2 图表展示与结论分析
(1)图书的售价分布情况(观察图书价格大体集中在哪个分段得出图书价格趋势)
图6.2.1图书的售价分布情况
分析结论:通过这个柱状图可以看出图书售卖价格集中在2060这里。说明了大多数人购书倾向于中端价格。比如2040这里,售价比较便宜图书的销量就会多。而6080这里的价格上升了购买的人就相对少了,销量也随之减少。我们也可以从中得出2060的销量有1841,而20一下和60以上的销量有1159。所以大胆推测出我国中层收入人数是低高层收入人数的1.6倍左右。
(2)部分图书出版社的出书数量统计
图6.2.2部分图书出版社的出书数量统计
(3)图书的作者出书(观察哪个作者出的书最多)
(4)图书的评论分布情况(观察图书评论大体集中在哪个分段得出图书评论趋势)
图6.2.4图书的评论分布情况
分析结论:通过这个图我们可以看出92%的图书评论都是在0~100之间。也就是说92%的人不爱对图书做出评论,其余少部分人会对图书做出评论。所以我们可以大胆推测现在大多数人都不爱对看过的书发布之间的看法。
(5)图书的部分作者数量统计进行数据可视化图表分析
图6.2.5部分作者数量统计
分析结论:通过这个图我们可以看出我们找出来的部分作者54%左右的作者写的书都在65本以下(这就与第五部分的数据分析相对应),也侧面说明了这个数据里面50%左右的作者写的java的书不多即50%以下的作者可能不是专门做java这个领域的,可能还包括别的领域,大多数关于java的图书都是专攻这个专业的组织和作者写的。
(5)分析价格的最大值、最小值、均值、方差和中位数
图6.2.6分析价格的最大值、最小值、均值、方差和中位数
分析结论:通过这个图我们可以看出图书价格的中位数和均值在55块钱左右,方差在50左右,证明了图书的价格波动不是很大,从最大值、最小值中可以看出最便宜的图书是10元左右,最贵的图书是120元左右,所以表明了买一本java图书总体的均价为55元左右。
附录A(代码):
#清洗代码: # coding=utf-8 # 数据清洗 import os import numpy as np import pandas as pd import csv df = pd.read_csv("java_books.csv") df['book_name']=df['book_name'].str.strip()#去掉book_name列所有值前后的空格 df['author']=df['author'].str.strip()#去掉price列所有值前后的空格 df['introduction']=df['introduction'].astype(str)#把book_name列所有值转化为字符串类型 df.dropna(axis=0,how='any',thresh=None,subset=None,inplace=True) df['price'].astype(float) # 内容简介列数据清洗 删除异常值 df.drop(df[df['introduction'].str.contains('内容简介')].index,inplace=True) # 输出清理完的文件 df.to_csv(r'D:/data/clean_ganji_rent1.csv',encoding='utf-8',index=False) 处理代码+spark分析代码: (1)图书的售价分布情况(观察图书价格大体集中在哪个分段得出图书价格趋势) import os os.environ["JAVA_HOME"]="/usr/lib/jvm/jdk1.8.0_162" os.environ["PYSPARK_PYTHON"]="/usr/bin/python3.5" from pyspark.sql import SparkSession import pandas as pd xlsx = '/home/guosn/data/java_books.xlsx' csv = '/home/guosn/data/java_books.csv' def save_xlsx_to_text(): """ 使用pandas将xlsx文件转换为csv文件 供spark读取 """ df = pd.read_excel(xlsx) df.to_csv(csv,index=False) def price_distribution(spark): """计算价格区间分布""" df = spark.read.csv(csv,header=True) df.printSchema() df.createOrReplaceTempView("books") # 为每条数据打上所属价格区间标识 price_sql = """ select case when (price < 20.0) then '0-20' when (price >=20.0) and (price < 40.0) then '20-40' when (price >=40.0) and price <60.0 then '40-60' when price >=60.0 and price <80.0 then '60-80' when price >=80.0 and price <100.0 then '80-100' else 'greater than 100' end as price_range, books.* from books """ price_result = spark.sql(price_sql) price_result.createOrReplaceTempView("price_results") price_result.show() # 按照价格区间进行分组,计算各组内数据总数 group_price_sql = 'select price_range, count(*) from price_results group by price_range' group_price_results = spark.sql(group_price_sql) group_price_results.show() def comment_distribution(spark): """计算价格区间分布""" df = spark.read.csv(csv,header=True) df.printSchema() df.createOrReplaceTempView("books") # 为每条数据打上所属价格区间标识 comment_sql = """ select case when (comment < 100) then '0-100' when (comment >=100) and (comment < 200.0) then '100-200' when (comment >=200) and (comment <400) then '200-400' when (comment >=400) and (comment <600) then '400-600' when comment >=600 and comment <800 then '600-800' when comment >=800 and comment <1000 then '800-1000' else 'greater than 10000' end as comment_range, books.* from books """ comment_result = spark.sql(comment_sql) comment_result.createOrReplaceTempView("comment_results") comment_result.show() # 按照价格区间进行分组,计算各组内数据总数 group_comment_sql = 'select comment_range, count(*) from comment_results group by comment_range' group_comment_results = spark.sql(group_comment_sql) group_comment_results.show() if __name__ == '__main__': sc = SparkSession.builder.appName("Python Spark SQL basic example").master('local') .getOrCreate() # 具体执行方法 save_xlsx_to_text() price_distribution(sc) comment_distribution(sc) sc.stop() (2)部分图书出版社的出书数量统计 import os os.environ["JAVA_HOME"]="/usr/lib/jvm/jdk1.8.0_162" os.environ["PYSPARK_PYTHON"]='/usr/bin/python3.5' from pyspark import SparkConf,SparkContext from pyspark.sql.session import SparkSession from pyspark import SparkContext from pyspark.sql.types import Row import pandas as pd import numpy as np # xlsx = '/home/hadoop/data/java_books.xlsx' csv = '/home/hadoop/data/java_books.csv' def save_xlsx_to_text(): """ 使用pandas将xlsx文件转换为csv文件 供spark读取 """ df = pd.read_excel(xlsx) df.to_csv(csv,index=False) if __name__=='__main__': conf = SparkConf().setAppName("IpSearch").setMaster("local[2]") sc = SparkContext.getOrCreate(conf) spark = SparkSession.builder.getOrCreate() sc.setLogLevel("WARN") # data = spark.read.csv(csv, header=True) data=sc.textFile("file:///home/hadoop/data/java_books .csv") data=data.map(lambda line: line.split(",")) datardd=data.map(lambda local:Row(press=local[4])) print(datardd.take(3)) zhaopin = spark.createDataFrame(datardd) zhaopin.createOrReplaceTempView("zhaopin") # datardd = sc.textFile("file:///home/hadoop/data/employee.txt") # employee = spark.createDataFrame(datardd) # employee.createOrReplaceTempView("employee") # loc = spark.sql("select count(*) as count from employee group by location order by count desc") siming = spark.sql("select count(*) as simingcount from (select press from zhaopin where press Like '%机械工业%' )") print(siming.rdd.map(lambda p: '机械工业出版社:' + str(p.simingcount)).collect()) jimei = spark.sql("select count(*) as jimei from (select press from zhaopin where press like '%清华大学%' )") print(jimei.rdd.map(lambda p: '清华大学出版社:' + str(p.jimei)).collect()) huli = spark.sql("select count(*) as huli from (select press from zhaopin where press like '%电子工业%')") print(huli.rdd.map(lambda p: '电子工业出版社:' + str(p.huli)).collect()) haicang = spark.sql("select count(*) as haicang from (select press from zhaopin where press like '%中国铁道%' )") print(haicang.rdd.map(lambda p: '中国铁道出版社:' + str(p.haicang)).collect()) tongan = spark.sql("select count(*) as tongan from (select press from zhaopin where press like '%人民邮电%')") print(tongan.rdd.map(lambda p: '人民邮电出版社:' + str(p.tongan)).collect()) xiangan = spark.sql("select count(*) as xiangan from (select press from zhaopin where press like '%东南大学%')") print(xiangan.rdd.map(lambda p: '东南大学出版社:' + str(p.xiangan)).collect()) (3)图书的作者出书(观察哪个作者出的书最多) (4)图书的评论分布情况(观察图书评论大体集中在哪个分段得出图书评论趋势) def comment_distribution(spark): """计算价格区间分布""" df = spark.read.csv(csv,header=True) df.printSchema() df.createOrReplaceTempView("books") # 为每条数据打上所属价格区间标识 comment_sql = """ select case when (comment < 100) then '0-100' when (comment >=100) and (comment < 200.0) then '100-200' when (comment >=200) and (comment <400) then '200-400' when (comment >=400) and (comment <600) then '400-600' when comment >=600 and comment <800 then '600-800' when comment >=800 and comment <1000 then '800-1000' else 'greater than 10000' end as comment_range, books.* from books """ comment_result = spark.sql(comment_sql) comment_result.createOrReplaceTempView("comment_results") comment_result.show() # 按照价格区间进行分组,计算各组内数据总数 group_comment_sql = 'select comment_range, count(*) from comment_results group by comment_range' group_comment_results = spark.sql(group_comment_sql) group_comment_results.show() if __name__ == '__main__': sc = SparkSession.builder.appName("Python Spark SQL basic example").master('local') .getOrCreate() # 具体执行方法 save_xlsx_to_text() price_distribution(sc) comment_distribution(sc) sc.stop() (5)图书的部分作者数量统计 import os os.environ['JAVA_HOME']='/usr/lib/jvm/jdk1.8.0_162' os.environ["PYSPARK_PYTHON"]='/usr/bin/python3.5' from pyspark import SparkConf,SparkContext from pyspark.sql.session import SparkSession from pyspark import SparkContext from pyspark.sql.types import Row import pandas as pd import numpy as np xlsx = '/home/hadoop/data/java_books.xlsx' csv = '/home/hadoop/data/java_books.csv' def save_xlsx_to_text(): df = pd.read_excel(xlsx) df.to_csv(csv,index=False) if __name__=='__main__': conf = SparkConf().setAppName("IpSearch").setMaster("local[2]") sc = SparkContext.getOrCreate(conf) spark = SparkSession.builder.getOrCreate() sc.setLogLevel("WARN") # data = spark.read.csv(csv, header=True) data=sc.textFile("file:///home/hadoop/data/java_books .csv") data=data.map(lambda line: line.split(",")) datardd=data.map(lambda local:Row(press=local[2])) print(datardd.take(3)) zhaopin = spark.createDataFrame(datardd) zhaopin.createOrReplaceTempView("zhaopin") siming = spark.sql("select count(*) as simingcount from (select press from zhaopin where press Like '%明日科技%' )") print(siming.rdd.map(lambda p: '明日科技:' + str(p.simingcount)).collect()) jimei = spark.sql("select count(*) as jimei from (select press from zhaopin where press like '%霍斯特曼%' )") print(jimei.rdd.map(lambda p: '霍斯特曼:' + str(p.jimei)).collect()) huli = spark.sql("select count(*) as huli from (select press from zhaopin where press like '%传智播客高教产品研发部%')") print(huli.rdd.map(lambda p: '传智播客高教产品研发部:' + str(p.huli)).collect()) haicang = spark.sql("select count(*) as haicang from (select press from zhaopin where press like '%丁振凡%' )") print(haicang.rdd.map(lambda p: '丁振凡:' + str(p.haicang)).collect()) tongan = spark.sql("select count(*) as tongan from (select press from zhaopin where press like '%高洪岩%')") print(tongan.rdd.map(lambda p: '高洪岩:' + str(p.tongan)).collect()) xiangan = spark.sql("select count(*) as xiangan from (select press from zhaopin where press like '%李兴华%')") print(xiangan.rdd.map(lambda p: '李兴华:' + str(p.xiangan)).collect()) quanzhou = spark.sql("select count(*) as quanzhou from (select press from zhaopin where press like '%软件开发技术联盟%')") print(quanzhou.rdd.map(lambda p: '软件开发技术联盟:' + str(p.quanzhou)).collect()) nanan = spark.sql("select count(*) as nanan from (select press from zhaopin where press like '%其他作者%')") print(nanan.rdd.map(lambda p: '其他作者:' + str(p.nanan)).collect()) (6)分析价格的最大值、最小值、均值、方差和中位数 import os os.environ['JAVA_HOME']='/usr/lib/jvm/jdk1.8.0_162' os.environ["PYSPARK_PYTHON"]='/usr/bin/python3.5' import pandas as pd from pyspark.sql import SparkSession csv = '/home/hadoop/data/java_books .csv' # SparkSession 配置 spark = SparkSession.builder .appName("My test") .getOrCreate() # spark.conf.set("spark.executor.memory", "1g") spark.conf.set("spark.sql.execution.arrow.enabled", "true") sc = spark.sparkContext sc.setLogLevel("WARN") air = spark.read.csv(csv,header=True) air.printSchema() air.describe('price').show() 可视化代码: (1)图书的售价分布情况(观察图书价格大体集中在哪个分段得出图书价格趋势) from pyecharts.charts import Bar from pyecharts import options as opts #//设置行名 columns = ["0-20", "20-40", "40-60", "60-80", "80-100", "more than 100"] #//设置数据 data1 = [282, 1057, 784, 407, 204, 266] #//添加柱状图的配置项 bar=( Bar() .add_xaxis(xaxis_data=columns) .add_yaxis("数量", y_axis=data1) .set_global_opts(title_opts=opts.TitleOpts(title="图书的售价分布情况")) ) bar.render() (2)部分图书出版社的出书数量统计 from pyecharts.charts import Bar from pyecharts import options as opts #//设置行名 columns = ["清华大学出版社", "机械工业出版社", "电子工业出版社", "人民邮电出版社", "中国铁道出版社", "东南大学出版社"] #//设置数据 data1 = [565, 440, 387, 282, 64, 40] #//添加柱状图的配置项 bar=( Bar() .add_xaxis(xaxis_data=columns) .add_yaxis("数量", y_axis=data1) .set_global_opts(title_opts=opts.TitleOpts(title="图书的出版社出书情况")) ) bar.render() (3)图书的作者出书(观察哪个作者出的书最多) (4)图书的评论分布情况(观察图书评论大体集中在哪个分段得出图书评论趋势) from pyecharts.charts import Pie from pyecharts import options as opts columns = ["0-100", "100-200", "200-400", "400-600", "600-800", "800-1000", "more than 1000"] #//设置数据 data1 = [2756, 61, 46,22 , 15, 14, 86] pie = ( Pie() # 以[(lable,value),(lable,value),(lable,value)......]形式传入数据。 .add("饼图", list(z for z in zip(columns, data1))) .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}")) .set_global_opts(title_opts=opts.TitleOpts(title="评论分布情况")) ) pie.render('饼图.html') (5)图书的部分作者数量统计 import numpy as np import matplotlib.pyplot as plt import pandas as pd plt.rcParams['font.sans-serif'] = 'SimHei' # 设置中文显示,否则可能无法显示中文或者是各种字符错乱 plt.rcParams['axes.unicode_minus'] = False size = [72,65,19,20,23,26,24,4] labels = ['明日科技', '霍斯特曼', '传智播客高教产品研发部','丁振凡','高洪岩','李兴华','软件开发技术联盟','其他作者'] explode = (0.1,0,0,0,0,0,0,0) plt.pie(size,explode=explode,labels=labels,autopct='%1.1f%%',shadow=False,startangle=150) plt.title("饼图示例--部分作者数量统计") plt.show() (6)分析价格的最大值、最小值、均值、方差和中位数 import numpy as np import pandas as pd import matplotlib.pyplot as plt plt.rcParams['font.sans-serif'] = 'SimHei' # 设置中文显示,否则可能无法显示中文或者是各种字符错乱 plt.rcParams['axes.unicode_minus'] = False data = pd.read_csv("new/books.csv") # 加载数据 必须加上 values = data['price'] plt.figure(figsize=(6, 6)) plt.ylim(0,180) label = ['图书价格箱线图'] # gdp = list(values[:, 3]) plt.boxplot(values, notch=True, labels=label, meanline=True) # 第一个参数是数据,第二个参数是是否带有缺口,第三个参数是标签,第四个参数是是否带有均值线 plt.title('图书价格箱线图') plt.savefig('new/图书价格箱线图箱线图.png') plt.show()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。