当前位置:   article > 正文

淘宝双11大数据分析(Spark 分析篇)_1. 将没有设定阈值的测试集结果存入到mysql数据中,查询数据库dbtaobao的rebuy表的

1. 将没有设定阈值的测试集结果存入到mysql数据中,查询数据库dbtaobao的rebuy表的

前言

阅读前请参考:

淘宝双11大数据分析(环境篇)

淘宝双11大数据分析(数据准备篇)

本篇内容主要是:对 test.csv 和 train.csv 数据集进行操作(分析)。并将最终结果通过 Spark 存储进 MySQL 。

操作流程如下:
在这里插入图片描述

文字描述:

在数据准备篇中,已经将数据压缩包解压到 Linux中了。然后,将其中的test.csv 和 train.csv文件上传到 HDFS 上。使用 Spark-shell (或在 windows环境下创建一个项目,最终打包上传运行一条龙)进行数据分析,计算出“回头客”,再将最终数据存储到 MySQL 中(需要设计表)。

test.csv 和 train.csv 数据预处理

字段含义请见: 淘宝双11大数据分析(数据准备篇)

test.csv 文件的处理

把这test.csv数据集里label字段表示-1值剔除掉,保留需要预测的数据.并假设需要预测的数据中label字段均为1。

先查看内容:

[root@centos2020 dataset]# pwd
/usr/taobao_data/dataset
[root@centos2020 dataset]# head -5 test.csv 
user_id,age_range,gender,merchant_id,label
163968,0,0,4378,-1
163968,0,0,2300,-1
163968,0,0,1551,-1
163968,0,0,4343,-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

这里可以使用 Spark-Shell 来完成,当然,我这里的处理是用的 Shell 脚本:

[root@centos2020 taobao_data]# vim predeal_test.sh
  • 1

脚本的内容是:

#!/bin/bash
#下面设置输入文件,把用户执行predeal_test.sh命令时提供的第一个参数作为输入文件名称
infile=$1
#下面设置输出文件,把用户执行predeal_test.sh命令时提供的第二个参数作为输出文件名称
outfile=$2
#注意!!最后的$infile > $outfile必须跟在}’这两个字符的后面
awk -F "," 'BEGIN{
      id=0;
    }
    {
        if($1 && $2 && $3 && $4 && !$5){
            id=id+1;
            print $1","$2","$3","$4","1
            if(id==10000){
                exit
            }
        }
    }' $infile > $outfile
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

授权并执行脚本:(处理test.csv,得到test_after.csv 文件)

[root@centos2020 taobao_data]# chmod +x predeal_test.sh 
[root@centos2020 taobao_data]# ./predeal_test.sh ./dataset/test.csv ./dataset/test_after.csv
  • 1
  • 2

结果(发现最后一个字段 label 已经全是 1 了):

[root@centos2020 dataset]# head -5 test_after.csv 
360576,2,2,1581,1
295296,2,1,3361,1
230016,5,1,1742,1
164736,3,1,598,1
164736,3,1,1963,1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

train.csv 文件的处理

先查看内容:

[root@centos2020 dataset]# head -5 train.csv 
user_id,age_range,gender,merchant_id,label
34176,6,0,944,-1
34176,6,0,412,-1
34176,6,0,1945,-1
34176,6,0,4752,-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

剔除掉第一行的字段信息(查看结果,已经没有字段信息了):

[root@centos2020 dataset]# sed -i '1d' train.csv 
[root@centos2020 dataset]# head -5 train.csv 
34176,6,0,944,-1
34176,6,0,412,-1
34176,6,0,1945,-1
34176,6,0,4752,-1
34176,6,0,643,-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

然后,需要剔除掉 train,csv 中字段值部分字段值为空的数据。

仍旧使用 Shell 脚本:

[root@centos2020 taobao_data]# vim predeal_train.sh
  • 1

脚本的内容是:

#!/bin/bash
#下面设置输入文件,把用户执行predeal_train.sh命令时提供的第一个参数作为输入文件名称
infile=$1
#下面设置输出文件,把用户执行predeal_train.sh命令时提供的第二个参数作为输出文件名称
outfile=$2
#注意!!最后的$infile > $outfile必须跟在}’这两个字符的后面
awk -F "," 'BEGIN{
         id=0;
    }
    {
        if($1 && $2 && $3 && $4 && ($5!=-1)){
            id=id+1;
            print $1","$2","$3","$4","$5
            if(id==10000){
                exit
            }
        }
    }' $infile > $outfile
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

授权并执行脚本:

[root@centos2020 taobao_data]# chmod +x ./predeal_train.sh 
[root@centos2020 taobao_data]# ./predeal_train.sh ./dataset/train.csv ./dataset/train_after.csv
  • 1
  • 2

Spark 处理数据

前边的文件预处理,将一些无用的数据去除掉了。这里就能直接进行操作,而不用过多的数据清洗操作。

执行环境

确保 Hadoop 已经启动。

上传文件到HDFS

[root@centos2020 hadoop-2.7.7]# bin/hadoop fs -put /usr/taobao_data/dataset/test_after.csv /taobao_data/dataset/
[root@centos2020 hadoop-2.7.7]# bin/hadoop fs -put /usr/taobao_data/dataset/train_after.csv /taobao_data/dataset/
  • 1
  • 2

可视化查看上传结果:

在这里插入图片描述

MySQL 的准备工作

首先启动服务:

