赞
踩
记录pyspark的MLlib库学习篇,学习资料来自spark官方文档,主要记录pyspark相关内容,要么直接翻译过来,要么加上自己的理解。spark2.4.8官方文档如下:https://spark.apache.org/docs/2.4.8/ml-classification-regression.html#logistic-regression
如果对逻辑回归原理不清楚,请查看逻辑回归推导。spark的逻辑回归即可以用来处理二分类任务,也能用来预测多分类。spark中逻辑回归的损失函数是:
其中,
γ
γ
γ称为规范化系数,
α
α
α称为Elastic net,
a
∈
[
0
,
1
]
a∈ [0,1]
a∈[0,1],可以看到当
a
=
1
a=1
a=1时,正则化采用的是L1范数;
a
=
0
a=0
a=0时正则化采用L2范数。
逻辑回归常用参数:
参数 | 含义 |
---|---|
regParam | 正则化参数,默认值为0 |
elasticNetParam | α ∈ [ 0 , 1 ] α∈ [0,1] α∈[0,1],可以看到当 a = 1 a=1 a=1时,正则化采用的是L1范数; a = 0 a=0 a=0时正则化采用L2范数。 |
family | 定义分分类,有三个可选择值:auto、binomial、multinomial,默认为auto。 binomial:二分类, multinomial:多分类,(二分类也能用) auto:自动选择分类方式,当类别只有1个或2个是,设置为binomial,多分类时设置为multinomial |
featuresCol | 设置特征列名,默认为列名为’features’ |
fitIntercept | 是否匹配一个截距项,默认为True |
labelCol | 设置标签名,默认为“label” |
maxIter | 最大迭代次数,默认为100 |
predictionCol | 设置预测的列名,默认为"prediction" |
probabilityCol | 设置预测的条件概率列名,默认为“probabi” |
threshold | 二元分类阈值,默认为0.5 |
standardization | 模型拟合前是否将输入特征做标准化处理,默认为True |
weightCol | 设置权重列名 |
tol | 迭代算法收敛阈值,默认为1e-6 |
# 1.导包 from pyspark.ml.classification import LogisticRegression # 2.生成数据,这里用自己的路径 data = spark.read.format("libsvm").load("file:///opt/module/spark/data/mllib/sample_libsvm_data.txt") # 3.设置模型参数,elasticNetParam是α, regParam是规范化系数γ lr = LogisticRegression(maxIter=10,regParam=0.3,elasticNetParam=0.8) # 4.训练模型 lrModel = lr.fit(data) # 5.查看模型参数 查看系数和截距 print(f"当前逻辑回归的各个系数是:{lrModel.coefficients}") print(f"当前逻辑回归的截距是:{lrModel.intercept}") # 结果为: 当前逻辑回归的各个系数是:(692,[244,263,272,300,301,328,350,351,378,379,405,406,407,428,433,434,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.353983524188197e-05,-9.102738505589466e-05,-0.00019467430546904298,-0.00020300642473486668,-3.1476183314863995e-05,-6.842977602660743e-05,1.5883626898239883e-05,1.4023497091372047e-05,0.00035432047524968605,0.00011443272898171087,0.00010016712383666666,0.0006014109303795481,0.0002840248179122762,-0.00011541084736508837,0.000385996886312906,0.000635019557424107,-0.00011506412384575676,-0.00015271865864986808,0.0002804933808994214,0.0006070117471191634,-0.0002008459663247437,-0.0001421075579290126,0.0002739010341160883,0.00027730456244968115,-9.838027027269332e-05,-0.0003808522443517704,-0.00025315198008555033,0.00027747714770754307,-0.0002443619763919199,-0.0015394744687597765,-0.00023073328411331293]) 当前逻辑回归的截距是:0.22456315961250325 # 可以看到系数是一个稀疏向量,692表示有692个系数,第一个列表表示非零的位置,第二个列表表示非零的值。 # 6. 将family设置为multinomial也能用来进行二分类 mlr = LogisticRegression(maxIter=10,regParam=0.3,elasticNetParam=0.8,family="multinomial") mlrModel = mlr.fit(data) print(f"当前逻辑回归的各个系数是:{mlrModel.coefficientMatrix}") print(f"当前逻辑回归的截距是:{mlrModel.interceptVector}") # 结果是: 当前逻辑回归的各个系数是:2 X 692 CSRMatrix (0,244) 0.0 (0,263) 0.0001 (0,272) 0.0001 (0,300) 0.0001 (0,350) -0.0 (0,351) -0.0 (0,378) -0.0 (0,379) -0.0 (0,405) -0.0 (0,406) -0.0006 (0,407) -0.0001 (0,428) 0.0001 (0,433) -0.0 (0,434) -0.0007 (0,455) 0.0001 (0,456) 0.0001 .. .. 当前逻辑回归的截距是:[-0.12065879445860686,0.12065879445860686]
LogisticRegressionTrainingSummary记录了LogisticRegressionModel(Model是算法调用fit()方法后生成的)的摘要信息。在二元分类的情况下,可以获取某些附加指标,例如ROC曲线。
# 1.导包 from pyspark.ml.classification import LogisticRegression # 2.获取摘要信息。 摘要来自于已经得到的LogisticRegressionModel模型 trainingSummay = lrModel.summary # 3.获取每次迭代的目标 objectiveHistory = trainingSummay.objectiveHistory print("objcectiveHistory") for objective in objectiveHistory: print(objective) # 4.获取ROC的取值及其面积 trainingSummary.roc.show() print(f"ROC面积:{trainingSummary.areaUnderROC}") # 结果为: +---+--------------------+ |FPR| TPR| +---+--------------------+ |0.0| 0.0| |0.0|0.017543859649122806| |0.0| 0.03508771929824561| |0.0| 0.05263157894736842| |0.0| 0.07017543859649122| |0.0| 0.08771929824561403| |0.0| 0.10526315789473684| |0.0| 0.12280701754385964| |0.0| 0.14035087719298245| |0.0| 0.15789473684210525| |0.0| 0.17543859649122806| |0.0| 0.19298245614035087| |0.0| 0.21052631578947367| |0.0| 0.22807017543859648| |0.0| 0.24561403508771928| |0.0| 0.2631578947368421| |0.0| 0.2807017543859649| |0.0| 0.2982456140350877| |0.0| 0.3157894736842105| |0.0| 0.3333333333333333| +---+--------------------+ only showing top 20 rows ROC面积:1.0 # 只显示了前20个数据,ROC面积为1,说明模型的训练结果十分好
# 1.导包 from pyspark.ml.classification import LogisticRegression # 2.生成数据。 这里用自己的路径 data = spark.read.format("libsvm").load("file:///opt/module/spark/data/mllib/sample_multiclass_classification_data.txt") # 3.构建模型 lr = LogisticRegression(maxIter=10,regParam=0.3,elasticNetParam=0.8) lrModel = lr.fit(data) print(f"当前逻辑回归的各个系数是:{mlrModel.coefficientMatrix}") print(f"当前逻辑回归的截距是:{mlrModel.interceptVector}") # 结果: 当前逻辑回归的各个系数是:2 X 692 CSRMatrix (0,244) 0.0 (0,263) 0.0001 (0,272) 0.0001 (0,300) 0.0001 (0,350) -0.0 (0,351) -0.0 (0,378) -0.0 (0,379) -0.0 (0,405) -0.0 (0,406) -0.0006 (0,407) -0.0001 (0,428) 0.0001 (0,433) -0.0 (0,434) -0.0007 (0,455) 0.0001 (0,456) 0.0001 .. .. 当前逻辑回归的截距是:[-0.12065879445860686,0.12065879445860686] # 4.查看摘要信息 trainingSummary = lrModel.summary objectiveHistory = trainingSummary.objectiveHistory print("objectiveHistory:") for objective in objectiveHistory: print(objective) # 5.对于多类,我们可以基于每个标签检查度量 # 假正例率 print("False positive rate by label:") for i,rate in enumerate(traingingSummary.falsePositiveRateByLabel): print("label %d: %s" % (i, rate)) # 结果: label 0: 0.22 label 1: 0.05 label 2: 0.0 # 真正例率 print("True positive rate by label:") for i,rate in enumerate(trainingSummary.truePositiveRateByLabel): print("label %d: %s" % (i, rate)) #结果:label 0: 1.0 label 1: 1.0 label 2: 0.46 # 查准率 print("Precision by label:") for i,rate in enumerate(trainingSummary.precisionByLabel): print("label %d: %s" % (i, rate)) # 结果: label 0: 0.6944444444444444 label 1: 0.9090909090909091 label 2: 1.0 # 查全率(召回率) print("Recall by label:") for i,rate in enumerate(trainingSummary.recallByLabel): print("label %d: %s" % (i, rate)) # 结果: label 0: 1.0 label 1: 1.0 label 2: 0.46 # F得分,这里需要添加一堆括号,注意 print("F-measure by label:") for i,rate in enumerate(trainingSummary.fMeasureByLabel()): print("label %d: %s" % (i, rate)) # 结果: label 0: 0.819672131147541 label 1: 0.9523809523809523 label 2: 0.6301369863013699
前面三步简单介绍了下逻辑回归二分类和多分类的大体做法,接下来介绍逻辑回归的模型选择,并将前两天学到的管道运用起来,避免遗忘。
机器学习中的一项重要任务是模型选择,利用数据找到最佳模型或参数,也称为调参。Spark MLlib利用 CrossValidator(交叉验证) 或 TrainValidationSplit(划分训练集验证集)来进行模型的选择。这俩方法的输入要求如下:
1)Estimator:需要调参的算法或管道Pipeline;
2)参数列表ParamMap:可供选择的参数,也叫“参数网格”的搜索空间;
3)Evaluator: 用来评估模型拟合效果的指标或方法。
更具体来说,模型选择的工作原理:
1)将数据集划分为训练集和测试集;
2)对于每一个(训练集,测试集)对,遍历一遍参数列表ParamMap,那么每一个参数都对应一个拟合模型,于是就会得到很多个模型,然后用Evaluator来评估这些模型;
3)选择性能最优的模型对应的各个参数作为模型的最终参数。
Evaluator(评估器)可以是用于回归任务的回归评估器 RegressionEvaluator,也可以是用于二分类的二分类评估器 BinaryClassificationEvaluator,当然也可以是用于多分类的多分类评估器 MulticlassClassificationEvaluator。
我们可以使用ParamGridBuilder来构造参数网格,默认情况下,参数网格中的参数集以串行方式计算。在使用 CrossValidator 或 TrainValidationSplit 运行模型选择之前,通过将并行度设置为 一个≥2的数(值为 1 将是串行的)来并行完成参数评估。这个平行度并不是越大越好,对于一个集群来说,这个值一般不要超过10。
暂时不在这里介绍交叉验证法,未来会专门写一个专题来介绍验证法。所以此处直接使用该方法
# 1.导包:逻辑回归、管道流、二分类评估器、特诊选择、交叉验证 from pyspark.ml.classification import LogisticRegression from pyspark.ml import Pipeline from pyspark.ml.evaluation import BinaryClassificationEvaluator from pyspark.ml.feature import HashingTF,Tokenizer from pyspark.ml.tuning import CrossValidator,ParamGridBuilder # 2.生成数据 spark.ml处理的数据格式是DataFrame training = spark.createDataFrame([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0), (4, "b spark who", 1.0), (5, "g d a y", 0.0), (6, "spark fly", 1.0), (7, "was mapreduce", 0.0), (8, "e spark program", 1.0), (9, "a e c l", 0.0), (10, "spark compile", 1.0), (11, "hadoop software", 0.0) ],['id','text','label']) # 3.建立管道,本例包含三个阶段:tokenizer,hashingTF,lr # 3.1 句子拆分成单词 tokenizer = Tokenizer(inputCol='text',outputCol='words') # 3.2 单词转换为特征向量 hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),outputCol='features') # hashingTF = HashingTF(inputCol='words',outputCol='features') 也一样 # 3.3 建立逻辑回归模型 lr = LogisticRegression(maxIter=10) # 3.4 建立管道流 pipeline = Pipeline(stages=[tokenizer,hashingTF,lr]) # 4.通过第3步建立了管道流,这是一个Estimator(调用fit方法后才会一个Model,也就成为了Transform), # 将它整体放入交叉验证CrossValidator实例中。 CrossValidator的参数有三个:Estimator,ParamMaps, Evaluator # 利用ParamGridBuilder来构建一个参数搜索空间。利用addGrid来添加参数,利用build来构建设定的参数网格。在此处设定hashingTF有三个参数需要调参,lr两个参数需要调参 # 于是就会产生3*2=6个参数组合,然后由CrossValidator来做选择,交叉验证的折数是2 paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures,[10,100,1000])\ .addGrid(lr.regParam,[0.1,0.01])\ .build() # 交叉验证需要输入:待调参的模型、搜索网格、evaluator,交叉验证的折数 crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=2) # 5.运行交叉验证,选择最优模型(此时已经自动选择了最优模型) cvModel = crossval.fit(training) # 6.准备测试集(没有标签) test = spark.createDataFrame([ (4, "spark i j k"), (5, "l m n"), (6, "mapreduce spark"), (7, "apache hadoop") ], ["id", "text"]) # 7.做预测 prediction = cvModel.transform(test) selected = prediction.select("id", "text", "probability", "prediction") for row in selected.collect(): print(row) # 输出结果: Row(id=4, text='spark i j k', probability=DenseVector([0.6287, 0.3713]), prediction=0.0) Row(id=5, text='l m n', probability=DenseVector([0.3454, 0.6546]), prediction=1.0) Row(id=6, text='mapreduce spark', probability=DenseVector([0.3373, 0.6627]), prediction=1.0) Row(id=7, text='apache hadoop', probability=DenseVector([0.2749, 0.7251]), prediction=1.0) # 8.查看最优模型信息:bestModel属性获取最优模型 bestModel = cvModel.bestModel # 在构建管道流时,lr是管道流的第三个阶段(阶段从0开始),所以要获取指定lr模型,指定阶段即可 lrBestModel = bestModel.stages[2] print(f"最优模型的系数:{lrBestModel.coefficients}") print(f"最优模型的截距:{lrBestModel.interceptVector}")
输出结果见下图
TrainValidationSplit将一个数据集划分为训练集和验证集,划分比例参数为trainRatio,假设trainRatio=0.75,那么表示将整个数据集的75%划分为训练集,剩余的25%划分为验证集。下面以线性回归来演示:
# 1.导包 from pyspark.ml.regression import LinearRegression from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit # 2.生成数据 data = spark.read.format("libsvm").load("file:///opt/module/spark/data/mllib/sample_linear_regression_data.txt") # 3.划分数据集,10%的数据作为测试集,90%的数据作为训练集(后面又会将该部分划分为训练集和验证集) train,test = data.randomSplit([0.9,0.1],seed=12345) # 4. 创建模型 lr = LinearRegression(maxIter=10) # 5.创建搜索空间 paramGrid = ParamGridBuilder()\ .addGrid(lr.regParam,[0.1,0.01])\ .addGrid(lr.fitIntercept,[False,True])\ .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\ .build() # 6.设置TrainValidationSplit,训练集中80%的数据用于训练,20%的数据用于验证集(选择模型或称为调参) tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(), trainRatio=0.8) # 7. 拟合数据,得到最优模型 model = tvs.fit(train) # 8.预测(经过拟合后已经得到最优模型) model.transform(test)\ .select("features", "label", "prediction")\ .show()
输出的结果如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。