当前位置:   article > 正文

大数据Spark实战第七集 机器学习和数据处理_setstages函数功能

setstages函数功能

机器学习是什么: 机器学习与机器学习工作流

在开始本课时的学习之前,我们先来讲解上个课时的思考题:成环的路径会使消息一直传递下去,所以需要在发送消息时对消息最初出发的顶点和当前顶点进行校验。

下面我们进入本课时的学习,整个模块 6 主要学习 Spark 的机器学习套件 MLlib。MLlib 在功能上与 Scikit-Learn 等机器学习库非常类似,但计算引擎采用的是 Spark,即所有计算过程均实现了分布式,这也是它和其他机器学习库最大的不同。

但在学习 MLlib 的时候,你大可不必关注其分布式细节,这是 MLlib 组件与其他组件很不一样的地方,这里不用考虑 GraphX、Structured Streaming 中的关键抽象、分布式计算框架,而只需关注那些机器学习任务本身的东西,如参数、模型、工作流、测试、算法调优等。本课时的主要内容有:

  • 机器学习

  • 典型的机器学习工作流

  • 机器学习任务的学习类型

机器学习

在本课时中,我们将试着从计算机科学、 统计学和数据分析的角度来定义机器学习 。机器学习是计算机科学的一个分支,为计算机提供了无须明确编程的学习能力(Arthur Samuel,1959)。这个研究领域是从人工智能的模式识别和计算学习理论的研究中演化而来的。

更具体地说,机器学习探讨了启发式学习和基于数据进行预测的算法研究和构建。这种算法通过样本输入构建模型,通过制订数据驱动的预测来代替严格的静态程序代码。

来看看卡耐基梅隆大学的 Tom M. Mitchell 教授对机器学习的定义,他从计算机科学的角度解释了机器学习的真正意义:

对于某类任务 T 和性能度量 P, 如果一个计算机程序在 T 上以 P 衡量的性能随着经验 E 而自我完善,那么就称这个计算机程序从经验 E 中学习

基于该定义,我们得出计算机程序或机器能够:

  • 从历史数据中学习;

  • 通过经验而获得提升;

  • 交互式地增强可用于预测问题结果的模型;

  • 典型的机器学习任务是概念学习、预测建模、聚类以及寻找有用的模式。最终目标是提高学习的自动化程度,从而不再需要人为地干预,或尽可能地降低人为干预的水平。

典型的机器学习工作流

一个典型的机器学习应用程序涉及从输入、处理到输出这几个步骤 ,从而形成一个科学的工作流程,如下图所示:

5.png

具体步骤如下:

  1. 加载样本数据。

  2. 将数据解析为算法所需的格式。

  3. 预处理数据并处理缺失值。

  4. 将数据分成两组:一组用于构建模型(训练数据集),另一组用于测试模型(测试数据集)。

  5. 运行算法来构建或训练你的 ML 模型。

  6. 用训练数据进行预测并观察结果。

  7. 使用测试数据测试和评估模型,或者使用第 3 个数据集(称为验证数据集)运用交叉验证技术验证模型。

  8. 调整模型以获得更好的性能和准确性。

  9. 调整模型扩展性,以便将来能够处理大量的数据集。

  10. 部署模型。

在步骤 4 中,实验数据集是随机分割的,通常分为一个训练数据集和一个被称为采样的测试数据集。训练数据集用于训练模型,而测试数据集用于最终评估最佳模型的性能。更好的做法是尽可能多地使用训练数据集以提高泛化性能。另一方面,建议只使用一次测试数据集,以便在计算预测误差和相关度量时避免过度拟合的问题。

机器学习任务的学习类型

根据学习系统学习反馈的本质,机器学习任务通常被分为以下 3 类,即监督学习、无监督学习以及增强学习,如下图所示:

1.png

1. 监督学习

监督学习的目标是:学习将输入映射到与现实世界相一致的输出的一般规则,你可以理解为基于过去的数据进行预测 。例如,垃圾邮件过滤数据集通常包含垃圾邮件以及非垃圾邮件,因此,能够知道训练集中的数据是垃圾邮件还是正常邮件。我们有机会利用这些信息来训练模型,以便对新来的邮件进行分类。

算法找到所需的模式后,可以使用这些模式对未标记的测试数据进行预测。这是最常见的机器学习任务类型,MLlib 也不例外,其中大部分算法都是监督学习,如朴素贝叶斯、逻辑回归、随机森林等,监督学习的数据处理流程大致如下图所示。

2.png

从图中可以看出,经过数据预处理后,数据会被分为两部分,一部分为测试集,另一部分为训练集。通过学习算法,可以由训练集得到我们所需的模型,模型会用测试集进行验证,工程师会根据验证的情况对模型进行调优,实现一个数据驱动的优化过程。

2. 无监督学习

在无监督学习中,数据没有相关的标签,也就是说无法区分训练集与测试集。因此,我们需要在算法上加上标签,如下图所示。因此,标签必须从数据集中推断出来,这意味着无监督学习算法的目标是通过描述结构,以某种结构化的方式对数据进行预处理。

3.png

为了克服无监督学习中的这个障碍,通常使用聚类技术,基于某些相似性度量来对未标记样本进行分组。因此,无监督学习任务会涉及挖掘隐藏的模式、特征学习等。聚类是智能地对数据集中的元素进行分类的过程。总体思路是,同一个类中的两个元素比属于不同类中的元素彼此更为“接近”。 “接近”的定义可以有很多种,在后面的课时会详细讨论。

无监督的例子包括聚类、频繁模式挖掘以及降维等。MLlib 也提供了 k 均值聚类、潜在狄利克雷分布(Latent Dirichlet Allocation)、主成分分析(Principal Component Analysis)、奇异值分解(Singular value decomposition)等聚类与降维算法。

3. 增强学习

和我们从过去的经验中学习一样,多年来积极的赞美和负面的批评都有助于塑造出今天的我们。通过与朋友、家人,甚至陌生人互动,我们可以了解什么让人开心,什么让人难过。当你执行某个操作时,你有时会立即得到奖励。例如,在附近找到购物中心时可能会产生即时的满足感,但也有些时候,奖励不会马上兑现,比如长途跋涉去寻找某个地方。这些都与增强学习密切相关。

因此, 增强学习是一种模型从一系列行为中学习的技术 。数据集或样本的复杂性对于需要 学习目标函数的增强学习非常重要 。此外,为了达到最终目标,每条数据都需要做到一点,即在保证与外部环境相互作用的同时,应确保奖励函数的最大化,如下图所示:

4.png

从图中也可以看出, 增强学习与监督学习最大的不同在于其训练集中包含一个尝试的过程 ,会试图从环境中获得评价或者反馈。如围棋这种博弈类游戏,会有两个代理互相用已有的模型制订策略,并根据最后的结果修正自己模型的过程。谷歌公司的 AlphaGo 就是深度学习与增强学习相结合的一个很好的例子。

