In its ml package, Spark wraps logistic regression and adds both L1 and L2 regularization. The mix of the two penalties is controlled by elasticNetParam, and the chosen regularization determines whether L-BFGS or OWLQN is used as the optimizer; this combination is the so-called Elastic Net.

We first introduce some of the classes used in model training, prediction, and evaluation.

MultiClassSummarizer is used mainly during training to count, for each label in the data, its number of occurrences and its accumulated weight. Instance weights are introduced (default 1.0) so that unbalanced data can be handled by penalizing the over-represented classes.
class MultiClassSummarizer extends Serializable {
  private val distinctMap = new mutable.HashMap[Int, (Long, Double)]
  private var totalInvalidCnt: Long = 0L
The key of distinctMap is the label (as an Int); the value is a tuple whose first element is the number of times the label occurs and whose second is the accumulated weight.
  /**
   * Add a new label into this MultilabelSummarizer, and update the distinct map.
   *
   * @param label The label for this data point.
   * @param weight The weight of this instance.
   * @return This MultilabelSummarizer
   */
  def add(label: Double, weight: Double = 1.0): this.type = {
    require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
    if (weight == 0.0) return this
    // labels must be non-negative integers; anything else counts as invalid
    if (label - label.toInt != 0.0 || label < 0) {
      totalInvalidCnt += 1
      this
    } else {
      val (counts: Long, weightSum: Double) = distinctMap.getOrElse(label.toInt, (0L, 0.0))
      // accumulate the occurrence count and the weight
      distinctMap.put(label.toInt, (counts + 1L, weightSum + weight))
      this
    }
  }
  /**
   * Merge another MultilabelSummarizer, and update the distinct map.
   * (Note that it will merge the smaller distinct map into the larger one using in-place
   * merging, so either `this` or `other` object will be modified and returned.)
   *
   * @param other The other MultilabelSummarizer to be merged.
   * @return Merged MultilabelSummarizer object.
   */
  def merge(other: MultiClassSummarizer): MultiClassSummarizer = {
    // merge the smaller map into the larger one, for performance
    val (largeMap, smallMap) = if (this.distinctMap.size > other.distinctMap.size) {
      (this, other)
    } else {
      (other, this)
    }
    smallMap.distinctMap.foreach {
      case (key, value) =>
        val (counts: Long, weightSum: Double) = largeMap.distinctMap.getOrElse(key, (0L, 0.0))
        // accumulate counts and weights directly
        largeMap.distinctMap.put(key, (counts + value._1, weightSum + value._2))
    }
    largeMap.totalInvalidCnt += smallMap.totalInvalidCnt
    largeMap
  }
numClasses returns the number of classes seen. Labels start from 0, so this is the maximum label plus 1:
  def numClasses: Int = if (distinctMap.isEmpty) 0 else distinctMap.keySet.max + 1
histogram returns the accumulated weight sum per class:
  def histogram: Array[Double] = {
    val result = Array.ofDim[Double](numClasses)
    var i = 0
    // result.length is just numClasses
    val len = result.length
    // classes are assumed to run from 0 to k-1
    while (i < len) {
      result(i) = distinctMap.getOrElse(i, (0L, 0.0))._2
      i += 1
    }
    result
  }
Compared with numClasses, this implementation of histogram is somewhat fragile: it assumes the classes run from 0 to k-1, and any class index that never appears in the data ends up with a zero entry in the result.
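The behaviour described above can be checked with a small stand-alone sketch. MiniSummarizer is a hypothetical, simplified re-implementation of the logic (not the actual Spark class); it keeps the same map layout and the same numClasses/histogram semantics:

```scala
import scala.collection.mutable

// Minimal sketch of the summarizer logic described above (illustrative only).
class MiniSummarizer {
  private val distinctMap = new mutable.HashMap[Int, (Long, Double)]

  def add(label: Double, weight: Double = 1.0): this.type = {
    require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
    if (weight == 0.0) return this
    if (label - label.toInt != 0.0 || label < 0) return this // invalid label, skipped
    val (counts, weightSum) = distinctMap.getOrElse(label.toInt, (0L, 0.0))
    distinctMap.put(label.toInt, (counts + 1L, weightSum + weight))
    this
  }

  // max label + 1, as in the Spark source
  def numClasses: Int = if (distinctMap.isEmpty) 0 else distinctMap.keySet.max + 1

  // per-class accumulated weight; absent classes get 0.0
  def histogram: Array[Double] =
    Array.tabulate(numClasses)(i => distinctMap.getOrElse(i, (0L, 0.0))._2)
}

val s = new MiniSummarizer
s.add(0.0).add(1.0, 2.0).add(1.0)
// numClasses = 2, histogram = Array(1.0, 3.0)
```

Feeding label 1 twice (with weights 2.0 and 1.0) yields a histogram entry of 3.0 for class 1, illustrating that the second tuple element is a weight sum, not a count.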
The variance computation was already covered in the post on Spark's online mean/variance statistics; it computes the variance of the sample set, which is used for standardization.

LogisticRegressionModel holds the trained coefficient matrix, the intercept vector, the number of classes, whether the model is multinomial, and other parameters.
override protected def predict(features: Vector): Double = if (isMultinomial) {
  super.predict(features)
} else {
  // Note: We should use getThreshold instead of $(threshold) since getThreshold is overridden.
  if (score(features) > getThreshold) 1 else 0
}
Binary and multinomial classification are handled separately, since their prediction logic differs. In the binary case, prediction computes a score from the features and compares it with the threshold: greater than the threshold predicts 1, otherwise 0. The score function:
private val score: Vector => Double = (features) => {
  val m = margin(features)
  1.0 / (1.0 + math.exp(-m))
}
As score shows, the margin is passed through the sigmoid function. The margin function:
private val margin: Vector => Double = (features) => {
  BLAS.dot(features, _coefficients) + _intercept
}
That is, the dot product of the features and the coefficients, plus the intercept.

The binary model also implements some low-level APIs used when evaluating the model, computing the margins, the raw probabilities, and the predicted label respectively.
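Put together, the binary prediction path above can be sketched in a few lines. This is a stand-alone illustration, not Spark code: plain arrays stand in for Spark's Vector and BLAS.dot, and the coefficient/intercept values are made up:

```scala
// margin = coefficients . features + intercept
def margin(coefficients: Array[Double], intercept: Double)(features: Array[Double]): Double =
  coefficients.zip(features).map { case (c, f) => c * f }.sum + intercept

// score = sigmoid(margin)
def score(m: Double): Double = 1.0 / (1.0 + math.exp(-m))

// predict 1 when the score exceeds the threshold, otherwise 0
def predict(coefficients: Array[Double], intercept: Double,
            threshold: Double)(features: Array[Double]): Double =
  if (score(margin(coefficients, intercept)(features)) > threshold) 1.0 else 0.0

val w = Array(0.5, -0.25) // made-up coefficients
val x = Array(2.0, 4.0)   // made-up features
// margin = 0.5*2 - 0.25*4 + 0.1 = 0.1, sigmoid(0.1) ≈ 0.525 > 0.5, so predicts 1.0
predict(w, 0.1, 0.5)(x)
```

With a stricter threshold of 0.6 the same point would be predicted 0, which is exactly the threshold semantics described above.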
// compute the binary margins, returned as a DenseVector
override protected def predictRaw(features: Vector): Vector = {
  val m = margin(features)
  Vectors.dense(-m, m)
}
// turn the margins into raw probabilities, i.e. pass them through the sigmoid
override protected def raw2probabilityInPlace(rawPrediction: Vector): Vector = {
  rawPrediction match {
    case dv: DenseVector =>
      var i = 0
      val size = dv.size
      while (i < size) {
        dv.values(i) = 1.0 / (1.0 + math.exp(-dv.values(i)))
        i += 1
      }
      dv
    case sv: SparseVector =>
      throw new RuntimeException("Unexpected error in LogisticRegressionModel:" +
        " raw2probabilitiesInPlace encountered SparseVector")
  }
}
// predict the label from the raw prediction; as shown above, vector(1) holds
// the margin actually used for prediction
override protected def raw2prediction(rawPrediction: Vector): Double = {
  // Note: We should use getThreshold instead of $(threshold) since getThreshold is overridden.
  val t = getThreshold
  val rawThreshold = if (t == 0.0) {
    Double.NegativeInfinity
  } else if (t == 1.0) {
    Double.PositiveInfinity
  } else {
    math.log(t / (1.0 - t))
  }
  if (rawPrediction(1) > rawThreshold) 1 else 0
}
In the multinomial case, the model calls its parent's predict function:
override protected def predict(features: FeaturesType): Double = {
  raw2prediction(predictRaw(features))
}
which in turn calls raw2prediction:
override protected def raw2prediction(rawPrediction: Vector): Double = {
  if (!isDefined(thresholds)) {
    rawPrediction.argmax
  } else {
    probability2prediction(raw2probability(rawPrediction))
  }
}
If the thresholds array is not set (it usually is not), this simply returns the index of the largest element of rawPrediction. For example, if rawPrediction is [2.3, 1.2, 5.1, 3.4], it returns 2 (the position of the maximum, 5.1). rawPrediction comes from predictRaw:
override protected def predictRaw(features: Vector): Vector = {
  if (isMultinomial) {
    margins(features)
  } else {
    val m = margin(features)
    Vectors.dense(-m, m)
  }
}
which, in the multinomial case, simply calls the margins function:
private val margins: Vector => Vector = (features) => {
  val m = interceptVector.toDense.copy
  // m = alpha * coefficientMatrix * features + beta * m
  BLAS.gemv(1.0, coefficientMatrix, features, 1.0, m)
  m
}
The code is straightforward: multiply the coefficient matrix by the feature vector and add the intercept vector.

LogisticRegressionModelWriter writes the training parameters and the fitted coefficient matrix to HDFS:
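The gemv call above can be spelled out with plain 2-D arrays standing in for Spark's Matrix and BLAS (a sketch for illustration; the coefficient values are made up):

```scala
// margins = coefficientMatrix * features + interceptVector,
// one margin per class (row of the coefficient matrix)
def margins(coef: Array[Array[Double]], intercept: Array[Double])(
    features: Array[Double]): Array[Double] = {
  val m = intercept.clone() // start from a copy of the intercept vector
  for (k <- coef.indices; j <- features.indices) {
    m(k) += coef(k)(j) * features(j) // accumulate row k . features
  }
  m
}

// with no thresholds set, prediction is the argmax of the margins
def predictClass(m: Array[Double]): Int = m.indexOf(m.max)

val coef = Array(Array(1.0, 0.0), Array(0.0, 2.0), Array(-1.0, 1.0)) // 3 classes, 2 features
val raw = margins(coef, Array(0.1, 0.2, 0.3))(Array(3.0, 1.0))
// raw ≈ Array(3.1, 2.2, -1.7); predictClass(raw) = 0
```

Each row of the coefficient matrix plays the role of one class's coefficient vector, which matches the k-coefficient (no pivot) formulation discussed below.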
class LogisticRegressionModelWriter(instance: LogisticRegressionModel)
  extends MLWriter with Logging {

  private case class Data(
    numClasses: Int,
    numFeatures: Int,
    interceptVector: Vector,
    coefficientMatrix: Matrix,
    isMultinomial: Boolean)

  override protected def saveImpl(path: String): Unit = {
    // the parameters used at training time
    DefaultParamsWriter.saveMetadata(instance, path, sc)
    // save the training results
    val data = Data(instance.numClasses, instance.numFeatures, instance.interceptVector,
      instance.coefficientMatrix, instance.isMultinomial)
    val dataPath = new Path(path, "data").toString
    sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
  }
}
Besides the training parameters, the metadata also records the training environment. The saved metadata for the official demo looks like:
{
  "class": "org.apache.spark.ml.classification.LogisticRegressionModel",
  "timestamp": 1500886361787,
  "sparkVersion": "2.0.2",
  "uid": "logreg_ea57ce7dcde4",
  "paramMap": {
    "fitIntercept": true,
    "rawPredictionCol": "rawPrediction",
    "predictionCol": "prediction",
    "tol": 0.000001,
    "labelCol": "label",
    "standardization": true,
    "regParam": 0.3,
    "probabilityCol": "probability",
    "featuresCol": "features",
    "maxIter": 10,
    "elasticNetParam": 0.8,
    "threshold": 0.5
  }
}
LogisticRegressionModelReader reads a saved model back: the metadata is parsed from JSON, and the coefficient matrix, intercept, and so on are recovered from the parquet files. This is straightforward.

LogisticAggregator is used during training to compute the gradient and loss of each iteration. The computation is distributed and, like the summarizer above, the class is used inside treeAggregate.

As mentioned in the earlier L-BFGS post, the coefficient vector returned there has only k-1 dimensions, and prediction assumes the margin of class 0 is 0; that is the pivot-class formulation, and binary classification belongs to it. The multinomial case here does not use a pivot: instead it trains a separate coefficient vector for each of the k classes.
As noted above, binary classification does use a pivot; the usual gradient of binary logistic regression