赞
踩
今天正式进入机器学习相关算法类的记录学习。后续会将spark的mllib工具包里面设计到的回归于分类算法介绍一遍,看一下mllib包可以发现,传统的机器学习算法基本上都包括在里面,比如线性回归,logistic回归,贝叶斯分类,svm,决策树,随机森林等。在往上走更牛逼的分类算法像xgboost就不在原始spark自带的mllib里面了,但是肯定是有一些集成的外部工具包可以在spark里面一起使用,还往上走比如深度学习,自然也不再mllib里面,同样也是有对应的工具包也可以集合到spark一起使用,深度学习本身已经有好多有些的开源框架了,只是可能并非正对大数据领域的,比如最典型的tensorflow,如果把tensorflow和结合到spark里面,就构成了面向大数据分布式计算的深度学习框架了,你别说还真有这样的框架,这就是yahoo开源的TensorflowOnSpark。
牛逼的算法很多,然而企业里面其实很多问题mllib自带的经典机器学习算法已经可以搞定了,我们知道特征是非常重要的,算法反而其次,所以企业很多实际项目,反而变成了特征工程的事,实在不行了,再上更厉害的算法。
今天从最简单的线性回归说起,在spark里面应该说是分布式的线性回归,当然分布式已经有spark去管了,我们主要管一下线性回归。
回归问题其实就是求解一堆自变量与因变量之间一种几何关系,这种关系可以是线性的就是线性回归,可以是非线性的就是非线性回归。按照自变量的多少有可以分为一元线性回归,多元线性回归。
最简单的莫过于一元线性回归了,简单的表达式也就是y=a*x+b。比如身高和体重的关系,实际中,有一对这样的(身高,体重)数据,也就是(x,y),需要求解的是系数a,b,进而得到一元线性回归表达式。所以回归分析的整个过程大抵如下:
说到这里,那么第二个重点来了,也就是如何得到最优的a与b,这也是机器学习领域最为核心的需要研究的东西。
也许你已经知道,梯度下降算法(GD)是优化领域最为重要的方法之一,在它基础上演变出来了随机梯度下降算法(SGD),批量梯度下降算法,牛顿法,拟牛顿法等等,它们都是为了解决一个问题,优化目标函数,得到给定条件下的最优参数。
说到这里,机器学习领域另一个核心问题就是目标函数了。我们只知道优化,梯度下降,那么优化的目标是什么呢,这就是目标函数。还是以上述的身高体重的一元回归为例,怎样才能得到最终的参数a,b呢?首先需要定义目标函数。
想一下什么情况下我们认为得到了好的a,b,假设我们有了一组a,b,那么对于某一个身高x1,因为我们又了回归函数y=a*x+b,那么我们可以计算出来预测的体重回归值,y’ = a * x1 + b, 由于我们的训练样本已知了真实的体重y,那么很自然,加入y’ 与y很接近,我们就有理由相信,我们的这组参数a,b很好,如果训练样本中所有的样本都符合这个条件,那么就可以认为我们优化得到了最优的a,b参数。这么一看我们就可以知道,预测值与真实值之间的误差是衡量模型好坏的至关重要的判别标准。
那么如何数字化表示预测值与真实值之间的误差呢?均方误差是很常见的表示方法,也就是(y-y’)^2。优化的目标就是使得所有训练样本的均方误差和最小。那么如何优化这样一个目标呢,这就是前面提到的一系列梯度下降算法了。关于梯度下降的细节原理本篇暂不提及,主要介绍spark里面的应用技巧。当然机器学习的理论知识还是非常非常重要的,理解了原理才能更好的从原理层面去准备数据,处理特征,曾经我们在学校的时候,主要是学习理论知识,透彻研究里面的数学原理,这是理论储备阶段,到了企业,有了实战的环境,需要学习的是开始掌握各种处理框架,也就是工具,有了工具技能+数据环境+知识储备,方能形成完美的闭环。
说远了,继续回来看mllib里面线性回归的使用方法。这里用到了线性回归里面的包:LinearRegressionWithSGD。这个函数里面,默认优化的方法是SGD,似乎mllib里面好多算法底层的优化都是SGD。优化的目标函数是均方误差和最小化。
这里采用mllib的示例为例,假设有如下数据:
-0.4307829,-1.63735562648104 -2.00621178480549 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
-0.1625189,-1.98898046126935 -0.722008756122123 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
-0.1625189,-1.57881887548545 -2.1887840293994 1.36116336875686 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.155348103855541
-0.1625189,-2.16691708463163 -0.807993896938655 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
0.3715636,-0.507874475300631 -0.458834049396776 -0.250631301876899 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
0.7654678,-2.03612849966376 -0.933954647105133 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
0.8544153,-0.557312518810673 -0.208756571683607 -0.787896192088153 0.990146852537193 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
1.2669476,-0.929360463147704 -0.0578991819441687 0.152317365781542 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
1.2669476,-2.28833047634983 -0.0706369432557794 -0.116315079324086 0.80409888772376 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
...
从数据的格式可以看到,第一个数是y,逗号后面是8个数,也就是8个自变量,现在想要求这样一组线性回归问题。问题分为以下几步:
完整代码如下:
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.regression.LinearRegressionModel import org.apache.spark.mllib.regression.LinearRegressionWithSGD object LinearRegressionWithSGDExample { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("LinearRegressionWithSGDExample") val sc = new SparkContext(conf) // 加载库自带的数据 val data = sc.textFile("/opt/spark-2.3.1-bin-hadoop2.7/data/mllib/ridge-data/lpsa.data") //准备数据,按逗号分隔,按空格分隔 //变成LabeledPoint格式数据 val parsedData = data.map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) }.cache() // 设置模型参数调用模型 val numIterations = 100 val stepSize = 0.1 val model = LinearRegressionWithSGD.train(parsedData, numIterations, stepSize) // 评估模型:得到预测值 val valuesAndPreds = parsedData.map { point => val prediction = model.predict(point.features) (point.label, prediction) } //计算均方差 val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2) }.mean() println(s"training Mean Squared Error $MSE") sc.stop() } }
得到的结果就是:
>>> training Mean Squared Error 7.4510328101026015
解释一下几个细节:
scala> model.
intercept predict save toPMML toString weights
将权重weights打印出来:
scala> model.weights
res1: org.apache.spark.mllib.linalg.Vector =
[0.0019227869553792723,
0.00142724649860849,
0.0012831307611090802,
6.082768997250542E-4,
0.0016317241683978026,
0.001188345706511071,
7.456678158134278E-4,
0.0016360475842244853]
数一下发现权重只有8个,也就是没有常数项数值,要不要常数项都可以,y=ax+b其实也可以写长y-b=ax也就是另外一种的y=ax了。
这就是简单的线性回归模型。
关注公号【AInewworld】,第一时间获取精彩内容
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。