[root@centos2020 /]# service mysql start
Redirecting to /bin/systemctl start mysql.service
  • 1
  • 2

然后在 dbtaobao 这个数据库中建表:

mysql> use dbtaobao;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> create table rebuy (score varchar(40),label varchar(40));
Query OK, 0 rows affected (0.22 sec)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

启动 Spark Shell

首先找到 mysql 驱动的位置:

刚好想到之前安装 Sqoop 时,上传过一个驱动,因此:

[root@centos2020 lib]# pwd
/usr/sqoop/sqoop-1.4.7.bin__hadoop-2.6.0/lib
[root@centos2020 lib]# cp mysql-connector-java-5.1.48.jar /usr/spark/spark-2.4.4-bin-hadoop2.7/jars/
  • 1
  • 2
  • 3

使用mysql的驱动是因为,后边处理的数据要通过 Spark 存储到 MySQL 中。

启动:

[root@centos2020 spark-2.4.4-bin-hadoop2.7]# bin/spark-shell --jars /usr/spark/spark-2.4.4-bin-hadoop2.7/jars/mysql-connector-java-5.1.48.jar --driver-class-path /usr/spark/spark-2.4.4-bin-hadoop2.7/jars/mysql-connector-java-5.1.48.jar 
20/02/27 00:37:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://centos2020:4040
Spark context available as 'sc' (master = local[*], app id = local-1582781875708).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_161)
Type in expressions to have them evaluated.
Type :help for more information.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

支持向量机SVM分类器预测回头客

在命令行中输入代码。

首先是导入要用的包:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vectors,Vector}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

spark 处理数据的过程:

读取文件

val train_data = sc.textFile("/taobao_data/dataset/train_after.csv")
val test_data = sc.textFile("/taobao_data/dataset/test_after.csv")
  • 1
  • 2

构建模型

用 map 将每行的数据用逗号隔开,在数据集中,每行被分成 5 个部分:age_range,gender,merchant_id,label。用LabeledPoint来存储标签列和特征列(LabeledPoint在监督学习中常用来存储标签和特征,其中要求标签的类型是double,特征的类型是Vector)。

val train= train_data.map{line =>
  val parts = line.split(',')
  LabeledPoint(parts(4).toDouble,Vectors.dense(parts(1).toDouble,parts
(2).toDouble,parts(3).toDouble))
}
val test = test_data.map{line =>
  val parts = line.split(',')
  LabeledPoint(parts(4).toDouble,Vectors.dense(parts(1).toDouble,parts(2).toDouble,parts(3).toDouble))
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

接下来,通过训练集构建模型SVMWithSGD。这里的SGD即著名的随机梯度下降算法(Stochastic Gradient Descent)。设置迭代次数为1000,除此之外还有stepSize(迭代步伐大小),regParamregularization正则化控制参数),miniBatchFraction(每次迭代参与计算的样本比例),initialWeightsweight向量初始值)等参数可以进行设置。

val numIterations = 1000
val model = SVMWithSGD.train(train, numIterations)
  • 1
  • 2

接下来,我们清除默认阈值,这样会输出原始的预测评分,即带有确信度的结果。

model.clearThreshold()
val scoreAndLabels = test.map{point =>
  val score = model.predict(point.features)
  score+" "+point.label
}
scoreAndLabels.foreach(println)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

打印结果是(篇幅原因,取了最后几条结果展示):

-10797.000398878788 1.0
-14567.652050327359 1.0
-29610.775392755688 1.0
-14646.618577059266 1.0
-20963.52471155082 1.0
-9355.853170556047 1.0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

如果我们设定了阀值,则会把大于阈值的结果当成正预测,小于阈值的结果当成负预测。

model.setThreshold(0.0)
scoreAndLabels.foreach(println)
  • 1
  • 2

打印结果是(篇幅原因,取了最后几条结果展示):

0.0 1.0
0.0 1.0
0.0 1.0
0.0 1.0
0.0 1.0
0.0 1.0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

输出结果到 mysql 中

把没有设定阀值的测试集结果存入到MySQL数据中

model.clearThreshold()
val scoreAndLabels = test.map{point =>
  val score = model.predict(point.features)
  score+" "+point.label
}
//设置回头客数据
val rebuyRDD = scoreAndLabels.map(_.split(" "))
//下面要设置模式信息
val schema = StructType(List(StructField("score", StringType, true),StructField("label", StringType, true)))
//下面创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = rebuyRDD.map(p => Row(p(0).trim, p(1).trim))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val rebuyDF = spark.createDataFrame(rowRDD, schema)
//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties()
prop.put("user", "root") //表示用户名是root
prop.put("password", "root") //表示密码是root
prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序是com.mysql.jdbc.Driver
//下面就可以连接数据库,采用append模式,表示追加记录到数据库dbtaobao的rebuy表中
rebuyDF.write.mode("append").jdbc("jdbc:mysql://centos2020:3306/dbtaobao", "dbtaobao.rebuy", prop)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

mysql 确认数据已经存储进去

mysql> mysql> select * from rebuy;
...
...
| -10797.000398878788 | 1.0   |
| -14567.652050327359 | 1.0   |
| -29610.775392755688 | 1.0   |
| -14646.618577059266 | 1.0   |
| -20963.52471155082  | 1.0   |
| -9355.853170556047  | 1.0   |
+---------------------+-------+
10000 rows in set (0.02 sec)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

发现,结果已经存储进去了。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/777759
推荐阅读
相关标签
  

闽ICP备14008679号