当前位置:   article > 正文

spark之常见的机器学习算法_spark机器算法

spark机器算法

1、分类、回归、聚类

分类回归是一种重要的机器学习和数据挖掘技术。分类的目的是根据数据集的特点构造一个分类函数或分类模型(也常常称作分类器),该模型能把未知类别的样本映射到给定类别中的一种技术。

即: 向量X=[x1,x2...xn]但标签C=[c1,c2...,ck]的映射F(W,X)=C

聚类是一种无监督学习的方法,将无标签数据聚类到不同的簇中

spark.ml支持的分类与回归算法

2、常见算法实例介绍

2.1、逻辑回归:

2.1.1、逻辑斯蒂分布

2.1.2、二项LR:

2.1.3、似然函数:

求极大值,得到w的估计值。求极值的方法可以是梯度下降法,梯度上升法等。

  1. from pyspark.sql import SparkSession
  2. from pyspark.sql import Row,functions
  3. from pyspark.ml.linalg import Vector,Vectors
  4. from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  5. from pyspark.ml import Pipeline
  6. from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer,HashingTF, Tokenizer
  7. from pyspark.ml.classification import LogisticRegression,LogisticRegressionModel,BinaryLogisticRegressionSummary, LogisticRegression
  8. spark = SparkSession.builder.master("local").appName("spark ML").getOrCreate()
  9. def f(x):
  10. rel = {}
  11. rel['features'] = Vectors.dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
  12. rel['label'] = str(x[4])
  13. return rel
  14. #读取数据集并转化为DataFrame
  15. df= spark.sparkContext.textFile("file:///home/work/jiangshuangyan001/spark_test/iris.txt").map(lambda line: line.split(',')).map(lambda p: Row(**f(p))).toDF()
  16. #构建二分数据集
  17. df.createOrReplaceTempView("iris")
  18. df = spark.sql("select * from iris where label != 'Iris-setosa'")
  19. rel = df.rdd.map(lambda t : str(t[1])+":"+str(t[0])).collect()
  20. # for item in rel:
  21. # print(item)
  22. #构建ML的pipeline
  23. labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
  24. featureIndexer = VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(df)
  25. df.show()
  26. df_train, df_test = df.randomSplit([0.7,0.3])
  27. #用setter的方法设置logistic的参数
  28. lr = LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
  29. #print("LogisticRegression parameters:\n" + lr.explainParams())
  30. labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
  31. lrPipeline = Pipeline().setStages([labelIndexer, featureIndexer, lr, labelConverter])
  32. lrPipelineModel = lrPipeline.fit(df_train)
  33. lrPredictions = lrPipelineModel.transform(df_test)
  34. #预测结果
  35. preRel = lrPredictions.select("predictedLabel", "label", "features", "probability").collect()
  36. for item in preRel:
  37. print(str(item['label'])+','+str(item['features'])+'-->prob='+str(item['probability'])+',predictedLabel'+str(item['predictedLabel']))
  38. #评估
  39. evaluator = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
  40. lrAccuracy = evaluator.evaluate(lrPredictions)
  41. print("Test Error = " , str(1.0 - lrAccuracy))

2.2、决策树

​ 决策树(decision tree)是一种基本的分类与回归方法,这里主要介绍用于分类的决策树。决策树模式呈树形结构,其中每个内部节点表示一个属性上的测试,每个分支代表一个测试输出,每个叶节点代表一种类别。学习时利用训练数据,根据损失函数最小化的原则建立决策树模型;预测时,对新的数据,利用决策树模型进行分类。

​ 决策树学习通常包括3个步骤:特征选择、决策树的生成和决策树的剪枝。

2.2.1、特征选择

  • 信息增益(informational entropy)表示得知某一特征后使得信息的不确定性减少的程度。

          

  • 信息增益比: 为解决信息增益偏向值多的特征而提出

                              

             为特征A的熵值

  • 基尼指数:分类问题中,假设有K个类,样本点属于第K类的概率为

        

2.2.2、决策树的生成

