1. 若当前实现为GBTClassifier,检查训练集的label是否包含0和1之外的值,如果包含则异常退出,否则将0和1转换成-1和+1。若当前实现为GBTRegressor,数据不做处理。
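| | GBTClassifier | GBTRegressor |
|---|---|---|
| 损失函数(loss) | LogLoss(logistic) | L2(squared)、L1(absolute) |
| 纯度计算(impurity) | 基尼系数(Gini) | label列方差(Variance) |

注:上表中的纯度度量是 Strategy.defaultStrategy 的默认配置;在 GradientBoostedTrees.boost 中,每棵树都会被强制按回归树训练,并使用方差(Variance)作为纯度度量,分类也不例外。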
4. 将训练数据集的label调整为损失函数在当前预测值处的负梯度(伪残差):label = -loss.gradient(pred, point.label)。【注】gradient与具体的loss函数绑定,下面章节会有讲解。
6. 根据训练出的模型预测数据:预测结果 = 上次迭代模型预测结果 + 当前树模型预测结果 * 当前权重(步长)
/**
 * Example: configure a GBTClassifier, fit it, and score a second data set.
 *
 * Every setter receives a value of the type it declares. All values are inside their valid
 * ranges: stepSize and subsamplingRate must lie in (0, 1], and the seed is a Long.
 *
 * @param trainingData DataFrame with a Double "label" column holding only 0.0 / 1.0 and a
 *                     Vector "features" column
 * @param testData     DataFrame with a Vector "features" column to be scored
 * @return the fitted model and `testData` with the prediction column appended
 */
def fitAndPredictGbt(
    trainingData: DataFrame,
    testData: DataFrame): (GBTClassificationModel, DataFrame) = {
  // setImpurity is deliberately not called: GradientBoostedTrees.boost grows every tree as a
  // regression tree with variance impurity, whatever impurity is configured here.
  val gbtClassifier = new GBTClassifier()
    // Label column; GBTClassifier rejects labels other than 0.0 and 1.0.
    .setLabelCol("label")
    // Feature vector column.
    .setFeaturesCol("features")
    // Loss function; the classifier only supports "logistic".
    .setLossType("logistic")
    // Maximum depth of each tree.
    .setMaxDepth(5)
    // Checkpoint periodically so the lineage (and the driver-side DAG) does not keep growing
    // with the number of iterations. Requires a checkpoint directory on the SparkContext.
    .setCheckpointInterval(10)
    // Number of boosting iterations, i.e. the number of trees in the ensemble.
    .setMaxIter(20)
    .setCacheNodeIds(false)
    .setMaxBins(32)
    .setMaxMemoryInMB(256)
    .setMinInfoGain(0.0)
    .setMinInstancesPerNode(1)
    .setSeed(31L)
    // Learning rate (weight of every tree after the first).
    .setStepSize(0.1)
    // Fraction of the training data sampled for each tree.
    .setSubsamplingRate(1.0)
  val model: GBTClassificationModel = gbtClassifier.fit(trainingData)
  (model, model.transform(testData))
}

private[ml] object GBTClassifierParams {
  /**
   * Loss types accepted by the classification variant.
   * Only "logistic" is supported; names are kept lowercase.
   */
  final val supportedLossTypes: Array[String] =
    Seq("logistic").map(_.toLowerCase).toArray
}
- import org.apache.spark.mllib.tree.loss.{AbsoluteError => OldAbsoluteError, LogLoss => OldLogLoss, Loss => OldLoss, SquaredError => OldSquaredError}
- //以上将LogLoss重命名为OldLogLoss
- ...
/**
 * Converts the configured loss name into the old (mllib) loss implementation.
 * For classification, only "logistic" maps to a loss (OldLogLoss).
 */
override private[ml] def getOldLossType: OldLoss = getLossType match {
  case "logistic" => OldLogLoss
  case _ =>
    // Should never happen because of check in setter method.
    throw new RuntimeException(s"GBTClassifier was given bad loss type: $getLossType")
}
/**
 * Log loss for binary classification with labels encoded as -1 / +1.
 * The per-point loss is 2 * log(1 + exp(-2 * y * F(x))).
 */
object LogLoss extends Loss {

  /**
   * Method to calculate the loss gradients for the gradient boosting calculation for binary
   * classification. Boosting negates this value to build the label for the next tree.
   * The gradient with respect to F(x) is: - 4 y / (1 + exp(2 y F(x)))
   * @param prediction Predicted label.
   * @param label True label.
   * @return Loss gradient
   */
  @Since("1.2.0")
  override def gradient(prediction: Double, label: Double): Double = {
    val denominator = 1.0 + math.exp(2.0 * label * prediction)
    -4.0 * label / denominator
  }

  /** Prediction error of a single point under log loss. */
  override private[spark] def computeError(prediction: Double, label: Double): Double = {
    val scaledMargin = 2.0 * label * prediction
    // Equivalent to 2.0 * log(1 + exp(-scaledMargin)), but numerically more stable.
    2.0 * MLUtils.log1pExp(-scaledMargin)
  }
}

private[ml] object GBTRegressorParams {
  /**
   * Loss types accepted by the regression variant: "squared" (L2 loss) and
   * "absolute" (L1 loss). Names are kept lowercase.
   */
  final val supportedLossTypes: Array[String] =
    Seq("squared", "absolute").map(_.toLowerCase).toArray
}
- import org.apache.spark.mllib.tree.loss.{AbsoluteError => OldAbsoluteError, LogLoss => OldLogLoss, Loss => OldLoss, SquaredError => OldSquaredError}
- ...
/**
 * Converts the configured loss name into the old (mllib) loss implementation.
 *
 * "squared" is the L2 loss (squared error) and "absolute" is the L1 loss (absolute error).
 * These are loss functions, not L2/L1 regularization terms.
 *
 * @throws RuntimeException if the loss name is unsupported (prevented by the setter's check)
 */
override private[ml] def getOldLossType: OldLoss = {
  getLossType match {
    // L2 loss: (y - F(x))^2
    case "squared" => OldSquaredError
    // L1 loss: |y - F(x)|
    case "absolute" => OldAbsoluteError
    case _ =>
      // Should never happen because of check in setter method.
      // The message names the estimator, consistent with GBTClassifier's message.
      throw new RuntimeException(s"GBTRegressor was given bad loss type: $getLossType")
  }
}
/** Squared-error (L2) loss for regression: (y - F(x))^2. */
object SquaredError extends Loss {

  /**
   * Method to calculate the gradients for the gradient boosting calculation for least
   * squares error calculation.
   * The gradient with respect to F(x) is: - 2 (y - F(x))
   * @param prediction Predicted label.
   * @param label True label.
   * @return Loss gradient
   */
  @Since("1.2.0")
  override def gradient(prediction: Double, label: Double): Double = {
    val residual = label - prediction
    -2.0 * residual
  }

  /** Squared difference between the label and the prediction. */
  override private[spark] def computeError(prediction: Double, label: Double): Double = {
    val residual = label - prediction
    residual * residual
  }
}

/** Absolute-error (L1) loss for regression: |y - F(x)|. */
object AbsoluteError extends Loss {

  /**
   * Method to calculate the gradients for the gradient boosting calculation for least
   * absolute error calculation.
   * The gradient with respect to F(x) is: sign(F(x) - y)
   *
   * At an exact tie (F(x) == y) the loss is not differentiable. In that case this returns
   * sign(0) = 0.0, so a point that is already fit exactly contributes a zero pseudo-residual
   * instead of being pushed in an arbitrary direction. NaN inputs yield NaN.
   *
   * @param prediction Predicted label.
   * @param label True label.
   * @return Loss gradient: 1.0, -1.0, or 0.0 at a tie
   */
  @Since("1.2.0")
  override def gradient(prediction: Double, label: Double): Double = {
    math.signum(prediction - label)
  }

  /** Absolute difference between the label and the prediction. */
  override private[spark] def computeError(prediction: Double, label: Double): Double = {
    val err = label - prediction
    math.abs(err)
  }
}

【实现方式】 默认情况下:
/**
 * Default tree strategy for the given algorithm (maxDepth = 10).
 *
 * - Classification: Gini impurity with 2 classes.
 * - Regression: variance impurity (numClasses = 0).
 *
 * Note: GradientBoostedTrees.boost copies the tree strategy and forces variance impurity for
 * every tree, so Gini is not what individual GBT trees are grown with.
 */
def defaultStrategy(algo: Algo): Strategy = algo match {
  // GBDT classification: Gini as the impurity measure.
  case Algo.Classification =>
    new Strategy(algo = Classification, impurity = Gini, maxDepth = 10,
      numClasses = 2)
  // GBDT regression: variance of the labels as the impurity measure.
  case Algo.Regression =>
    new Strategy(algo = Regression, impurity = Variance, maxDepth = 10,
      numClasses = 0)
}
- object Gini extends Impurity {
/**
 * :: DeveloperApi ::
 * Gini impurity for multiclass classification: 1 - sum over k of (counts(k) / totalCount)^2.
 * @param counts Array[Double] with counts for each label
 * @param totalCount sum of counts for all labels
 * @return information value, or 0 if totalCount = 0
 */
@Since("1.1.0")
@DeveloperApi
override def calculate(counts: Array[Double], totalCount: Double): Double = {
  if (totalCount == 0) {
    0.0
  } else {
    // Subtract each class's squared frequency from 1.0, in class-index order.
    counts.foldLeft(1.0) { (impurityLeft, classCount) =>
      val frequency = classCount / totalCount
      impurityLeft - frequency * frequency
    }
  }
}

- object Variance extends Impurity {
/**
 * :: DeveloperApi ::
 * Variance of the labels, computed from their sufficient statistics as
 * (sumSquares - sum^2 / count) / count.
 * @param count number of instances
 * @param sum sum of labels
 * @param sumSquares summation of squares of the labels
 * @return variance (never negative), or 0 if count = 0
 */
@Since("1.0.0")
@DeveloperApi
override def calculate(count: Double, sum: Double, sumSquares: Double): Double = {
  if (count == 0) {
    0.0
  } else {
    val squaredLoss = sumSquares - (sum * sum) / count
    // The one-pass formula suffers from cancellation when the labels are (nearly) constant and
    // can then come out slightly below zero. A variance cannot be negative, so clamp at 0.
    math.max(squaredLoss / count, 0.0)
  }
}

【GBTRegressor】数据准备、超参封装以及模型训练调度的相关源码实现与注释
/**
 * Trains a gradient-boosted regression tree ensemble on `dataset`.
 *
 * @param dataset input rows holding the configured label and feature columns
 * @return the fitted GBTRegressionModel (trees, tree weights and feature count)
 */
override protected def train(dataset: Dataset[_]): GBTRegressionModel = {
  // Feature index -> number of categories, for features whose column metadata marks them as
  // categorical (e.g. bucketized or binary columns).
  val categoricalFeatures: Map[Int, Int] =
    MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
  // Rows -> LabeledPoint, using the configured label and feature columns.
  val labeledPoints: RDD[LabeledPoint] = extractLabeledPoints(dataset)
  // Feature dimension, read from the first point.
  val featureCount = labeledPoints.first().features.size
  // Boosting strategy for regression (tree settings, loss, iterations, learning rate, ...).
  val strategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression)

  // Logging / instrumentation for this training run.
  val instr = Instrumentation.create(this, labeledPoints)
  instr.logParams(params: _*)
  instr.logNumFeatures(featureCount)

  // Boosting itself is shared with classification.
  val (trees, treeWeights) = GradientBoostedTrees.run(labeledPoints, strategy, $(seed))
  val model = new GBTRegressionModel(uid, trees, treeWeights, featureCount)
  instr.logSuccess(model)
  model
}

【GBTClassifier】数据准备、超参封装以及模型训练调度的相关源码实现与注释
/**
 * Trains a gradient-boosted binary classifier on `dataset`.
 *
 * @param dataset input rows with a Double label column (values 0.0 / 1.0) and a Vector
 *                feature column
 * @return the fitted GBTClassificationModel (trees, tree weights and feature count)
 * @throws IllegalArgumentException (when the rows are evaluated) for a label outside {0, 1}
 */
override protected def train(dataset: Dataset[_]): GBTClassificationModel = {
  // Same as the regressor: feature index -> number of categories for categorical features.
  val categoricalFeatures: Map[Int, Int] =
    MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))

  // We copy and modify this from Classifier.extractLabeledPoints since GBT only supports
  // 2 classes now. This lets us provide a more precise error message.
  // The label check lives inside the (lazy) map, so it runs as the rows are evaluated.
  val labeledPoints: RDD[LabeledPoint] =
    dataset.select(col($(labelCol)), col($(featuresCol))).rdd.map {
      case Row(label: Double, features: Vector) =>
        val isBinaryLabel = label == 0 || label == 1
        require(isBinaryLabel, s"GBTClassifier was given" +
          s" dataset with invalid label $label. Labels must be in {0,1}; note that" +
          s" GBTClassifier currently only supports binary classification.")
        LabeledPoint(label, features)
    }

  val featureCount = labeledPoints.first().features.size
  // Boosting strategy for classification (tree settings, loss, iterations, ...).
  val strategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Classification)

  val instr = Instrumentation.create(this, labeledPoints)
  instr.logParams(params: _*)
  instr.logNumFeatures(featureCount)
  instr.logNumClasses(2)

  // Boosting is shared with regression; run() remaps the labels to {-1, +1} first.
  val (trees, treeWeights) = GradientBoostedTrees.run(labeledPoints, strategy, $(seed))
  val model = new GBTClassificationModel(uid, trees, treeWeights, featureCount)
  instr.logSuccess(model)
  model
}

在正式训练之前,GBDT分类实现会先对训练数据做一次转换,将label列的{0, 1}映射为{-1, +1}(即 label * 2 - 1)。回归和分类最终都调用 GradientBoostedTrees.boost(后续展示)来训练模型。
/**
 * Entry point for gradient boosting without a validation set.
 *
 * Regression trains on the labels as given. Classification first maps the labels {0, 1} to
 * {-1, +1} so the binary problem can be boosted with regression trees.
 *
 * @param input training data
 * @param boostingStrategy boosting parameters; its tree strategy's algo selects the path
 * @param seed random seed
 * @return (trees, tree weights)
 * @throws IllegalArgumentException for an algo other than Regression or Classification
 */
def run(
    input: RDD[LabeledPoint],
    boostingStrategy: OldBoostingStrategy,
    seed: Long): (Array[DecisionTreeRegressionModel], Array[Double]) = {
  val algo = boostingStrategy.treeStrategy.algo
  val trainingInput: RDD[LabeledPoint] = algo match {
    case OldAlgo.Regression =>
      input
    case OldAlgo.Classification =>
      // Map labels to -1, +1 so binary classification can be treated as regression.
      input.map(point => new LabeledPoint((point.label * 2) - 1, point.features))
    case _ =>
      throw new IllegalArgumentException(s"$algo is not supported by gradient boosting.")
  }
  GradientBoostedTrees.boost(trainingInput, trainingInput, boostingStrategy,
    validate = false, seed)
}

/**
 * Internal method for performing regression using trees as base learners.
 *
 * Every tree is a regression tree grown with variance impurity, also for classification
 * (run() maps classification labels from {0, 1} to {-1, +1} before calling this method).
 * Tree 0 is fit to the labels and gets weight 1.0. Each later tree m is fit to the
 * pseudo-residuals -loss.gradient(prediction, label) of the ensemble built so far, gets weight
 * learningRate, and is grown with seed + m.
 *
 * @param input training dataset
 * @param validationInput validation dataset, ignored if validate is set to false.
 * @param boostingStrategy boosting parameters
 * @param validate whether or not to use the validation dataset for early stopping.
 * @param seed Random seed.
 * @return tuple of ensemble models and weights:
 *         (array of decision tree models, array of model weights)
 */
def boost(
    input: RDD[LabeledPoint],
    validationInput: RDD[LabeledPoint],
    boostingStrategy: OldBoostingStrategy,
    validate: Boolean,
    seed: Long): (Array[DecisionTreeRegressionModel], Array[Double]) = {
  val timer = new TimeTracker()
  timer.start("total")
  timer.start("init")

  boostingStrategy.assertValid()

  // Initialize gradient boosting parameters.
  // Maximum number of trees; the result arrays are sized for it up front.
  val numIterations = boostingStrategy.numIterations
  val baseLearners = new Array[DecisionTreeRegressionModel](numIterations)
  val baseLearnerWeights = new Array[Double](numIterations)
  // Loss used both for the pseudo-residuals and for the reported error.
  val loss = boostingStrategy.loss
  // Learning rate (step size): the weight of every tree after the first.
  val learningRate = boostingStrategy.learningRate
  // Prepare strategy for individual trees, which use regression with variance impurity.
  // A copy is modified, so the caller's tree strategy is left untouched.
  val treeStrategy = boostingStrategy.treeStrategy.copy
  val validationTol = boostingStrategy.validationTol
  treeStrategy.algo = OldAlgo.Regression
  treeStrategy.impurity = OldVariance
  treeStrategy.assertValid()

  // Cache input: it is re-read on every iteration. Only unpersisted at the end if cached here.
  val persistedInput = if (input.getStorageLevel == StorageLevel.NONE) {
    input.persist(StorageLevel.MEMORY_AND_DISK)
    true
  } else {
    false
  }

  // Prepare periodic checkpointers for the (prediction, error) RDDs, so their lineage does not
  // keep growing with the number of iterations.
  val predErrorCheckpointer = new PeriodicRDDCheckpointer[(Double, Double)](
    treeStrategy.getCheckpointInterval, input.sparkContext)
  val validatePredErrorCheckpointer = new PeriodicRDDCheckpointer[(Double, Double)](
    treeStrategy.getCheckpointInterval, input.sparkContext)

  timer.stop("init")

  logDebug("##########")
  logDebug("Building tree 0")
  logDebug("##########")

  // Initialize tree: tree 0 is fit directly to the labels and gets weight 1.0.
  timer.start("building tree 0")
  val firstTree = new DecisionTreeRegressor().setSeed(seed)
  val firstTreeModel = firstTree.train(input, treeStrategy)
  val firstTreeWeight = 1.0
  baseLearners(0) = firstTreeModel
  baseLearnerWeights(0) = firstTreeWeight

  // (prediction, error under `loss`) for every training point, for the one-tree ensemble.
  var predError: RDD[(Double, Double)] =
    computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
  predErrorCheckpointer.update(predError)
  logDebug("error of gbt = " + predError.values.mean())

  // Note: A model of type regression is used since we require raw prediction
  timer.stop("building tree 0")

  // Same for the validation set. It is only checkpointed and read when validate is true.
  var validatePredError: RDD[(Double, Double)] =
    computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
  if (validate) validatePredErrorCheckpointer.update(validatePredError)
  var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
  // Number of leading trees forming the best ensemble found so far on the validation set.
  var bestM = 1

  var m = 1
  var doneLearning = false
  while (m < numIterations && !doneLearning) {
    // Update data with pseudo-residuals: the negative gradient of the loss at the current
    // prediction becomes the label the next tree is fit to.
    val data = predError.zip(input).map { case ((pred, _), point) =>
      LabeledPoint(-loss.gradient(pred, point.label), point.features)
    }

    timer.start(s"building tree $m")
    logDebug("###################################################")
    logDebug("Gradient boosting tree iteration " + m)
    logDebug("###################################################")
    val dt = new DecisionTreeRegressor().setSeed(seed + m)
    val model = dt.train(data, treeStrategy)
    timer.stop(s"building tree $m")
    // Update partial model
    baseLearners(m) = model
    // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    // Technically, the weight should be optimized for the particular loss.
    // However, the behavior should be reasonable, though not optimal.
    baseLearnerWeights(m) = learningRate

    // New prediction = previous prediction + weight(m) * prediction of tree m; the error is
    // recomputed with `loss`.
    predError = updatePredictionError(
      input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    predErrorCheckpointer.update(predError)
    logDebug("error of gbt = " + predError.values.mean())

    // Early stopping. run() always passes validate = false, so this branch only applies to
    // callers that supply a validation set with validate = true.
    if (validate) {
      // Stop training early if
      // 1. Reduction in error is less than the validationTol or
      // 2. If the error increases, that is if the model is overfit.
      // We want the model returned corresponding to the best validation error.

      validatePredError = updatePredictionError(
        validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
      validatePredErrorCheckpointer.update(validatePredError)
      val currentValidateError = validatePredError.values.mean()
      // The reduction is measured against the best error so far and compared with
      // validationTol * max(currentValidateError, 0.01). An increase always stops training.
      if (bestValidateError - currentValidateError < validationTol * Math.max(
        currentValidateError, 0.01)) {
        doneLearning = true
      } else if (currentValidateError < bestValidateError) {
        bestValidateError = currentValidateError
        bestM = m + 1
      }
    }
    m += 1
  }

  timer.stop("total")

  logInfo("Internal timing for DecisionTree:")
  logInfo(s"$timer")

  // Delete checkpoint files and release the cached input (only if it was cached here).
  // NOTE(review): nothing here explicitly unpersists RDDs the checkpointers may still keep
  // cached; if the PeriodicRDDCheckpointer in use provides unpersistDataSet(), consider calling
  // it before deleteAllCheckpoints() -- confirm against the Spark version in use.
  predErrorCheckpointer.deleteAllCheckpoints()
  validatePredErrorCheckpointer.deleteAllCheckpoints()
  if (persistedInput) input.unpersist()

  // Weights are 1.0 for tree 0 and learningRate for every other tree. With validation, only the
  // first bestM trees (the best ensemble found) are returned.
  if (validate) {
    (baseLearners.slice(0, bestM), baseLearnerWeights.slice(0, bestM))
  } else {
    (baseLearners, baseLearnerWeights)
  }
}

/**
 * Appends the regression prediction for every row of `dataset` as the prediction column.
 */
override protected def transformImpl(dataset: Dataset[_]): DataFrame = {
  // Broadcast this model so the UDF below can evaluate it on the executors.
  val broadcastModel = dataset.sparkSession.sparkContext.broadcast(this)
  val predictUDF = udf { (features: Any) =>
    broadcastModel.value.predict(features.asInstanceOf[Vector])
  }
  val predictionColumn = predictUDF(col($(featuresCol)))
  dataset.withColumn($(predictionCol), predictionColumn)
}
/**
 * Predicts the regression target for one feature vector.
 *
 * @return the weighted sum of the tree predictions, sum over i of treeWeight(i) * tree(i)(features)
 */
override protected def predict(features: Vector): Double = {
  // TODO: When we add a generic Boosting class, handle transform there? SPARK-7129
  // Regression: the weighted sum of tree predictions is itself the prediction (no thresholding).
  val treePredictions = _trees.map(_.rootNode.predictImpl(features).prediction)
  // Dot product of the per-tree predictions with the tree weights.
  blas.ddot(numTrees, treePredictions, 1, _treeWeights, 1)
}

/**
 * Appends the predicted class (0.0 or 1.0) for every row of `dataset` as the prediction column.
 */
override protected def transformImpl(dataset: Dataset[_]): DataFrame = {
  // Broadcast this model so the UDF below can evaluate it on the executors.
  val bcastModel = dataset.sparkSession.sparkContext.broadcast(this)
  // UDF that predicts the class for one feature vector via predict().
  // (The stray full-width "、" after "=>" in the original was a syntax error.)
  val predictUDF = udf { (features: Any) =>
    bcastModel.value.predict(features.asInstanceOf[Vector])
  }
  // Append the predictions as a new column of the input DataFrame.
  dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
}
/**
 * Predicts the class (0.0 or 1.0) for one feature vector.
 *
 * @return 1.0 if the weighted sum of tree predictions is positive, otherwise 0.0
 */
override protected def predict(features: Vector): Double = {
  // TODO: When we add a generic Boosting class, handle transform there? SPARK-7129
  // Classifies by thresholding sum of weighted tree predictions
  val treePredictions = _trees.map(_.rootNode.predictImpl(features).prediction)
  // Raw margin: the weighted sum of the tree outputs. It is not restricted to [-1, 1]; only its
  // sign is used.
  val margin = blas.ddot(numTrees, treePredictions, 1, _treeWeights, 1)
  // Training mapped labels 0 -> -1 and 1 -> +1, so a positive margin maps back to class 1.0.
  if (margin > 0.0) 1.0 else 0.0
}