总而言之,增强学习在 行动—评价 的环境中获得知识,改进行动方案以适应环境,并在物联网环境、路线问题、股市交易、机器人等场景重得到了广泛应用。

总结

由于机器学习涉及很多概念,本课时的主要目的是理清概念,为后面的学习打好基础。在后面学习机器学习算法的课时中,每一个课时后都会有一个基于真实数据的实践。

最后给大家留一个思考题:

机器学习和深度学习的区别是什么?


标准化机器学习流程:ML pipeline

在开始今天的课程前,我们先来讲解一下上节课的课后思考题。深度学习与机器学习的不同之处在于:

  • 数据量大小。深度学习通常需要更多的样本才能达到更好的效果,所以通常深度学习的训练时间更长。

  • 硬件区别。深度学习算法通常涉及大量浮点运算,样本量也巨大,而 GPU 天然的海量流处理器架构非常适合并行计算,所以一般复杂的深度学习应用通常需要 GPU 的硬件架构。

  • 特征选择。一般机器学习解决问题时,都需要专家指定或者先验知识来确定特征,如信用模型,这些特征在很大程度上影响了模型的准确性。

  • 解决问题的方法。当使用传统机器学习方法解决问题时,经常采取化整为零、分别解决、再合并结果求解的策略。而深度学习是端到端的模型,输入训练数据,再直接输出最终结果,让深度神经网络自己学习如何提取关键特征,比如对一张有着多个目标的照片进行目标检测,需要识别出目标的类别,并指出图中所在位置。典型机器学习方法会将这个问题分为两步:目标检测与目标识别。首先,使用边框检测技术,扫描全图找到所有可能的对象,对这些对象使用目标识别算法,如支持向量机,识别出相关物体。深度学习方法则按照端到端的方式处理这个问题,比如通过卷积神经网络就能够实现目标的定位与识别,也就是将原始图像输入到卷积神经网络模型中,再直接输出图像中目标的位置和类别。

  • 可解释性。同神经网络算法一样,深度学习模型很难进行解释,这也使深度学习算法无法应用于很多要求模型可解释的场景,如金融等。

接下来我们开始讲解今天的内容,标准化机器学习流程:ML Pipeline。Spark MLlib 是 Spark 机器学习套件, 它的目的是成为大数据机器学习的最佳实践。为了简化机器学习过程并使其可扩展,Spark ML API 引入了 Pipelines API(管道),这类似于 Python 机器学习库 Scikit-Learn 中的 Pipeline,它采用了一系列 API 定义并标准化了上一课时中我们学习的机器学习工作流,包含了数据收集、预处理、特征抽取、特征选择、模型拟合、模型验证、模型评估等一系列阶段。例如,对文档进行分类时,也许会包含分词、特征抽取、训练分类模型,以及调优等过程。大多数机器学习库不是为分布式计算而设计的,也不提供 Pipeline 的创建与调优,而这就是 Spark ML PipeLines 要做的。

Spark ML Pipelines 就是对分布式机器学习过程进行模块化地抽象,这样可以使多个算法合并成一个 Pipeline 或者使工作流变得更加容易,下面是 Pipelines API 的关键概念:

  • DataFrame:DataFrame 与 Spark SQL 中用到的 DataFrame 一样,是 Spark 的基础数据结构,贯穿了整个 Pipeline。它可以存储文本、特征向量、训练集以及测试集。除了常见的类型,DataFrame 还支持 Spark MLlib 特有的 Vector 类型。

  • Transformer:Transformer 对应了数据转换的过程,它接收一个 DataFrame,在它的作用下,会生成一个新的 DataFrame。在机器学习中,在涉及特征转换的过程中经常会用到它。Transformer也可以用于使训练完成后的模型将特征数据集(测试集)转换为带有预测结果的数据集的场景。Transformer 必须实现 transform() 方法。

  • Estimator:从上面 Transformer 的定义中可以得知, 训练完成好的模型也是一个 Transformer ,所以 Estimator 包含了一个可以让数据集拟合出一个 Transformer 的算法。 Estimator 必须实现 fit() 方法。

  • Pipeline:一个 Pipeline 可以将多个 Transformer 和 Estimator 组装成一个特定的机器学习工作流。

  • Parameter:所有的 Estimator 和 Transformer 共用一套通用的 API 来指定参数。

文档分类是一个在自然语言处理中非常常见的应用,如垃圾邮件监测、情感分析等。下面,我们将通过一个文档分类的例子来让读者对 Spark 的 Pipeline 有一个感性的理解。简单来说,任何文档分类应用都需要以下 4 步:

  1. 将文档分词。

  2. 将分词的结果转换为词向量。

  3. 学习模型。

  4. 预测(是否为垃圾邮件或者正负情感)。

比如在垃圾邮件监测中,我们需要通过邮件正文甄别出哪些是垃圾邮件。垃圾邮件的正文一般会是一段文字,如:“代开各种发票,手续费极低,请联系我。”但这样一段文字是无法直接应用于 Estimator 的,需要将其转换为特征向量。一般做法是用一个词典构建一个向量空间,其中每一个维度都是一个词,出现过的为 1 ,未出现的为 0 ,再根据文档中出现的词语的频数,用 TF-IDF 算法为词维度赋予权重。这样的话,每个文档就能被转换为一个等长的特征向量,如下:

(0, 0, …, 0.27, 0, 0, …, 0.1, 0) ,接着就可以用它来拟合模型并输出测试结果。

我们用一个流程图来表示整个过程,如下图所示: Tokenizer 和 HashingTF 为 Transformer,作用分别是分词和计算权重,训练出的模型也是 Transformer,用来生成测试结果;Estimator 采用的是逻辑回归算法(LR);DS0-DS3 都是不同阶段输出的数据,这就是一个完整意义上的 Pipeline。

1.png

下面用代码实现整个 Pipeline,如下:

import org.apache.spark.ml.{PipelinePipelineModel} 
import org.apache.spark.ml.classification.LogisticRegression 
import org.apache.spark.ml.feature.{HashingTFTokenizer} 
import org.apache.spark.ml.linalg.Vector 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.SparkSession 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

object PipelineExample{