从根结点开始,对结点计算所有可能的特征的信息增益,选择信息增益最大的特征作为结点的特征,由该特征的不同取值建立子结点,再对子结点递归地调用以上方法,构建决策树;直到所有特征的信息增均很小或没有特征可以选择为止,最后得到一个决策树。

​ 决策树需要有停止条件来终止其生长的过程。一般来说最低的条件是:当该节点下面的所有记录都属于同一类,或者当所有的记录属性都具有相同的值时。这两种条件是停止决策树的必要条件,也是最低的条件。在实际运用中一般希望决策树提前停止生长,限定叶节点包含的最低数据量,以防止由于过度生长造成的过拟合问题。

2.2.3、决策树的剪枝

​ 决策树生成算法递归地产生决策树,直到不能继续下去为止。这样产生的树往往对训练数据的分类很准确,但对未知的测试数据的分类却没有那么准确,即出现过拟合现象。解决这个问题的办法是考虑决策树的复杂度,对已生成的决策树进行简化,这个过程称为剪枝。

​ 决策树的剪枝往往通过极小化决策树整体的损失函数来实现。一般来说,损失函数可以进行如下的定义:

2.2.3、实例

  1. #导入所需要的包
  2. from pyspark.ml.classification import DecisionTreeClassificationModel,DecisionTreeClassifier
  3. from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  4. #训练决策树模型,这里我们可以通过setter的方法来设置决策树的参数,也可以用ParamMap来设置(具体的可以查看spark mllib的官网)。具体的可以设置的参数可以通过explainParams()来获取。
  5. dtClassifier = DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
  6. #在pipeline中进行设置
  7. pipelinedClassifier = Pipeline().setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
  8. #训练决策树模型
  9. modelClassifier = pipelinedClassifier.fit(df_train)
  10. #进行预测
  11. predictionsClassifier = modelClassifier.transform(df_test)
  12. #查看部分预测的结果
  13. predictionsClassifier.select("predictedLabel", "label", "features").show(20)
  14. evaluatorClassifier = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
  15. accuracy = evaluatorClassifier.evaluate(predictionsClassifier)
  16. print("Test Error = " + str(1.0 - accuracy))
  17. treeModelClassifier = modelClassifier.stages[2]
  18. print("Learned classification tree model:\n" + str(treeModelClassifier.toDebugString))

2.3、聚类kmeans

1.根据给定的k值,选取k个样本点作为初始划分中心;

2.计算所有样本点到每一个划分中心的距离,并将所有样本点划分到距离最近的划分中心;

3.计算每个划分中样本点的平均值,将其作为新的中心;

  1. from pyspark.sql import Row
  2. from pyspark.ml.clustering import KMeans,KMeansModel
  3. from pyspark.ml.linalg import Vectors
  4. #ML包下的KMeans方法也有Seed(随机数种子)、Tol(收敛阈值)、K(簇个数)、MaxIter(最大迭代次数)、initMode(初始化方式)、initStep(KMeans||方法的步数)等参数可供设置
  5. kmeansmodel = KMeans().setK(2).setFeaturesCol('features').setPredictionCol('prediction').fit(df)
  6. kmeansmodel.transform(df).show()
  7. results = kmeansmodel.transform(df).collect()
  8. for item in results:
  9. print(str(item[0])+' is predcted as cluster'+ str(item[2]))
  10. #获取聚类中心的情况
  11. #也可以通过KMeansModel类自带的clusterCenters属性获取到模型的所有聚类中心情况:
  12. results2 = kmeansmodel.clusterCenters()
  13. for item in results2:
  14. print(item)
  15. #与MLlib下的实现相同,KMeansModel类也提供了计算 集合内误差平方和(Within Set Sum of Squared Error, WSSSE) 的方法来度量聚类的有效性,在真实K值未知的情况下,该值的变化可以作为选取合适K值的一个重要参考:
  16. kmeansmodel.computeCost(df)

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/932741
推荐阅读
相关标签
  

闽ICP备14008679号