赞
踩
聚类算法是机器学习中的一种无监督学习算法,它在数据科学领域应用场景很广泛,比如基于用户购买行为、兴趣等来构建推荐系统。
核心思想可以理解为,在给定的数据集中(数据集中的每个元素有可被观察的n个属性),使用聚类算法将数据集划分为k个子集,并且要求每个子集内部的元素之间的差异度尽可能低,而不同子集元素的差异度尽可能高。简而言之,就是通过聚类算法处理给定的数据集,将具有相同或类似的属性(特征)的数据划分为一组,并且不同组之间的属性相差会比较大。
K-Means算法是聚类算法中应用比较广泛的一种聚类算法,比较容易理解且易于实现。
KMeans算法的基本思想是随机给定K个初始簇中心,按照最邻近原则把待分类样本点分到各个簇。然后按平均法重新计算各个簇的质心,从而确定新的簇心。一直迭代,直到簇心的移动距离小于某个给定的值或者满足已定条件。主要分为4个步骤:
示例图:
KMeans算法在做聚类分析的过程中主要有两个难题:初始聚类中心的选择和聚类个数K的选择。
Spark MLlib针对"标准"KMeans的问题,在实现自己的KMeans上主要做了如下核心优化:
1. 选择合适的初始中心点
Spark MLlib在初始中心点的选择上,有两种算法:
随机选择:依据给的种子seed,随机选择K个随机中心点
k-means||:默认的算法
- val RANDOM = "random"
- val K_MEANS_PARALLEL = "k-means||"
2. 计算样本属于哪一个中心点时对距离计算的优化
假设中心点是(a1,b1),要计算的点是(a2,b2),那么Spark MLlib采取的计算方法是(记为lowerBoundOfSqDist):
对比欧几里得距离(记为EuclideanDist):
可轻易证明lowerBoundOfSqDist小于或等于EuclideanDist,并且计算lowerBoundOfSqDist很方便,只需处理中心点和要计算的点的L2范数。那么在实际处理中就分两种情况:
关于fastSquaredDistance:
- 首先计算一个精度:
- val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
- if (precisionBound1 < precision) {
- // 精度满足squared distance期望的精度
- // val sumSquaredNorm = norm1 * norm1 + norm2 * norm2
- // 2.0 * dot(v1, v2)为2(a1*a2 + b1*b2)可以利用之前计算的L2范数
- sqDist = sumSquaredNorm - 2.0 * dot(v1, v2)
- } else if (v1.isInstanceOf[SparseVector] || v2.isInstanceOf[SparseVector]) {
- val dotValue = dot(v1, v2)
- sqDist = math.max(sumSquaredNorm - 2.0 * dotValue, 0.0)
- val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) /
- (sqDist + EPSILON)
- if (precisionBound2 > precision) {
- sqDist = Vectors.sqdist(v1, v2)
- }
- } else {
- sqDist = Vectors.sqdist(v1, v2)
- }
- //精度不满足要求时,则进行Vectors.sqdist(v1, v2)的处理,即原始的距离计算

基于mllib包下的KMeans相关源码涉及的类和方法(ml包下与下面略有不同,比如涉及到的fit方法):
主要看一下train和runAlgorithm的核心源码:
- def train(
- // 数据样本
- data: RDD[Vector],
- // 聚类数量
- k: Int,
- // 最大迭代次数
- maxIterations: Int,
- // 初始化中心,支持"random"或者"k-means||"
- initializationMode: String,
- // 初始化时的随机种子
- seed: Long): KMeansModel = {
- new KMeans().setK(k)
- .setMaxIterations(maxIterations)
- .setInitializationMode(initializationMode)
- .setSeed(seed)
- .run(data)
- }

