赞
踩
使用 Spark 机器学习库来做机器学习工作,可以说是非常的简单,通常只需要在对原始数据进行处理后,然后直接调用相应的 API 就可以实现。但是要想选择合适的算法,高效准确地对数据进行分析,可能还需要深入了解下算法原理,以及相应 Spark MLlib API 实现的参数的意义。
目前,Spark MLlib 中实现了 tree 相关的算法,决策树 DT(DecisionTree),随机森林 RF(Random Forest),GBDT(Gradient Boosting Decision Tree),其基础都是RF,DT 是 RF 一棵树时的情况,而 GBDT 则是循环构建DT,GBDT与DT的代码是非常简单明了的,本文会对 Random Forest 的源码进行分析,介绍 Spark 在实现过程中使用的一些技巧。
首先我们来对决策树和随机森林进行简单的了解:
决策树 GBDT-Decision Tree(DT)
在决策树的训练中,如上图所示,就是从根节点开始,不断的分裂,直到触发截止条件,在节点的分裂过程中要解决的问题其实就两个:
随机森林
随机森林就是构建多棵决策树投票,在构建多棵树过程中,引入随机性,一般体现在两个方面,一是每棵树使用的样本进行随机抽样,分为有放回和无放回抽样。二是对每棵树使用的特征集进行抽样,使用部分特征训练。
在训练过程中,如果单机内存能放下所有样本,可以用多线程同时训练多棵树,树之间的训练互不影响。
随机森林是由多个决策树构成的森林,算法分类结果由决策树的投票结果得到,其属于集成学习中的bagging方法。算法的主要原理如下:
在 Spark 平台上,传统单机形式的迭代方式必须要进行相应改进才能适用于分布式环境,这是因为在分布式环境下,数据也是分布式的,算法设计不得当会生成大量的 IO 操作,影响算法效率三个优化策略。
1.切分点抽样统计
在单机环境下的决策树对连续变量进行切分点选择时,一般是通过对特征点进行排序,然后取相邻两个数之间的点作为切分点,如果在分布式环境下如此操作的话,会带来大量的网络传输操作,特别是当数据量达到 PB 级时,算法效率将极为低下 Spark 中的随机森林在构建决策树时,会对各分区采用一定的子特征策略进行抽样, 然后生成各个分区的统计数据,并最终得到切分点。
2.切分特征装箱(Binning)
决策树的构建过程就是对特征的取值不断进行划分的过程对于离散的特征,如果有M
个值,最多个划分如果值是有序的,那么就最多 M-1
个划分(按老,中,少的序,那么只有 m-1 个,即 2 种划分,老|中,少;老,中|少)
划分的点就是 split
(切分点),划分出的区间就是 bin
。对于连续特征 ,理论上 split 是无数的,在分布环境下不可能取出所有的值,因此它采用的是1中的切点抽样统计方法。
3.逐层训练(level-wise training)
单机版本的决策数生成过程是通过递归调用(本质上是深度优先)的方式构造树,在构造树的同时,需要移动数据,将同一个子节点的数据移动到一起
分布式环境下采用的策略是逐层构建树节点(本质上是广度优先),这样遍历所有数据的次数 等于所有树中的最大层数。每次遍历时,只需要计算每个节点所有切分点统计参数,遍历完后,根据节点的特征划分,决定是否切分,以及如何切分。
数据集
No,year,month,day,hour,pm,DEWP,TEMP,PRES,cbwd,Iws,Is,Ir 1,2010,1,1,0,NaN,-21.0,-11.0,1021.0,NW,1.79,0.0,0.0 2,2010,1,1,1,NaN,-21,-12,1020,NW,4.92,0,0 3,2010,1,1,2,NaN,-21,-11,1019,NW,6.71,0,0 4,2010,1,1,3,NaN,-21,-14,1019,NW,9.84,0,0 5,2010,1,1,4,NaN,-20,-12,1018,NW,12.97,0,0 6,2010,1,1,5,NaN,-19,-10,1017,NW,16.1,0,0 7,2010,1,1,6,NaN,-19,-9,1017,NW,19.23,0,0 8,2010,1,1,7,NaN,-19,-9,1017,NW,21.02,0,0 9,2010,1,1,8,NaN,-19,-9,1017,NW,24.15,0,0 10,2010,1,1,9,NaN,-20,-8,1017,NW,27.28,0,0 11,2010,1,1,10,NaN,-19,-7,1017,NW,31.3,0,0 12,2010,1,1,11,NaN,-18,-5,1017,NW,34.43,0,0 13,2010,1,1,12,NaN,-19,-5,1015,NW,37.56,0,0 14,2010,1,1,13,NaN,-18,-3,1015,NW,40.69,0,0 15,2010,1,1,14,NaN,-18,-2,1014,NW,43.82,0,0 16,2010,1,1,15,NaN,-18,-1,1014,cv,0.89,0,0 ……
1、读取pm.csv,将含有缺失值的行扔掉(或用均值填充)将数据集分为两部分,0.8比例作为训练集,0.2比例作为测试集。
case class data(No: Int, year: Int, month: Int, day: Int, hour: Int, pm: Double, DEWP: Double, TEMP: Double, PRES: Double, cbwd: String, Iws: Double, Is: Double, Ir: Double)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("foreast")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
Logger.getLogger("org").setLevel(Level.ERROR)
val root = MyRandomForeast.getClass.getResource("/")
import sqlContext.implicits._
val df = sc.textFile(root + "pm.csv").map(_.split(",")).filter(_ (0) != "No").filter(!_.contains("NaN")).map(x => data(x(0).toInt, x(1).toInt, x(2).toInt, x(3).toInt, x(4).toInt, x(5).toDouble, x(6).toDouble, x(7).toDouble, x(8).toDouble, x(9), x(10).toDouble, x(11).toDouble, x(12).toDouble)).toDF.drop("No").drop("year")
2、使用 month、day、hour、DEWP、TEMP、PRES、cbwd、Iws、Is、Ir 作为特征列(除去No,year,pm),pm作为label列,使用训练集、随机森林算法进行回归建模,使用回归模型对测试集进行预测,并评估。
val splitdf = df.randomSplit(Array(0.8, 0.2)) val (train, test) = (splitdf(0), splitdf(1)) val traindf = train.withColumnRenamed("pm", "label") val indexer = new StringIndexer().setInputCol("cbwd").setOutputCol("cbwd_") val assembler = new VectorAssembler().setInputCols(Array("month", "day", "hour", "DEWP", "TEMP", "PRES", "cbwd_", "Iws", "Is", "Ir")).setOutputCol("features") import org.apache.spark.ml.Pipeline val rf = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("features") // setMaxDepth最大20,会大幅增加计算时间,增大能有效减小根均方误差 // setMaxBins我觉得和数据量相关,单独增大适得其反,要和setNumTrees一起增大 // 目前这个参数得到的评估结果(根均方误差)46.0002637676162 // val numClasses=2 // val categoricalFeaturesInfo = Map[Int, Int]()// Empty categoricalFeaturesInfo indicates all features are continuous. val pipeline = new Pipeline().setStages(Array(indexer, assembler, rf)) val model = pipeline.fit(traindf) val testdf = test.withColumnRenamed("pm", "label") val labelsAndPredictions = model.transform(testdf) labelsAndPredictions.select("prediction", "label", "features").show(false) val eva = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction") val accuracy = eva.evaluate(labelsAndPredictions) println(accuracy) val treemodel = model.stages(2).asInstanceOf[RandomForestRegressionModel] println("Learned regression forest model:\n" + treemodel.toDebugString)
3、按照下面标准处理pm列,数字结果放进(levelNum)列,字符串结果放进(levelStr)列
优(0) 50
良(1)50~100
轻度污染(2) 100~150
中度污染(3) 150~200
重度污染(4) 200~300
严重污染(5) 大于300及以上
// 使用Bucketizer特征转换,按区间划分
val splits = Array(Double.NegativeInfinity, 50, 100, 150, 200, 300, Double.PositiveInfinity)
val bucketizer = new Bucketizer().setInputCol("pm").setOutputCol("levelNum").setSplits(splits)
val bucketizerdf = bucketizer.transform(df)
val tempdf = sqlContext.createDataFrame(
Seq((0.0, "优"), (1.0, "良"), (2.0, "轻度污染"), (3.0, "中度污染"), (4.0, "重度污染"), (5.0, "严重污染"))
).toDF("levelNum", "levelStr")
val df2 = bucketizerdf.join(tempdf, "levelNum").drop("pm")
df2 show
4、使用month、day、hour、DEWP、TEMP、PRES、cbwd、Iws、Is、Ir作为特征列(除去No,year,pm),levelNum作为label列,使用训练集、随机森林算法进行分类建模。使用分类模型对测试集进行预测对预测结果df进行处理,基于prediction列生成predictionStr(0-5转换优-严重污染),对结果进行评估。
val splitdf2 = df2.randomSplit(Array(0.8, 0.2)) val (train2, test2) = (splitdf2(0), splitdf2(1)) val traindf2 = train2.withColumnRenamed("levelNum", "label") val indexer2 = new StringIndexer().setInputCol("cbwd").setOutputCol("cbwd_") val assembler2 = new VectorAssembler().setInputCols(Array("month", "day", "hour", "DEWP", "TEMP", "PRES", "cbwd_", "Iws", "Is", "Ir")).setOutputCol("features") val rf2 = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features") val pipeline2 = new Pipeline().setStages(Array(indexer2, assembler2, rf2)) val model2 = pipeline2.fit(traindf2) val testdf2 = test2.withColumnRenamed("levelNum", "label") val labelsAndPredictions2 = model2.transform(testdf2) labelsAndPredictions2.select("label", "prediction", "features").show val eva2 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("precision") val accuracy2 = eva2.evaluate(labelsAndPredictions2) println("Test Error = " + (1.0 - accuracy2)) val treeModel2 = model2.stages(2).asInstanceOf[RandomForestClassificationModel] println("learned classifier tree node:\n" + treeModel2.toDebugString)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。