  def main(args: Array[String ]): Unit = {

    val spark = SparkSession
    .builder
    .master(“local[2]”)
    .appName(“PipelineExample”)
    .getOrCreate()
    import spark.implicits._

    // 准备训练数据,其中最后一列,就是该文档的标签,即是否为垃圾邮件
    val training = spark.createDataFrame(Seq(
      (0L, “a b c d e spark”1.0),
      (1L, “b d”0.0),
      (2L, “spark f g h”1.0),
      (3L, “hadoop mapreduce”0.0)
    )).toDF(“id”“text”“label”)

    // 配置整个Pipeline, 由3个组件组成:tokenizer(Transformer)、hashingTF(Transformer)
    // 和 lr(Estimator)
    val tokenizer = new Tokenizer()
    .setInputCol(“text”)
    .setOutputCol(“words”)

    val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol(“features”)

    val lr = new LogisticRegression()
    .setMaxIter(10)
    .setRegParam(0.001)

    val pipeline = new Pipeline()
    .setStages(Array(tokenizer, hashingTF, lr))

    // 拟合模型,得到结果
    val model = pipeline.fit(training)

    // 将模型持久化
    model.write.overwrite().save(“/tmp/spark-logistic-regression-model”)

    // 将Pipeline持久化
    pipeline.write.overwrite().save(“/tmp/unfit-lr-model”)

    // 加载模型
    val sameModel = PipelineModel.load(“/tmp/spark-logistic-regression-model”)

    // 准备无标签的测试集
    val test = spark.createDataFrame(Seq(
      (4L, “spark i j k”),
      (5L, “l m n”),
      (6L, “spark hadoop spark”),
      (7L, “apache hadoop”)
    )).toDF(“id”“text”)

    // 用模型预测测试集,得到预测结果(标签)
    sameModel.transform(test)
    .select(“id”“text”“probability”“prediction”)
    .collect()
    .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
      println(s"(KaTeX parse error: Expected 'EOF', got '&' at position 11: id</span>,&̲nbsp;<span clas…text) --> prob=KaTeX parse error: Expected 'EOF', got '&' at position 13: prob</span>,&̲nbsp;prediction…prediction")
    }
  }
}

这样就用 Spark 完整实现了一个机器学习的流程。从上面的代码中可以看出,这样的结构非常有利于复用 Transformer 与 Estimator 组件。

Spark MLlib(ML API)的算法包主要分为以下几个部分:

  • 特征抽取、转换与选择;

  • 分类和回归;

  • 聚类;

  • 协同过滤;

  • 频繁项集挖掘。

其中每一类都有若干种算法的实现,用户可以利用 Pipeline 按需进行切换,下面我们将根据这几个类别,分别实现一些真实数据的案例,让读者可以直接上手应用。

此外,在上面的代码中,我们用 Pipeline API 将模型序列化成文件,这样的好处在于可以将模型看成一个黑盒,非常方便模型上线,而不用在上线应用时再去对模型进行硬编码,这类似于 Python 的 Pickle 库的用法。

小结

在 Spark 的早期版本中,并没有 ML pipeline API,这导致代码难以维护、可读性较差、也一定程度上影响了 MLlib 的流行。而 Pipeline API 的引入抽象了机器学习流程,让整个代码变得简洁优美,另外这种抽象也利于与第三方库进行结合,如 Tensorflow、XgBoost 等等。

如何理解本课时内容中的这句话:最后给大家留一个思考题,

训练完成好的模型也是一个 Transformer。


如何对数据进行预处理?

在开始今天的课程前,我们先来看看上一讲的思考题,如何理解训练好的模型也是一个 Transformer 呢?这说明 Transformer 只是描述了一种映射关系,而拟合数据,也就是训练模型的过程,就是为了得到这种映射关系。

今天的内容是 如何对数据进行预处理。 在机器学习实践中,数据科学家拿到的数据通常是不尽人意的,例如出现存在大量的缺失值、特征的值是不同的量纲、有一些无关的特征、特征的值需要再次处理等情况,这样的数据无法直接训练,因此我们需要对这些数据进行预处理。预处理在机器学习中是非常重要的步骤,如果没有按照正确的方法对数据进行预处理,往往会得到错误的训练结果。下面我先介绍几种常见的预处理方法。

数据标准化

通常,我们直接获得的数据包含了量纲,也就是单位,例如身高 180 cm、体重 75 kg。对于某些算法来说,如果特征的单位不统一,就无法直接进行计算,因此在很多情况下的预处理过程中,数据标准化是必不可少的过程。数据标准化的方法一般有 Z 分数法、最大最小法等。

Z 分数法

这种方法根据原始数据(特征)的均值(Mean)和标准差(Standard Deviation)进行数据标准化,从而将原始数据变换为 Z 分数,转化函数如下:

Drawing 2.png

其中 μ 为所有样本数据的均值, σ 为所有样本数据的标准差。Spark MLlib 内置了 Z 分数标准化转换器 StandardScaler,下面的代码演示了通过 StandardScaler 对 Libsvm 数据集的特征进行标准化的过程:

import org.apache.spark.ml.feature.StandardScaler 
  • 1

val dataFrame = spark.read.format(“libsvm”).load(“data/mllib/sample_libsvm_data.txt”)

val scaler = new StandardScaler()
.setInputCol(“features”)
.setOutputCol(“scaledFeatures”)
.setWithStd(true)
.setWithMean(false)

// 计算汇总统计量,生成ScalerModel
val scalerModel = scaler.fit(dataFrame)

// 对特征进行标准化
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

最大最小法

这种方法也称为离差标准化,是对原始数据的线性变换,使结果值映射到 [0 - 1] 之间。转换函数如下:

Drawing 3.png

其中 max 为样本数据的最大值,min 为样本数据的最小值。这种方法的缺陷是当有新数据加入时,可能导致 max 和 min 出现变化,需要重新定义,但这种情况在训练过程中很少见。对于方差特别小的特征,这种方法可以增强其稳定性。Spark MLlib 内置了最大最小转换器 MinMaxScaler。下面的代码演示了通过 MinMaxScaler 对测试数据集的特征进行标准化的过程:

import org.apache.spark.ml.feature.MinMaxScaler 
import org.apache.spark.ml.linalg.Vectors 
  • 1
  • 2

val dataFrame = spark.createDataFrame(Seq(
  (0Vectors.dense(1.00.1-1.0)),
  (1Vectors.dense(2.01.11.0)),
  (2Vectors.dense(3.010.13.0))
)).toDF(“id”“features”)

val scaler = new MinMaxScaler()
.setInputCol(“features”)
.setOutputCol(“scaledFeatures”)

// 计算汇总统计量,生成MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [KaTeX parse error: Expected 'EOF', got '&' at position 24: …getMin}</span>,&̲nbsp;<span clas…{scaler.getMax}]")
scaledData.select(“features”“scaledFeatures”).show()

p 范数法

p 范数法指的是计算样本的 p 范数,用该样本除以该样本的 p 范数,得到的值就是标准化的结果。 p 范数的计算公式如下:

Drawing 4.png

p =1 时, p 范数也叫 L1 范数,此时 L1 等于样本的所有特征值的绝对值相加。当 p =2 时, p 范数也叫 L2 范数,此时 L2 等于样本 x 距离向量空间的原点的欧氏距离:

Drawing 5.png

归一化的结果为:

Drawing 6.png

Spark MLlib 内置了 p 范数标准化转化器 Normalizer,下面的代码演示了通过 Normalizer 对测试数据集的特征进行标准化的过程:

import org.apache.spark.ml.feature.Normalizer 
import org.apache.spark.ml.linalg.Vectors 
  • 1
  • 2

val dataFrame = spark.createDataFrame(Seq(
(0Vectors.dense(1.00.5-1.0)),
(1Vectors.dense(2.01.01.0)),
(2Vectors.dense(4.010.02.0))
)).toDF(“id”“features”)

// 设置p = 1
val normalizer = new Normalizer()
.setInputCol(“features”)
.setOutputCol(“normFeatures”)
.setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

// 设置p = -∞
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println(“Normalized using L^inf norm”)
lInfNormData.show()

缺失值处理

缺失值的处理需要根据数据的具体情况来定:如果特征值是连续型,通常用中位数来填充;如果特征值是标签型,通常用众数来补齐;某些情况下,还可以用一个显著区别于已有样本中该特征的值来补齐。Spark 并没有提供预置的缺失值处理的 Transformer,这通常需要自己实现,在后面的课时中,我们会实现一个对缺失值处理的自定义 Transformer。

特征抽取

特征抽取(Feature Extraction)指的是按照某种映射关系生成原有特征的一个特征子集,而前面提到的特征选择(Feature Selection)指的是根据某种规则对原有特征筛选出一个特征子集。特征选择和特征抽取有相同之处,它们都试图减少数据集中的特征数目,但具体方法不同,特征抽取主要是通过特征间的关系来操作,如组合不同特征得到新的特征,这样就改变了原来的特征空间;而特征选择是从原始特征数据集中选择出子集,两者是一种包含关系,没有更改原始的特征空间,如下图所示。

Drawing 7.png

Spark 也提供了很多种特征抽取的方法,常见的如主成分分析、广泛应用于文本的 Word2Vector 等。

主成分分析

如果在样本中无关的特征太多,就会影响模式的发现,我们需要用降维技术从样本中生成用来代表原有特征的一个特征子集。

在了解主成分分析(PCA)之前,需要先了解协方差的概念, X 特征与 Y 特征之间的协方差为:

Drawing 8.png

如果协方差为正,说明 XY 是正相关关系;如果协方差为负,则说明是负相关关系;当协方差为 0 时, XY 相互独立。如果样本集 Dn 维特征,那么两两之间的协方差就可以组成一个 n × n 的矩阵,如下是 n =3 的情况:

Drawing 9.png

然后需要对这个矩阵进行特征值分解,得到特征值和特征向量,再取出最大的 mm < n ) 个特征值对应的特征向量(w1、w2、…、 wm),从而组成特征向量矩阵 W,对每个样本 xi 执行如下操作,得到降维后的样本 zi,如下:

Drawing 10.png

降维后的数据集为:

Drawing 11.png

下面以数据挖掘领域著名的鸢尾花数据集(IRIS: https://pan.baidu.com/s/1CPB8NQEN5crGC3MQ_eVutA 密码: yey1)为例,来展示用 PCA 实现降维操作的过程。

鸢尾花数据集是常用的分类数据集,包含 150 个样本。样本总共分为 3 种类别、各 50 条,每个样本有 4 个维度,分别是花萼长度、花萼宽度、花瓣长度和花瓣宽度。操作过程的代码如下:

import org.apache.spark.ml.feature.{PCAVectorAssembler} 
import org.apache.spark.sql.{DatasetRowSparkSession} 
import org.apache.spark.ml.feature.StandardScaler 
import org.apache.spark.sql.types.{DoubleTypeStringTypeStructFieldStructType} 
import org.apache.spark.ml.Pipeline 
  • 1
  • 2
  • 3
  • 4
  • 5

object IRISPCA {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
    .builder()
    .master(“local[2]”)
    .appName(“IRISPCA”)
    .getOrCreate()

    // 数据结构为花萼长度、花萼宽度、花瓣长度、花瓣宽度
    val fields = Array(“id”,“Species”,“SepalLength”,“SepalWidth”,“PetalLength”,“PetalWidth”)

    val fieldsType = fields.map(
        r => if (r  “id”||r  “Species”)
               {StructField(r, StringType)}
             else
                {StructField(r, DoubleType)}
    )

    val schema = StructType(fieldsType)

    val featureCols = Array(“SepalLength”,“SepalWidth”,“PetalLength”,“PetalWidth”)

    val data=spark.read.schema(schema).csv(“data/iris”)

    val vectorAssembler = new VectorAssembler()
    .setInputCols(featureCols)
    .setOutputCol(“features”)

    val vectorData=vectorAssembler.transform(data)

    // 特征标准化
    val standardScaler = new StandardScaler()
    .setInputCol(“features”)
    .setOutputCol(“scaledFeatures”)
    .setWithMean(true)
    .setWithStd(false)
    .fit(vectorData)

    val pca = new PCA()
    .setInputCol(“scaledFeatures”)
    .setOutputCol(“pcaFeatures”)
    // 主成分个数,也就是降维后的维数
    .setK(2)

    val pipeline = new Pipeline()
    .setStages(Array(vectorAssembler,standardScaler,pca))

    val model = pipeline.fit(data)

    // 对特征进行PCA降维
    model.transform(data).select(“Species”“pcaFeatures”).show(100false)

  }

}

通过操作,可以发现降维后的数据只有两个维度,如下,这样就实现了我们的降维过程:

Drawing 12.png

Word2Vector

在自然语言处理领域,训练集通常为纯文本,这样的数据是无法直接训练的,前面提到的 TF-IDF 就是一种生成词向量的方式。但是 TF-IDF 的缺点在于单纯以“词频”衡量一个词的重要性,不够全面,忽略了上下文信息,例如“阿里巴巴成立达摩院”与“人工智能应用有望加速落地”字面无任何相似之处,但它们之间有很强的关联,这用 TF-IDF 却无法体现。

而 Word2Vec 可以解决这个问题。 Word2Vec 最先出现在谷歌公司在 2013 年发表的论文“Efficient Estimation of Word Representation in Vector Space”中,作者是 Mikolov。Word2Vec 的基本思想是采用一个 3 层的神经网络将每个词映射成 n 维的实数向量,为接下来的聚类或者比较相似性等操作做准备。这个 3 层神经网络实际是在对语言模型进行建模,在建模的同时也获得了单词在向量空间上的表示,即词向量,也就是说这个词向量是建模过程的中间产物,而这个中间产物才是 Word2Vec 的真正目标。