- /**
- * Implementation of K-Means algorithm.
- */
- private def runAlgorithm( data: RDD[VectorWithNorm],
- instr: Option[Instrumentation]): KMeansModel = {
-
- val sc = data.sparkContext
-
- val initStartTime = System.nanoTime()
-
- val distanceMeasureInstance = DistanceMeasure.decodeFromString(this.distanceMeasure)
-
- val centers = initialModel match {
- case Some(kMeansCenters) =>
- kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
- case None =>
- if (initializationMode == KMeans.RANDOM) {
- // random
- initRandom(data)
- } else {
- // k-means||
- initKMeansParallel(data, distanceMeasureInstance)
- }
- }
- val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
- logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
-
- var converged = false
- var cost = 0.0
- var iteration = 0
-
- val iterationStartTime = System.nanoTime()
-
- instr.foreach(_.logNumFeatures(centers.head.vector.size))
-
- // Execute iterations of Lloyd's algorithm until converged
- // Kmeans迭代执行,计算每个样本属于哪个中心点,中心点累加的样本值以及计数。然后根据中心点的所有样本数据进行中心点的更新,并且比较更新前的数值,根据两者距离判断是否完成
- //迭代次数小于最大迭代次数,并行计算的中心点还没有收敛
- while (iteration < maxIterations && !converged) {
- // 损失值累加器
- val costAccum = sc.doubleAccumulator
- // 广播中心点
- val bcCenters = sc.broadcast(centers)
-
- // Find the new centers
- val collected = data.mapPartitions { points =>
- // 当前聚类中心
- val thisCenters = bcCenters.value
- // 中心点的维度
- val dims = thisCenters.head.vector.size
-
- val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
- val counts = Array.fill(thisCenters.length)(0L)
-
- points.foreach { point =>
- // 通过当前的聚类中心点,找出最近的聚类中心点
- // findClosest是为了计算bestDistance,参考上述Spark对距离计算的优化
- val (bestCenter, cost) = distanceMeasureInstance.findClosest(thisCenters, point)
- costAccum.add(cost)
- distanceMeasureInstance.updateClusterSum(point, sums(bestCenter))
- counts(bestCenter) += 1
- }
-
- counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
- }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
- axpy(1.0, sum2, sum1)
- (sum1, count1 + count2)
- }.collectAsMap()
-
- if (iteration == 0) {
- instr.foreach(_.logNumExamples(collected.values.map(_._2).sum))
- }
-
- val newCenters = collected.mapValues { case (sum, count) =>
- distanceMeasureInstance.centroid(sum, count)
- }
-
- bcCenters.destroy(blocking = false)
-
- // Update the cluster centers and costs
- converged = true
- newCenters.foreach { case (j, newCenter) =>
- if (converged &&
- !distanceMeasureInstance.isCenterConverged(centers(j), newCenter, epsilon)) {
- // 距离大于,则说明中心点位置改变
- converged = false
- }
- // 更新中心点
- centers(j) = newCenter
- }
-
- cost = costAccum.value
- iteration += 1
- }
-
- val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
- logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")
-
- if (iteration == maxIterations) {
- logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
- } else {
- logInfo(s"KMeans converged in $iteration iterations.")
- }
-
- logInfo(s"The cost is $cost.")
-
- new KMeansModel(centers.map(_.vector), distanceMeasure, cost, iteration)
- }

- 诺丹姆吉本斯主教中学(Notre Dame-Bishop Gibbons School) 71 0 0 283047.0 13289.0
- 海景基督高中(Ocean View Christian Academy) 45 0 0 276403.0 13289.0
- 卡弗里学院(Calvary Baptist Academy) 58 0 0 227567.0 13289.0
- ...
2. 示例代码
- //将加载的rdd数据,每一条变成一个向量,整个数据集变成矩阵
- val parsedata = rdd.map { case Row(schoolid, schoolname, locationid, school_type, zs, fee, byj) =>
- //"特征因子":学校位置id,学校类型,住宿方式,学费,备用金
- val features = Array[Double](locationid.toString.toDouble, school_type.toString.toDouble, zs.toString.toDouble, fee.toString.toDouble, byj.toString.toDouble)
- //将数组变成机器学习中的向量
- Vectors.dense(features)
- }.cache() //默认缓存到内存中,可以调用persist()指定缓存到哪
-
- //用kmeans对样本向量进行训练得到模型
- //聚类中心
- val numclusters = List(3, 6, 9)
- //指定最大迭代次数
- val numIters = List(10, 15, 20)
- var bestModel: Option[KMeansModel] = None
- var bestCluster = 0
- var bestIter = 0
- val bestRmse = Double.MaxValue
- for (c <- numclusters; i <- numIters) {
- val model = KMeans.train(parsedata, c, i)
- //集内均方差总和(WSSSE),一般可以通过增加类簇的个数 k 来减小误差,一般越小越好(有可能出现过拟合)
- val d = model.computeCost(parsedata)
- println("选择K:" + (c, i, d))
- if (d < bestRmse) {
- bestModel = Some(model)
- bestCluster = c
- bestIter = i
- }
- }
- println("best:" + (bestCluster, bestIter, bestModel.get.computeCost(parsedata)))
- //用模型对我们的数据进行预测
- val resrdd = df.map { case Row(schoolid, schoolname, locationid, school_type, zs, fee, byj) =>
- //提取到每一行的特征值
- val features = Array[Double](locationid.toString.toDouble, school_type.toString.toDouble, zs.toString.toDouble, fee.toString.toDouble, byj.toString.toDouble)
- //将特征值转换成特征向量
- val linevector = Vectors.dense(features)
- //将向量输入model中进行预测,得到预测值
- val prediction = bestModel.get.predict(linevector)
-
- //返回每一行结果((sid,sname),所属类别)
- ((schoolid.toString, schoolname.toString), prediction)
- }
-
- //中心点
- /*val centers: Array[linalg.Vector] = model.clusterCenters
- centers.foreach(println)*/
-
- //按照所属"类别"分组,并根据"类别"排序,然后保存到数据库
- // saveData2Mysql是封装好的保存数据到mysql的方法
- resrdd.groupBy(_._2).sortBy(_._1).foreachPartition(saveData2Mysql(_))

上述示例只是一个简单的demo,实际应用中会更复杂,牵涉到数据的预处理,比如对数据进行量化、归一化,以及如何调参以获取最优训练模型。
推荐文章:
Spark实现推荐系统中的相似度算法
关于一些技术点的随笔记录(二)
Spark存储Parquet数据到Hive,对map、array、struct字段类型的处理
Kafka中sequence IO、PageCache、SendFile的应用详解
对Spark硬件配置的建议mp.weixin.qq.com关注微信公众号:大数据学习与分享,获取更多技术干货
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。