赞
踩
MLlib的官网文档:
http://spark.apache.org/docs/latest/ml-guide.html
本节主要内容:
一、MLlib简述
二、基本数据类型
三、汇总统计
四、实例应用K-means算法
MLlib是Spark的机器学习(ML)库。它的目标是让实用的机器学习变得可扩展和容易。在高层次上,它提供以下工具:
(1)ML算法:常用的学习算法,如分类、回归、聚类和协同过滤等
(2)特性分析:特征提取、变换、降维和选择
(3)管道:用于构造、评估和调优ML管道的工具
(4)持久性:保存和加载算法、模型和管道
(5)实用性:线性代数,统计,数据处理等
随着Spark2.0版本,基于RDD的MLlib已经进入“维护模式”,现在Spark中主要用于机器学习的包是ML包,ML包是基于DataFrame的API(这个包将逐渐取代MLlib)。
关于ML包将会在后面总结。
这里给出一个向量示例:
(1,6,0,0,0,0,7,1,0,0,1,0)
引入下面的包:
import org.apache.spark.mllib.linalg.{Vector,Vectors}
稠密向量将原封不动的将上面的向量保存下来
val v0:Vector = Vectors.dense(1.0,6.0,0.0,0.0,0.0,0.0)
而稀疏向量会记录这个向量的长度,向量内非0元素的索引(位置),向量内非0元素的值
val v1:Vector = Vectors.sparse(6, Array(0,1), Array(1.0, 6.0))
也可以使用下面的形式:长度 + 索引,值的序列
val v2:Vector = Vector.sparse(6,Seq((0,1.0),(1,6.0)))
注意:在大数据分析中,稀疏数据会非常常见,用稀疏向量或矩阵进行模型的训练效率会比稠密矩阵存储的效率更高,也更加节省时间。所以尽量使用稀疏矩阵来进行模型的训练。
本质上是Label + Vector(在使用MLlib算法时,数据必须是LabelPoint类型)
例如,给上面的向量加上标签变为labelpoint即为:
- import org.apache.s.park.mllib.regression.LabeledPoint
-
- val posi=LabeledPoint(1.0,Vectors.sparse(6, Array(0,1), Array(1.0, 6.0)))
引入:import org.apache.spark.mllib.linalg.{Matrix, Matrices}
来看一下官网给出的例子:
(1)稠密矩阵存储方式:
((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
(2)对于稀疏矩阵的存储方式:
((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
注意:这里有必要对稀疏矩阵的存储进行一定的说明
((9.0, 0.0),
(0.0, 8.0),
(0.0, 6.0))
在sparse中有3个数组作为参数:其实这些数组的数都是为了确定矩阵中的非0值的位置
①先看后两个
Array(0, 2, 1):指非0元素出现的行索引,对应Array(9,6,8),就是9在第0行,6在第二行...
②然后看第一个Array(0,1,3)
0是不变的,1指第一列出现了1个非0元素,3指第一列和第二列一共出现了3个非0元素。
这样计数的目的为了:通过第一列出现了1个非0元素,加上第二个列表中对行索引,就可以确定第三个列表中第一个值在矩阵中的位置,然后通过第一个列表的3-1就可以知道第二列有2个非0元素,加上第二个列表第2,3个元素对行索引的描述,确定第三个列表中6,8对应在矩阵中的位置,以此类推,就可以用这种方法来表示矩阵啦。
分布式矩阵具有长类型的行和列索引以及双类型的值,分布存储在一个或多个rdd中。选择合适的格式来存储大型分布式矩阵是非常重要的。将分布式矩阵转换为不同的格式可能需要全局转移,这是非常昂贵的。目前已经实现了四种类型的分布式矩阵。
具体示例去官网查看。
在这里注明一下:
在计算矩阵奇异值、矩阵乘法等算法中,要求输入IndexedRowMatrix
BlockMatrix由于拆分比较方便,利于进行分布式矩阵计算,如块运算。
对RDD[Vector]进行基本的统计分析(mean,min,max,etc.)
研究变量之间线性相关程度的量。
现在Spark支持两种相关性系数:pearson相关系数和Spearman等级相关系数
适用条件:
Pearson 相关性:连续数据,正态分布, 线性关系,用pearson相关系数是最恰当 ;上述任一条件不满足,就用spearman相关系数
Spearman 相关性:两个定序测量数据 之间也用spearman相关系数,不能用 pearson相关系数
将数据根据不同的特征分成不同的组,然后按特定条件从不同的组中获取样本,并重新组成新的数组。Spark RDD api 中提供两种方式。
sampleByKey 和 sampleByKeyExact
两者的区别:
①sampleByKey 每次都通过给定的概率以一种类似于 掷硬币的方式来决定这个观察值是否被放入样本,因此一遍就可以过滤完所有数据,最后得到一个近似大小的样本,但往往不够准确。
②sampleByKeyExtra 会对全量数据做采样计算。对于每个类别,其都会产生 (fk⋅nk)个样本,其中 fk是键为k的样本类别采样的比例;nk是键k所拥有的样本数。 sampleByKeyExtra 采样的结果会更准确,有99.99%的置信度,但耗费的计算资源也更多。
是数理统计学中根据一定假设条件由样本推断总体的一种方法
(1).卡方检验(Stratified Sampling):
统计样本的实际观测值与理论推断值之间的偏离程度,实际观测值与理论推断值之间的偏离程度就决定卡方值的大小。
(2).适配度检验(Goodness of fit)
这里采用pearson检验方法
介绍几个参数:
method:这里采用pearson方法。
Statistic: 检验统计量。
degrees of freedom:自由度。表示可自由变动的样本观测值的数目,
pValue:统计学根据显著性检验方法所得到的P值。一般以P < 0.05 为显著, P<0.01 为非常显著, 其含义是样本间的差异由抽样误差所致的概率小于0.05 或0.01。 一般来说,假设检验主要看P值就够了。
(3).独立性检验(Indenpendence)
卡方独立性检验是用来检验两个属性间是否独立。其中一个属性做为行,另外一个做为列,通过貌似相关的关系考察其是否真实存在相关性。如检验:性别和习惯用左右手没有关系。
Spark MLlib 提 供 了 一 个 工 具 类 KernelDensity 用于核密度估算,核密度 估算的意思是根据已知的样本估计未知的 密度,属于非参数检验方法之一。
原理:
核密度估计的原理是。观察某一事物的 已知分布,如果某一个数在观察中出现了 ,可认为这个数的概率密度很大,和这个数比较近的数的概率密度也会比较大,而那些离这个数远的数的概率密度会比较小。
对于k-means的介绍可以看往期博文:
https://blog.csdn.net/hehe_soft_engineer/article/details/101349943#t2
下面只对K-means算法的应用作简单介绍(给出测试数据、代码以及测试结果)
任务要求:使用K-Means算法分析给定数据的受到网络攻击类别(标签)
注意:这里只是提供一种思路,并不能保证正确。
最后一列是训练数据的标签属性
代码设计过程:
1).观察数据可以看出来,首先要把第一、二、三列的数据变为整型类型(要通过广播变量的方式,在数据RDD的map操作内对变量进行修改,用以规避分块对数据一致性带来的影响)
2). 第一、二、三列进行对应映射,并将数据修改为可输入算法的RDD[array],其实此处的映射值应该差距大一些,不然分类不会太明显(这里的取值暂定为1)
3).将数据集转化为RDD[Vector]类型,作为K-means算法的输入
4). 通过train方法来传入参数用来调整模型
5) 输出聚类中心
6) 对源数据集进行分类预测并使用误差平方之和来评估数据模型
- package MLlibtest.chapter01.algorithms
-
- import org.apache.spark.broadcast.Broadcast
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.mllib.clustering.KMeans
- import org.apache.spark.mllib.linalg.Vectors
- import scala.collection.mutable
- object KDDCup_kmeans {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("KDDCup_kmeans").setMaster("local[*]")
- val sc = new SparkContext(conf)
- // 将带标签的10%训练数据载入
- val traindata = sc.textFile("F:\\2019秋季学期\\Spark内存计算\\spark课程资源\\5.sparkMLlib01\\sparkmllib_data\\kddcup\\kddcup.data_10_percent_corrected")
- //观察数据可以看出来,首先要把第一、二、三列的数据变为因子类型
- val linearrs=traindata.map(line=>line.split(","))
- //设置广播变量
- val empmap1=mutable.Map.empty[String,Int]
- val empmap2=mutable.Map.empty[String,Int]
- val empmap3=mutable.Map.empty[String,Int]
- val empmap4=mutable.Map.empty[String,Int]
- val col1map:Broadcast[mutable.Map[String,Int]]=sc.broadcast(empmap1)
- val col2map:Broadcast[mutable.Map[String,Int]]=sc.broadcast(empmap2)
- val col3map:Broadcast[mutable.Map[String,Int]]=sc.broadcast(empmap3)
- val col4map:Broadcast[mutable.Map[String,Int]]=sc.broadcast(empmap4)
- //计数可以通过累加器来计算,累加器在执行任务的节点上是不能读取的
- val map1count=mutable.Map("1"->0)
- val map2count=mutable.Map("2"->0)
- val map3count=mutable.Map("3"->0)
- val map4count=mutable.Map("4"->0)
- val col1count:Broadcast[mutable.Map[String,Int]]=sc.broadcast(map1count)
- val col2count:Broadcast[mutable.Map[String,Int]]=sc.broadcast(map2count)
- val col3count:Broadcast[mutable.Map[String,Int]]=sc.broadcast(map3count)
- val col4count:Broadcast[mutable.Map[String,Int]]=sc.broadcast(map4count)
- //第一、二、三、最后一列进行对应映射,并将数据修改为可输入算法的RDD[array]
- //其实此处的映射值应该差距大一些,不然分类不会太明显(这里的取值暂定为1)
- val colmap=linearrs.map{arr=>
- if(!col1map.value.contains(arr(1))){
- val count01=col1count.value.get("1").get+1
- col1count.value+=("1"->count01)//更新集合操作
- col1map.value+=(arr(1)->count01)
- arr(1)=count01.toString
- }else{
- arr(1)=col1count.value.get("1").get.toString
- }
- if(!col2map.value.contains(arr(2))){
- val count02=col2count.value.get("2").get+1
- col2count.value+=("2"->count02)//更新集合操作
- col2map.value+=(arr(2)->count02)
- arr(2)=count02.toString
- }else{
- arr(2)=col2count.value.get("2").get.toString
- }
- if(!col3map.value.contains(arr(3))){
- val count03=col3count.value.get("3").get+1
- col3count.value+=("3"->count03)//更新集合操作
- col3map.value+=(arr(3)->count03)
- arr(3)=count03.toString
- }else{
- arr(3)=col3count.value.get("3").get.toString
- }
- if(!col4map.value.contains(arr.last)){
- val count04=col4count.value.get("4").get+1
- col4count.value+=("4"->count04)//更新集合操作
- col4map.value+=(arr.last->count04)
- arr(arr.length-1)=count04.toString
- }else{
- arr(arr.length-1)=col4count.value.get("4").get.toString
- }
- arr.dropRight(1)
- }.cache()
-
-
- //转化为RDD[Vector]类型,作为K-means算法的输入
- val vecarrs=colmap.map {arr =>
- val doublearr=arr.map(_.toDouble)
- Vectors.dense(doublearr)
- }
- val numClusters1= 23
- println(numClusters1)
- val numIterations1 = 20
- //通过train方法来传入参数用来调整模型
- val cluster1=KMeans.train(vecarrs, numClusters1, numIterations1)
-
-
- //输出映射的集合
- println("第一列:***********************************************")
- col1map.value.keys.foreach { i =>
- print( "Key = " + i )
- println(" Value = " + col1map.value(i) )
- }
- println("第二列:***********************************************")
- col2map.value.keys.foreach { i =>
- print( "Key = " + i )
- println(" Value = " + col2map.value(i) )
- }
- println("第三列:***********************************************")
- col3map.value.keys.foreach { i =>
- print( "Key = " + i )
- println(" Value = " + col3map.value(i) )
- }
- println("第四列:***********************************************")
- col4map.value.keys.foreach { i =>
- print( "Key = " + i )
- println(" Value = " + col4map.value(i) )
- }
-
-
- //输出聚类中心
- val kmeansresult=cluster1.clusterCenters.foreach(
- center=>{
- println("聚类中心点为:"+center)
- }
- )
-
-
- //对源数据集进行分类预测看分成几类
- val predic:mutable.Set[Int]=mutable.Set[Int]()
- val total=100000 //总次数
- val selectcol=vecarrs.take(total)
- for(data<-selectcol){
- val presult=cluster1.predict(data)
- predic.add(presult)
- }
- println("验证数据分类为:")
- println(predic)
- println("Spark MLlib K-means clustering test finished.")
- //cluster1.save(sc, "F:\\2019秋季学期\\Spark内存计算\\spark课程资源\\5.sparkMLlib01\\KMeansModel1")
-
-
- // Evaluate clustering by computing Within Set Sum of Squared Errors
- // 使用误差平方之和来评估数据模型
- val WSSSE = cluster1.computeCost(vecarrs)
- println(s"Within Set Sum of Squared Errors = $WSSSE")
-
-
- // Save and load model
- /*clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
- val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")*/
- // $example off$
- sc.stop()
- }
- }
1)数据处理
数据处理:
2)聚类中心
3)对数据集进行聚类验证:
4)对数据集求方差:
(感觉聚类效果不好,应该是无效的属性太多,没有进行主成分分析或者给某些属性的权值不太合适,导致偏差太大)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。