Word2Vec 采用了:CBOW 与 Skip-gram,前者可以根据上下文预测下一个词,后者可以根据当前词预测上下文。以 CBOW 为例,如下图所示。

Drawing 13.png

我们选择一个固定的窗口作为语境(上下文):t - 2 — t + 2,输入层是 4 个 n 维的词向量(初始为随机值);隐藏层做的操作是累计求和操作,隐藏层包含 n 个结点;输出层是一棵巨大的二叉树,构建这棵二叉树的算法就是霍夫曼树,它的叶子结点代表了语料中的 M 个词语,语料中有多少个词,就有多少个叶子结点。

假设左子树为 1,右子树为 0,这样每个叶子结点都有唯一的编码。最后输出的时候,CBOW 采用了层次 Softmax 算法,隐藏层的每个结点都与树的每个结点相连,即霍夫曼树上的每个结点都会有 M 条边,每条边都有权重,我们需要达到的是在输入的上下文一定的情况下,预测词 W 的概率最大。以 010110 为例,由霍夫曼树的定义可知树有 5 层,我们希望在根结点,词向量与根结点相连,也就是在第一层,希望经过回归运算后第一位等于 0 的概率尽量等于 1 。依次类推,在第二层,希望第二位的值等于 1 的概率尽可能等于 1。这样一直下去,路径上所有的权重乘积就是预测词在当前上下文的概率 P ( wt ),而在语料中我们可以得到当前上下文的残差 1- P ( wt ),这样的话,就可以使用梯度下降来学习参数了。

由于不需要标注,Word2Vec 本质上是一种无监督学习,对自然语言处理有兴趣的读者不妨仔细阅读谷歌公司的那篇论文。

Spark MLlib 内置了 Word2Vec 转换器 Word2Vec,在下面的例子中对文档使用了 Word2Vec 转换器:

import org.apache.spark.ml.feature.Word2Vec 
import org.apache.spark.ml.linalg.Vector 
import org.apache.spark.sql.Row 
  • 1
  • 2
  • 3

// 每一行输入数据都是来源于某句话或是某个文档
val documentDF = spark.createDataFrame(Seq(
“Hi I heard about Spark”.split(" “),
“I wish Java could use case classes”.split(” “),
“Logistic regression models are neat”.split(” ")
).map(Tuple1.apply)).toDF(“text”)

// 设置word2Vec参数
val word2Vec = new Word2Vec()
.setInputCol(“text”)
.setOutputCol(“result”)
.setVectorSize(3)
.setMinCount(0)

val model = word2Vec.fit(documentDF)

val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
println(s"Text: [KaTeX parse error: Expected '}', got '&' at position 18: …ext.mkString(",&̲nbsp;")}</span>…features\n") }

特征选择

特征选择的目标通常是提高预测准确性、提升训练性能,以便能够更好地解释模型。特征选择的一种很重要的思想就是对每一维的特征打分,这样就能选出最重要的特征了。基于这种思想的方法有:卡方检验、信息增益以及相关系数。相关系数主要量化的是任意两个特征是否存在线性相关,信息增益会在下一节介绍,本小节主要介绍卡方检验。

卡方检验是以卡方分布为基础的一种常用假设检验方法,它的原假设是观察频数与期望频数没有差别。该检验的基本思想是:首先假设原假设成立,基于此前提计算出 x2 值,它表示观察值与理论值之间的偏离程度。根据卡方分布及自由度可以确定在原假设成立的条件下,获得当前统计量及更极端情况的概率 P 。如果 P 值很小,说明观察值与理论值偏离程度太大,应当拒绝无效假设,表示比较资料之间有显著差异;否则就不能拒绝无效假设,尚不能认为样本所代表的实际情况和理论假设有差别。

假设样本的某一个特征,它的取值为 A 和 B 两个组,而样本的类别有 0 和 1 两类。对样本进行统计,可以得到如下表所示的统计表。

组别01合计
A192443
B341044
合计533487

从上表中,我们可看出 AB 组对分类结果有很大的影响,但这不排除抽样的影响。首先假设该特征有结果是独立无关的,随机取一个样本,属于类 0 的概率为 (19 + 34)/(19 + 34 + 24 + 10)= 60.9%。接下来,我们需要根据上表得到一个理论值表,如下表所示。

组别01合计
A43 × 0.609 = 26.243 × 0.391 = 16.843
B44 × 0.609 = 26.844 × 0.391 = 17.244

如果两个变量是独立无关的,那么上表中的理论值与实际值的差别会非常小。

卡方值的计算公式为:

Drawing 15.png

其中 A 为实际值,也就是第一个表中的 4 个数据, T 为理论值,也就是上表中给出的 4 个数据,计算后得到卡方值为 10.01。得到该值后,需要在给定的置信水平下查得卡方分布临界值,如下表所示。

P
n ¢0.9950.990.9750.950.90.750.50.250.10.050.0250.010.005
10.020.10.451.322.713.945.025.637.88

显然 10.01 > 7.88,也就是说该特征与分类结果无关的概率小于 0.5%,换言之,我们应该保留这个特征。

Spark MLlib 内置了卡方检验组件 ChiSqSelector,下面的例子中对某个测试数据集采用了卡方检验:

import org.apache.spark.ml.feature.ChiSqSelector 
import org.apache.spark.ml.linalg.Vectors 
  • 1
  • 2

val data = Seq(
  (7Vectors.dense(0.00.018.01.0), 1.0),
  (8Vectors.dense(0.01.012.00.0), 0.0),
  (9Vectors.dense(1.00.015.00.1), 0.0)
)

val df = spark.createDataset(data).toDF(“id”“features”“clicked”)

// 配置卡方检验参数
val selector = new ChiSqSelector()
.setNumTopFeatures(3)
.setFeaturesCol(“features”)
.setLabelCol(“clicked”)
.setOutputCol(“selectedFeatures”)

val result = selector.fit(df).transform(df)

println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()

小结

本课时主要介绍了数据预处理的方法,结合 Pipeline API 可以发现,编程已经完全不是预处理的重点,如何根据手上的数据集和目标选择预处理方法才是最重要的。老实说,MLlib 并没有提供非常多的预处理组件,也就是前面课时学习的 Transformer,好在它提供了自定义Transformer 的接口。

最后给大家留一个思考题:实现一个任意逻辑的自定义 Transformer。


少数服从多数:随机森林分类器

从 Pipeline 的角度来说,上一个课时我们主要学习的是 Transformer,从本课时开始将进入 Estimator 的学习,也就是机器学习算法的学习。从本课时开始,我们将从 MLlib 实现的算法中按照算法类型:分类(有监督)、聚类(无监督)、推荐算法,各选取一个算法进行学习,并且每个课时后还会有基于真实数据的实践训练,这是本模块与其他模块不同的地方。

上节课的思考题可以看成是本课时的预习,在下面的内容中,我们会通过案例解答这个问题。

分类器是机器学习最常见的应用,MLlib 中也内置了许多分类模型,而其中支持分布式计算最好的也是最常用的方法就是随机森林算法,它的基础是决策树算法。本课时,我将介绍决策树算法和随机森林算法,以及用 Spark MLlib 的随机森林分类器实现根据身体监控数据判断人体状态。

决策树

决策树是一种机器学习的方法,它通过一种树形结构对样本进行分类,每个非叶子结点代表一次判断,每个叶子结点代表的是分类结果。它是一种典型的监督学习,需要一定量的样本,常见的决策树构造树算法有 C4.5 与 ID3。

下面先来看一个例子。下表中的内容是信贷审批常见的场景:根据信息判断客户是否会逾期。它的样本一共有 4 个特征,最后得出“是否逾期”的结果,这是一个二分类场景。

2.png

根据这些样本,我们可以构造出一棵这样的树,如下图所示。

1.png

构造一棵决策树需要从样本中学习结点分裂的时机,并判断阈值,也就是上图中的 A、B、C、D、E,这个过程被称为特征选择。通过这样的方法,我们可以达到对样本分类的效果。

下面我们先来简要介绍一下决策树构造算法的整个过程。决策树构造算法通常是先选择一个最优特征,将样本分为若干个子集,如果子集已经被正确分类,那么就构造叶子结点,将样本分到叶子结点中;如果某个子集没有被正确分类,那么就对这个子集继续进行选择特征。决策树构造是一个递归过程,递归停止条件是所有样本被基本正确分类,这样就构造出了一棵决策树。

从这个过程中可以看出,决策树通常对训练数据有良好的表现,但对新样本却未必如此,容易出现过拟合,也就是说模型能将训练数据很好地正确分类,而对测试数据和真实数据来说,分类结果却不尽如人意 。因此我们需要对已经生成的树进行剪枝,从而提升模型的泛化能力。如果特征过多,在一开始构造的时候,我们也会对特征进行选择,只留下足够有区分度的特征。决策树的生成对应模型的局部选择,而剪枝对应模型的全局选择。

从上述过程可以看出,构造决策树主要包含特征选择决策树生成决策树剪枝。下面我们将来详细介绍。

特征选择

本节主要介绍两种特征选择的方式:信息增益信息增益率。在介绍这两种方式前,先来看看熵的概念:熵是表示随机变量的不确定性的度量。

定义:假设随机变量 X 的可能取值有 x1 , x2 , …, xn ,对于每一个可能的取值 xi ,其概率 P(X = xi = pi i = 1, 2, …, n) ,则随机变量 X 的熵为:

Drawing 2.png

对于样本集合 D 来说,随机变量 X 是样本的类别,即假设样本有 k 个类别,每个类别的概率是
Drawing 3.png
,其中 | Ck | 表示类别 k 的样本个数,| D | 表示样本总数,对于样本集合 D 来说,熵(经验熵)为:

Drawing 4.png

而条件熵的概念为:设有随机变量( X , Y),其联合概率分布为:

Drawing 5.png

条件熵 HY | *X)*表示在已知随机变量 X 的条件下,随机变量 Y 的不确定性。在随机变量 X 给定的条件下,随机变量 Y 的条件熵 H(Y | X),定义为 X 给定条件下 Y 的条件概率分布的熵对 X 的数学期望:

