赞
踩
阅读前请参考:
本篇内容主要是:对 test.csv 和 train.csv 数据集进行操作(分析)。并将最终结果通过 Spark 存储进 MySQL 。
操作流程如下:
文字描述:
在数据准备篇中,已经将数据压缩包解压到 Linux中了。然后,将其中的test.csv 和 train.csv文件上传到 HDFS 上。使用 Spark-shell (或在 windows环境下创建一个项目,最终打包上传运行一条龙)进行数据分析,计算出“回头客”,再将最终数据存储到 MySQL 中(需要设计表)。
字段含义请见: 淘宝双11大数据分析(数据准备篇)
把这test.csv数据集里label字段表示-1值剔除掉,保留需要预测的数据.并假设需要预测的数据中label字段均为1。
先查看内容:
[root@centos2020 dataset]# pwd
/usr/taobao_data/dataset
[root@centos2020 dataset]# head -5 test.csv
user_id,age_range,gender,merchant_id,label
163968,0,0,4378,-1
163968,0,0,2300,-1
163968,0,0,1551,-1
163968,0,0,4343,-1
这里可以使用 Spark-Shell 来完成,当然,我这里的处理是用的 Shell 脚本:
[root@centos2020 taobao_data]# vim predeal_test.sh
脚本的内容是:
#!/bin/bash #下面设置输入文件,把用户执行predeal_test.sh命令时提供的第一个参数作为输入文件名称 infile=$1 #下面设置输出文件,把用户执行predeal_test.sh命令时提供的第二个参数作为输出文件名称 outfile=$2 #注意!!最后的$infile > $outfile必须跟在}’这两个字符的后面 awk -F "," 'BEGIN{ id=0; } { if($1 && $2 && $3 && $4 && !$5){ id=id+1; print $1","$2","$3","$4","1 if(id==10000){ exit } } }' $infile > $outfile
授权并执行脚本:(处理test.csv,得到test_after.csv 文件)
[root@centos2020 taobao_data]# chmod +x predeal_test.sh
[root@centos2020 taobao_data]# ./predeal_test.sh ./dataset/test.csv ./dataset/test_after.csv
结果(发现最后一个字段 label 已经全是 1 了):
[root@centos2020 dataset]# head -5 test_after.csv
360576,2,2,1581,1
295296,2,1,3361,1
230016,5,1,1742,1
164736,3,1,598,1
164736,3,1,1963,1
先查看内容:
[root@centos2020 dataset]# head -5 train.csv
user_id,age_range,gender,merchant_id,label
34176,6,0,944,-1
34176,6,0,412,-1
34176,6,0,1945,-1
34176,6,0,4752,-1
剔除掉第一行的字段信息(查看结果,已经没有字段信息了):
[root@centos2020 dataset]# sed -i '1d' train.csv
[root@centos2020 dataset]# head -5 train.csv
34176,6,0,944,-1
34176,6,0,412,-1
34176,6,0,1945,-1
34176,6,0,4752,-1
34176,6,0,643,-1
然后,需要剔除掉 train,csv 中字段值部分字段值为空的数据。
仍旧使用 Shell 脚本:
[root@centos2020 taobao_data]# vim predeal_train.sh
脚本的内容是:
#!/bin/bash #下面设置输入文件,把用户执行predeal_train.sh命令时提供的第一个参数作为输入文件名称 infile=$1 #下面设置输出文件,把用户执行predeal_train.sh命令时提供的第二个参数作为输出文件名称 outfile=$2 #注意!!最后的$infile > $outfile必须跟在}’这两个字符的后面 awk -F "," 'BEGIN{ id=0; } { if($1 && $2 && $3 && $4 && ($5!=-1)){ id=id+1; print $1","$2","$3","$4","$5 if(id==10000){ exit } } }' $infile > $outfile
授权并执行脚本:
[root@centos2020 taobao_data]# chmod +x ./predeal_train.sh
[root@centos2020 taobao_data]# ./predeal_train.sh ./dataset/train.csv ./dataset/train_after.csv
前边的文件预处理,将一些无用的数据去除掉了。这里就能直接进行操作,而不用过多的数据清洗操作。
确保 Hadoop 已经启动。
[root@centos2020 hadoop-2.7.7]# bin/hadoop fs -put /usr/taobao_data/dataset/test_after.csv /taobao_data/dataset/
[root@centos2020 hadoop-2.7.7]# bin/hadoop fs -put /usr/taobao_data/dataset/train_after.csv /taobao_data/dataset/
可视化查看上传结果:
首先启动服务:
[root@centos2020 /]# service mysql start
Redirecting to /bin/systemctl start mysql.service
然后在 dbtaobao
这个数据库中建表:
mysql> use dbtaobao;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> create table rebuy (score varchar(40),label varchar(40));
Query OK, 0 rows affected (0.22 sec)
首先找到 mysql 驱动的位置:
刚好想到之前安装 Sqoop 时,上传过一个驱动,因此:
[root@centos2020 lib]# pwd
/usr/sqoop/sqoop-1.4.7.bin__hadoop-2.6.0/lib
[root@centos2020 lib]# cp mysql-connector-java-5.1.48.jar /usr/spark/spark-2.4.4-bin-hadoop2.7/jars/
使用mysql的驱动是因为,后边处理的数据要通过 Spark 存储到 MySQL 中。
启动:
[root@centos2020 spark-2.4.4-bin-hadoop2.7]# bin/spark-shell --jars /usr/spark/spark-2.4.4-bin-hadoop2.7/jars/mysql-connector-java-5.1.48.jar --driver-class-path /usr/spark/spark-2.4.4-bin-hadoop2.7/jars/mysql-connector-java-5.1.48.jar 20/02/27 00:37:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://centos2020:4040 Spark context available as 'sc' (master = local[*], app id = local-1582781875708). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.4 /_/ Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_161) Type in expressions to have them evaluated. Type :help for more information.
在命令行中输入代码。
首先是导入要用的包:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vectors,Vector}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
spark 处理数据的过程:
读取文件
val train_data = sc.textFile("/taobao_data/dataset/train_after.csv")
val test_data = sc.textFile("/taobao_data/dataset/test_after.csv")
构建模型
用 map 将每行的数据用逗号隔开,在数据集中,每行被分成 5 个部分:age_range,gender,merchant_id,label
。用LabeledPoint
来存储标签列和特征列(LabeledPoint
在监督学习中常用来存储标签和特征,其中要求标签的类型是double
,特征的类型是Vector
)。
val train= train_data.map{line =>
val parts = line.split(',')
LabeledPoint(parts(4).toDouble,Vectors.dense(parts(1).toDouble,parts
(2).toDouble,parts(3).toDouble))
}
val test = test_data.map{line =>
val parts = line.split(',')
LabeledPoint(parts(4).toDouble,Vectors.dense(parts(1).toDouble,parts(2).toDouble,parts(3).toDouble))
}
接下来,通过训练集构建模型SVMWithSGD
。这里的SGD即著名的随机梯度下降算法(Stochastic Gradient Descent
)。设置迭代次数为1000
,除此之外还有stepSize
(迭代步伐大小),regParam
(regularization
正则化控制参数),miniBatchFraction
(每次迭代参与计算的样本比例),initialWeights
(weight
向量初始值)等参数可以进行设置。
val numIterations = 1000
val model = SVMWithSGD.train(train, numIterations)
接下来,我们清除默认阈值,这样会输出原始的预测评分,即带有确信度的结果。
model.clearThreshold()
val scoreAndLabels = test.map{point =>
val score = model.predict(point.features)
score+" "+point.label
}
scoreAndLabels.foreach(println)
打印结果是(篇幅原因,取了最后几条结果展示):
-10797.000398878788 1.0
-14567.652050327359 1.0
-29610.775392755688 1.0
-14646.618577059266 1.0
-20963.52471155082 1.0
-9355.853170556047 1.0
如果我们设定了阀值,则会把大于阈值的结果当成正预测,小于阈值的结果当成负预测。
model.setThreshold(0.0)
scoreAndLabels.foreach(println)
打印结果是(篇幅原因,取了最后几条结果展示):
0.0 1.0
0.0 1.0
0.0 1.0
0.0 1.0
0.0 1.0
0.0 1.0
把没有设定阀值的测试集结果存入到MySQL数据中
model.clearThreshold() val scoreAndLabels = test.map{point => val score = model.predict(point.features) score+" "+point.label } //设置回头客数据 val rebuyRDD = scoreAndLabels.map(_.split(" ")) //下面要设置模式信息 val schema = StructType(List(StructField("score", StringType, true),StructField("label", StringType, true))) //下面创建Row对象,每个Row对象都是rowRDD中的一行 val rowRDD = rebuyRDD.map(p => Row(p(0).trim, p(1).trim)) //建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来 val rebuyDF = spark.createDataFrame(rowRDD, schema) //下面创建一个prop变量用来保存JDBC连接参数 val prop = new Properties() prop.put("user", "root") //表示用户名是root prop.put("password", "root") //表示密码是root prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序是com.mysql.jdbc.Driver //下面就可以连接数据库,采用append模式,表示追加记录到数据库dbtaobao的rebuy表中 rebuyDF.write.mode("append").jdbc("jdbc:mysql://centos2020:3306/dbtaobao", "dbtaobao.rebuy", prop)
mysql> mysql> select * from rebuy;
...
...
| -10797.000398878788 | 1.0 |
| -14567.652050327359 | 1.0 |
| -29610.775392755688 | 1.0 |
| -14646.618577059266 | 1.0 |
| -20963.52471155082 | 1.0 |
| -9355.853170556047 | 1.0 |
+---------------------+-------+
10000 rows in set (0.02 sec)
发现,结果已经存储进去了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。