Drawing 6.png

当熵和条件熵中的概率由数据估计得到时,所对应的熵与条件熵分别称为经验熵与经验条件熵。从经验熵与经验条件熵可以得到信息增益的定义

特征 A 对训练数据集 D 的信息增益 gD , A) 的定义,是集合 D 的经验熵 H(D) 与特征 A 给定条件下 D 的经验熵与条件熵之差:

Drawing 7.png

信息增益通常用来选择特征,经验熵 H(D) 表示的是对数据集 D 进行分类的不确定性。而经验条件熵 H(D | A) 表示在特征 A 给定的条件下,对数据集 D 进行分类的不确定性,那么信息增益就表示由于特征 A 而使得对数据集 D 的分类的不确定性的减少程度。显然,对于数据集 D 而言,信息增益依赖于特征,不同特征往往具有不同的信息增益。信息增益大的特征具有更强的分类能力。根据信息增益准则,选择特征的方法是:对训练数据集(或子集) D,计算其每个特征的信息增益,并选择信息增益最大的特征。

信息增益率是对信息增益的改进,特征 A 对训练数据集 D 的信息增益率 gR(D , A) 的定义,是其信息增益 g(D , A) 与训练数据集 D 的经验熵 HD) 之比,如下图所示:

Drawing 8.png

除了信息增益与信息增益率之外,还可以用基尼系数来完成特征选择。

决策树生成

决策树生成算法与特征选择方法相对应,选用信息增益进行特征选择的是 ID3 算法,选择信息增益比进行特征选择的是 C4.5 算法,选择基尼系数来完成特征选择的是分类回归树(CART)算法。本节将介绍 C4.5 与 ID3 算法。

ID3 算法如下:

简单来说,ID3 算法会在决策树各个结点上应用信息增益准则选择特征,递归地构建决策树。

给定训练数据集 D,特征集 S,阈值 ϵ

  1. D 中所有实例属于同一类 Ck,则 T 为单结点树,并将类 Ck 作为该结点的类标记,返回 T;

  2. S = Æ,则 T 为单结点树,并将 D 中实例数最大的类 Ck 作为该结点的类标记,返回 T

  3. 否则,计算 S 中各特征对 D 的信息增益,选择信息增益最大的特征 Sg

  4. 如果 Sg 的信息增益小于阈值 ϵ,则置 T 为单结点树,并将 D 中实例数最大的类 Ck 作为该结点的类标记,返回 T

  5. 否则,对 Sg 的每一个可能值 ai,将 D 分割为若干个非空子集 Di,将 Di 中实例数最大的类作为标记,构建子结点,由结点及其子结点构成树 T,返回 T

  6. 对第 i 个子结点,以 Di 为训练集,以 S -Sg 为特征集,递归调用第 1~5 步,得到子树 Ti,返回 Ti

C4.5 算法与 ID3 算法非常类似,只是把用到信息增益的地方换成了信息增益比。

剪枝

通常决策树在训练数据上表现很好,但是在测试数据上就不尽如人意,这就是模型过拟合。决策树剪枝主要分为预剪枝和后剪枝。预剪枝是在构造决策树的同时进行剪枝,通常作为停止条件,即设定一个熵的阈值,就算可以继续降低熵,也停止创建分支。而通常我们说的剪枝是指后剪枝,后剪枝通常有以下两种做法:

  • 应用交叉验证的思想,若局部剪枝能够使得模型在测试集上的错误率降低,则进行局部剪枝。

  • 应用正则化的思想,综合考虑不确定性和模型复杂度来定出一个新的损失,用该损失作为一个结点是否应该局部剪枝的标准。这种做法的核心是定义新的代价函数,通常会采用树的结构复杂度与模型预测误差之和作为代价衡量。

在 ID3、C4.5 中我们会应用前者做法,而在分类回归树中,我们会采取后者做法。

随机森林

在决策树的基础上了解随机森林的原理相对容易。随机森林就是通过集成学习的思想将多棵树集成的一种算法,它的基本单元是决策树,属于机器学习的一大分支——集成学习(Ensemble Learning)方法。集成学习是通过构建多个弱分类器,并按一定规则组合起来的分类系统,常常比单一分类器具有显著优越的泛化性能,常见集成学习算法有随机森林、AdaBoost、XgBoost、梯度提升树等,在风险建模、疾病预测等领域应用相当广泛。

随机森林将 N 棵决策树集成,每一棵决策树都是一个分类器,相当于每个分类器都会对结果进行投票,随机森林会综合所有的分类结果,并将票数最高的分类结果作为最终结果输出。可以想到,在随机森林中,每一棵决策树的生成是算法的关键。每棵树的生成规则如下

  1. 如果训练集大小为 N,对每棵树而言,随机且有放回地从训练集中抽取 N 个训练样本(这种采样方式称为 Bootstrap Sample 方法),作为该树的训练集。

  2. 如果每个样本的特征维度为 M,指定一个常数 m<<M,随机地从 M 个特征中选取 m 个特征子集,每次树进行分裂时,从这 m 个特征中选择最优的。

  3. 每棵树都尽最大限度地生长,并且没有剪枝过程。

随机森林中所谓“随机”的含义,就是模型在这里引入了随机性(随机抽取训练集、随机抽取特征),两个随机性的引入对随机森林的分类性能至关重要。由于它们的引入,使得随机森林不容易陷入过拟合,并且具有很好的抗噪能力。随机森林的特点有:

  • 在当前所有算法中,具有极佳的准确率,在国内外最近几年的数据挖掘大赛中,随机森林取得了令人瞩目的成绩。

  • 能够高效地运行在大数据集上,很容易可以看出,随机森林是非常容易分布式的。

  • 能够处理具有高维特征的输入样本。

  • 能够评估各个特征在分类问题上的重要性。

  • 在生成过程中,能够获取到内部生成误差的一种无偏估计。

  • 对于缺省值问题也能够获得很好的结果。

人体状态监测器

下面,我们将用真实的数据(数据集下载链接: https://pan.baidu.com/s/1rUxHKl119qGDlV6KeTtkLA提取码:ev1d)拟合出一个随机森林分类器。案例的内容是通过身体监测数据来判断人体状态,比如监测走路、骑行、跑步、看电视。数据集包括时间戳、心跳、活动标签和 3 个传感器,传感器分别佩戴在手上、胸部、踝关节处,每个传感器有 17 个检测指标 (温度、3D 加速度、陀螺仪和磁强计数据、方位数据) 。数据集共计 54 个属性,3850505 条样本,包含了 18 种人体活动。代码如下:

import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.classification.{RandomForestClassificationModelRandomForestClassifier} 
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator 
import org.apache.spark.sql.{DatasetRowSparkSession} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.ml.feature.{IndexToStringStringIndexerVectorAssembler} 
import org.apache.spark.sql.types._ 
import scala.collection.mutable 


object RandomForestBodyDetection { 

  def main(args: Array[String]): Unit = { 

    val spark = SparkSession 
    .builder() 
    .appName("RandomForestBodyDetection") 
    .master("local[2]") 
    .enableHiveSupport() 
    .getOrCreate() 

    import spark.implicits._ 

    // 读取数据集 
    val dataFiles = spark.read.textFile("data/bodydetect") 

    val rawData = dataFiles.map(r=>r.toString().split(" ")).rdd.map(row => { 
      val list = mutable.ArrayBuffer[Any]()
      for (i <- row.toSeq) { 
        list.append(i) 
      } 
      Row.fromSeq(list.map(v=>if (v.toString.toUpperCase == "NAN"Double.NaN else v.toString.toDouble)) 
    }) 

    val schema = StructType(Array( 
       StructField("timestamp"DoubleType), StructField("activityId"DoubleType), StructField("hr"DoubleType),
       StructField("hand_temp"DoubleType), StructField("hand_accel1X"DoubleType), StructField("hand_accel1Y"DoubleType),
       StructField("hand_accel1Z"DoubleType), StructField("hand_accel2X"DoubleType), StructField("hand_accel2Y"DoubleType),
       StructField("hand_accel2Z"DoubleType), StructField("hand_gyroX"DoubleType), StructField("hand_gyroY"DoubleType),
       StructField("hand_gyroZ"DoubleType), StructField("hand_magnetX"DoubleType), StructField("hand_magnetY"DoubleType),
       StructField("hand_magnetZ"DoubleType), StructField("hand_orientX"DoubleType), StructField("hand_orientY"DoubleType),
       StructField("hand_orientZ"DoubleType), StructField("hand_orientD"DoubleType), StructField("chest_temp"DoubleType),
       StructField("chest_accel1X"DoubleType), StructField("chest_accel1Y"DoubleType), StructField("chest_accel1Z"DoubleType),
       StructField("chest_accel2X"DoubleType), StructField("chest_accel2Y"DoubleType), StructField("chest_accel2Z"DoubleType),
       StructField("chest_gyroX"DoubleType), StructField("chest_gyroY"DoubleType), StructField("chest_gyroZ"DoubleType),
       StructField("chest_magnetX"DoubleType), StructField("chest_magnetY"DoubleType), StructField("chest_magnetZ"DoubleType),
       StructField("chest_orientX"DoubleType), StructField("chest_orientY"DoubleType), StructField("chest_orientZ"DoubleType),
       StructField("chest_orientD"DoubleType), StructField("ankle_temp"DoubleType), StructField("ankle_accel1X"DoubleType),
       StructField("ankle_accel1Y"DoubleType), StructField("ankle_accel1Z"DoubleType), StructField("ankle_accel2X"DoubleType),
       StructField("ankle_accel2Y"DoubleType), StructField("ankle_accel2Z"DoubleType), StructField("ankle_gyroX"DoubleType),
       StructField("ankle_gyroY"DoubleType), StructField("ankle_gyroZ"DoubleType), StructField("ankle_magnetX"DoubleType),
       StructField("ankle_magnetY"DoubleType), StructField("ankle_magnetZ"DoubleType), StructField("ankle_orientX"DoubleType),
       StructField("ankle_orientY"DoubleType), StructField("ankle_orientZ"DoubleType), StructField("ankle_orientD"DoubleType))) 

    val df = spark.createDataFrame(rawData,schema) 

    // 数据集的列名,sensor_name 表示某个传感器的某个指标数据,例如,手上的传感器的温度指标为 hand_temp 
    val allColumnNames = Array( 
    "timestamp""activityId""hr") ++ Array( 
    "hand""chest""ankle").flatMap(sensor => 
    Array( 
      "temp", 
      "accel1X""accel1Y""accel1Z", 
      "accel2X""accel2Y""accel2Z", 
      "gyroX""gyroY""gyroZ", 
      "magnetX""magnetY""magnetZ", 
      "orientX""orientY""orientZ""orientD").map(name => s"${sensor}_${name}") 
    ) 

    // 数据集中不需要的列、时间戳和方位数据,分别表示手、胸部、踝关节上传感器的第一个方位指标 
    val ignoredColumns = Array(0161718193334353650515253) 

    val inputColNames = ignoredColumns.map(l => allColumnNames(l)) 

    val columnNames = allColumnNames. 
    filter { !inputColNames.contains(_) } 

    // 滤掉不需要的列,并填充缺失值 
    val typeTransformer = new FillMissingValueTranformer().setInputCols(inputColNames) 

    // 构造标签列 
    val labelIndexer = new StringIndexer() 
    .setInputCol("activityId") 
    .setOutputCol("indexedLabel") 
    .fit(df) 

    // 构造特征列 
    val vectorAssembler = new VectorAssembler() 
    .setInputCols(columnNames) 
    .setOutputCol("featureVector") 

    // 配置分类器 
    val rfClassifier = new RandomForestClassifier() 
    .setLabelCol("indexedLabel") 
    .setFeaturesCol("featureVector") 
    .setFeatureSubsetStrategy("auto") 
    .setNumTrees(350) 
    .setMaxBins(30) 
    .setMaxDepth(30) 
    .setImpurity("entropy") 
    .setCacheNodeIds(true) 

    val labelConverter = new IndexToString() 
    .setInputCol("prediction") 
    .setOutputCol("predictedLabel") 
    .setLabels(labelIndexer.labels) 

    val Array(trainingData, testData) = df.randomSplit(Array(0.80.2)) 

    // 构建整个Pipeline 
    val pipeline = new Pipeline().setStages( 
       Array(typeTransformer, 
            labelIndexer,
            vectorAssembler,
            rfClassifier,
            labelConverter)) 

    val model = pipeline.fit(trainingData) 

    val predictionResultDF = model.transform(testData) 

    // 展示结果 
    predictionResultDF.select( 
    "hr""hand_temp""hand_accel1X""hand_accel1Y""hand_accel1Z""hand_accel2X""hand_accel2Y""hand_accel2Z""hand_gyroX""hand_gyroY""hand_gyroZ""hand_magnetX""hand_magnetY""hand_magnetZ""chest_temp""chest_accel1X""chest_accel1Y""chest_accel1Z""chest_accel2X""chest_accel2Y""chest_accel2Z""chest_gyroX""chest_gyroY""chest_gyroZ","chest_magnetX""chest_magnetY""chest_magnetZ""ankle_temp""ankle_accel1X""ankle_accel1Y""ankle_accel1Z""ankle_accel2X","ankle_accel2Y""ankle_accel2Z""ankle_gyroX""ankle_gyroY""ankle_gyroZ""ankle_magnetX""ankle_magnetY""ankle_magnetZ""indexedLabel""predictedLabel") 
    .show(20) 

    val evaluator = new MulticlassClassificationEvaluator() 
    .setLabelCol("indexedLabel") 
    .setPredictionCol("prediction") 
    .setMetricName("accuracy") 

    val predictionAccuracy = evaluator.evaluate(predictionResultDF) 

    // 模型性能 
    println("Testing Error = " + (1.0 - predictionAccuracy)) 

    val randomForestModel = model.stages(2).asInstanceOf[RandomForestClassificationModel] 

    println("Trained Random Forest Model is:\n" + randomForestModel.toDebugString) 
  } 

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144

在处理流程中,用自定义的 Transformer 过滤掉了不需要的数据,并通过填充缺失值的方式对数据进行预处理。自定义的 Transformer 代码如下:

import org.apache.spark.ml.Transformer 
import org.apache.spark.ml.param.{ParamParamMap} 
import org.apache.spark.ml.util.{DefaultParamsWritableIdentifiable} 
import org.apache.spark.sql.types.{BooleanTypeNumericTypeStructType} 
import org.apache.spark.sql.{DataFrameDatasetRow} 
  • 1
  • 2
  • 3
  • 4
  • 5

// 继承基类Transformer
class FillMissingValueTranformer extends Transformer
{
   val uid: String = Identifiable.randomUID(“MissingValueTransformer”)

   final val inputCols = new Param[Array[String]](this“inputCol”“The input column”)

   override def transformSchema(schema: StructType): StructType = {
      // 检查输入和输出是否符合要求, 比如数据类型
      // 返回处理之后的schema
      val inputColNames = KaTeX parse error: Expected 'EOF', got '&' at position 14: (inputCols) &̲nbsp;&nbsp;&nbs…other of column $name is not supported.")
        }
      }

      if (incorrectColumns.nonEmpty) {
        throw new IllegalArgumentException(incorrectColumns.mkString(“\n”))
      }

      StructType(schema.fields)
   }

   def setInputCols(value: Array[String]): this.type = set(inputCols, value)

   override def transform(dataset: Dataset[_]): DataFrame = {

      val inputColNames = $(inputCols)

      var rawdata=dataset

      for (i<-inputColNames) {rawdata=rawdata.drop(i)}

      val allColumnNames = dataset.columns

      // 过滤掉不需要的列名
      val columnNames = allColumnNames.filter { !inputColNames.contains(_) }

      // 心率的空值填充为60,其他属性的空值填充为0
      var imputedValues:Map[String,Double]=Map()

      for (colname<-columnNames){
        if(colname==“hr”){imputedValues += (colname->60.0)}
        else {imputedValues += (colname->0.0)}
      }

      val processdata=rawdata.na.drop(26,columnNames).na.fill(imputedValues)
      processdata.toDF()
    }

    override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

}

小结

本课时,我们学习了决策树算法与随机森林算法,随机森林算法也曾经是各种数据科学竞赛中的明星算法,属于集成学习的一种。此外,可以发现随机森林算法对于分布式计算来说是很好的方法,这也是 MLlib 将其实现的原因。在本课时中,我们还实现了一个自定义 Transformer,也是对上一节课的复习。

最后给大家留一个思考题: 在配置分类器时,我们一共设置了多少个参数,每个参数有什么用?


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/569280
推荐阅读
相关标签
  

闽ICP备14008679号