当前位置:   article > 正文

pyspark使用说明_pyspark svm教材

pyspark svm教材

PySpark 

PySpark 是 Spark 为 Python 开发者提供的 API ,位于 $SPARK_HOME/bin 目录,使用也非常简单,进入pyspark shell就可以使用了。

子模块

pyspark.sql 模块

pyspark.streaming 模块

pyspark.ml 包

pyspark.mllib

PySpark 提供的类

pyspark.SparkConf

pyspark.SparkConf 类提供了对一个 Spark 应用程序配置的操作方法。用于将各种Spark参数设置为键值对。

pyspark.SparkContext

pyspark.SparkContext 类提供了应用与 Spark 交互的主入口点,表示应用与 Spark 集群的连接,基于这个连接,应用可以在该集群上创建 RDD 和 广播变量 (pyspark.Broadcast)

pyspark.SparkFiles

SparkFiles 只包含类方法,开发者不应创建 SparkFiles 类的实例  。

pyspark.RDD

这个类是为 PySpark 操作 RDD提供了基础方法  。

first() 是 pyspark.RDD 类提供的方法,返回 RDD 的第一个元素。

aggregate() 方法使用给定的组合函数和中性“零值,先聚合每个分区的元素,然后再聚合所有分区的结果。

cache() 使用默认存储级别(MEMORY_ONLY)对此 RDD 进行持久化  

collect() 返回一个列表,包含此 RDD 中所有元素 。

pyspark.Accumulator

一种“只允许添加”的共享变量,Spark 任务只能向其添加值  。

pyspark.Broadcast

Spark 提供了两种共享变量  :广播变量 和 累加器,pyspark.Broadcast 类提供了对广播变量的操作方法。

pyspark.Accumulator

pyspark.Accumulator 提供了对累加器变量的操作方法[  。

累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。

测试过程

hdfs用户下进入pyspark  shell

/usr/hdp/2.6.0.3-8/spark/bin/pyspark

示例参考文献http://spark.apache.org/docs/1.6.3/mllib-statistics.html

1、基本统计Basic Statistics 

概要统计(Summary statistics)

MultivariateStatisticalSummary colStats()返回一个实例,包含列的最大值,最小值,均值,方差,和非零的数量以及总数量。

此案例参考的spark2.2.0文档,1.6.3文档有误,以下全可复制

  1. import numpy as np
  2. from pyspark.mllib.stat import Statistics
  3. mat = sc.parallelize(
  4. [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([3.0, 30.0, 300.0])]
  5. ) # an RDD of Vectors
  6. # Compute column summary statistics.
  7. summary = Statistics.colStats(mat)
  8. print(summary.mean()) # a dense vector containing the mean value for each column
  9. print(summary.variance()) # column-wise variance
  10. print(summary.numNonzeros()) # number of nonzeros in each column

2、相关性分析(Correlations)

 计算两个系列(series)数据之间的相关性的数据是在统计学一种常见的操作。在 spark.mllib 我们提供灵活的计算两两之间的相关性的方法。支持计算相关性的方法目前有 Pearson’s and Spearman’s (皮尔森和斯皮尔曼) 的相关性.

Statistics提供了计算系列之间相关性的方法。 根据输入类型,两个RDD [Double]或RDD [Vector],输出分别为Double或相关矩阵

此案例参考的spark2.2.0文档,1.6.3文档不完整,以下全可复制

  1. from pyspark.mllib.stat import Statistics
  2. seriesX = sc.parallelize([1.0, 2.0, 3.0, 3.0, 5.0]) # a series
  3. # seriesY must have the same number of partitions and cardinality as seriesX
  4. seriesY = sc.parallelize([11.0, 22.0, 33.0, 33.0, 555.0])
  5. # Compute the correlation using Pearson's method. Enter "spearman" for Spearman's method.
  6. # If a method is not specified, Pearson's method will be used by default.
  7. print("Correlation is: " + str(Statistics.corr(seriesX, seriesY, method="pearson")))
  8. data = sc.parallelize(
  9. [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([5.0, 33.0, 366.0])]
  10. ) # an RDD of Vectors
  11. # calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
  12. # If a method is not specified, Pearson's method will be used by default.
  13. print(Statistics.corr(data, method="pearson"))

3、分层抽样(Stratified sampling)

与spark.mllib中的其他统计功能不同,sampleByKey和sampleByKeyExact可以对键值对的RDD执行分层采样方法。 对于分层采样,键可以被认为是一个标签,该值作为一个特定属性。 例如,key 可以是男人或女人或文档ID,并且相应的 value 可以是人的年龄列表或文档中的单词列表。 sampleByKey方法将类似掷硬币方式来决定观察是否被采样,因此需要一次遍历数据,并提供期望的样本大小。 sampleByKeyExact需要比sampleByKey中使用的每层简单随机抽样花费更多的资源,但将提供99.99%置信度的确切抽样大小。 python当前不支持sampleByKeyExact。

  1. data = sc.parallelize([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')])
  2. fractions = {1: 0.1, 2: 0.6, 3: 0.3}
  3. approxSample = data.sampleByKey(False, fractions)
  4. for each in approxSample.collect():print(each)//此处需敲2次回车

4、假设检验(Hypothesis testing)

假设检验是统计学中强大的工具,用于确定结果是否具有统计学意义,无论该结果是否偶然发生。 spark.mllib目前支持Pearson’s chi-squared ( χ 2   χ2))测试,以获得拟合优度和独立性。 输入数据类型确定是否进行拟合优度或独立性测试。 拟合优度测试需要输入类型的Vector,而独立性测试需要一个 Matrix作为输入。

统计学提供了运行Pearson的chi - squared测试的方法。下面的示例演示如何运行和解释假设测试。

  1. from pyspark.mllib.linalg import Matrices, Vectors
  2. from pyspark.mllib.regression import LabeledPoint
  3. from pyspark.mllib.stat import Statistics
  4. vec = Vectors.dense(0.1, 0.15, 0.2, 0.3, 0.25) # a vector composed of the frequencies of events
  5. # compute the goodness of fit. If a second vector to test against
  6. # is not supplied as a parameter, the test runs against a uniform distribution.
  7. goodnessOfFitTestResult = Statistics.chiSqTest(vec)
  8. # summary of the test including the p-value, degrees of freedom,
  9. # test statistic, the method used, and the null hypothesis.
  10. print("%s\n" % goodnessOfFitTestResult)
  11. mat = Matrices.dense(3, 2, [1.0, 3.0, 5.0, 2.0, 4.0, 6.0]) # a contingency matrix
  12. # conduct Pearson's independence test on the input contingency matrix
  13. independenceTestResult = Statistics.chiSqTest(mat)
  14. # summary of the test including the p-value, degrees of freedom,
  15. # test statistic, the method used, and the null hypothesis.
  16. print("%s\n" % independenceTestResult)
  17. obs = sc.parallelize(
  18. [LabeledPoint(1.0, [1.0, 0.0, 3.0]),
  19. LabeledPoint(1.0, [1.0, 2.0, 0.0]),
  20. LabeledPoint(1.0, [-1.0, 0.0, -0.5])]
  21. ) # LabeledPoint(feature, label)
  22. # The contingency table is constructed from an RDD of LabeledPoint and used to conduct
  23. # the independence test. Returns an array containing the ChiSquaredTestResult for every feature
  24. # against the label.
  25. featureTestResults = Statistics.chiSqTest(obs)
  26. for i, result in enumerate(featureTestResults):
  27. print("Column %d:\n%s" % (i + 1, result)) //此处需敲2次回车

此外, spark.mllib 提供了对于概率分布相等的Kolmogorov-Smirnov(KS)测试的单样本双侧实现。 通过提供理论分布(目前仅为正态分布支持)及其参数的名称,或根据给定理论分布计算累积分布的函数,用户可以测试其假设,即样本服从该分布。 在用户根据正态分布(distName =“norm”)进行测试但不提供分发参数的情况下,测试将初始化为标准正态分布并记录适当的消息。

Statistics 提供了运行单样本,双侧Kolmogorov-Smirnov检验的方法。 以下示例演示如何运行和解释假设检验。

  1. from pyspark.mllib.stat import Statistics
  2. parallelData = sc.parallelize([0.1, 0.15, 0.2, 0.3, 0.25])
  3. # run a KS test for the sample versus a standard normal distribution
  4. testResult = Statistics.kolmogorovSmirnovTest(parallelData, "norm", 0, 1)
  5. # summary of the test including the p-value, test statistic, and null hypothesis
  6. # if our p-value indicates significance, we can reject the null hypothesis
  7. # Note that the Scala functionality of calling Statistics.kolmogorovSmirnovTest with
  8. # a lambda to calculate the CDF is not made available in the Python API
  9. print(testResult)

5、流式测试Streaming Significance Testing

  至spark2.2.0暂无示例

6、随机数据生成Random data generation

随机数据生成对于随机算法,原型设计和性能测试很有用。 spark.mllib支持使用i.i.d.生成随机RDD。 从给定分布绘制的值:均匀,标准正态或泊松分布。

RandomRDDs 提供工厂方法来生成随机double型RDD或向量RDD。 以下示例生成随机double型RDD,其值遵循标准正态分布N(0,1),然后映射到N(1,4)。

官网示例不完整,此示例来源https://github.com/apache/spark/blob/master/examples/src/main/python/mllib/random_rdd_generation.py

  1. from pyspark.mllib.random import RandomRDDs
  2. numExamples = 10000
  3. # number of examples to generate
  4. fraction = 0.1 # fraction of data to sample
  5. # Example: RandomRDDs.normalRDD
  6. normalRDD = RandomRDDs.normalRDD(sc, numExamples)
  7. print('Generated RDD of %d examples sampled from the standard normal distribution'% normalRDD.count())
  8. print(' First 5 samples:')
  9. for sample in normalRDD.take(5):print(' ' + str(sample)) //此处需敲2次回车
  10. print()
  11. # Example: RandomRDDs.normalVectorRDD
  12. normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows=numExamples, numCols=2)
  13. print('Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count())
  14. print(' First 5 samples:')
  15. for sample in normalVectorRDD.take(5):print(' ' + str(sample)) 此处需敲2次回车
  16. print()

7、内核密度估计Kernel density estimation

核密度估计  是一种用于可视化经验概率分布的技术,而不需要对所观察到的样本的特定分布进行假设。 它计算在给定集合点评估的随机变量的概率密度函数的估计。 它通过在特定点表达PDF的经验分布来实现这一估计,这是以每个样本为中心的正态分布的PDF平均值.

KernelDensity 提供了从RDD样本计算核密度估计的方法。 以下示例演示如何执行此操作。

官网示例不完整,此示例来源https://github.com/apache/spark/blob/master/examples/src/main/python/mllib/kernel_density_estimation_example.py

  1. from pyspark.mllib.stat import KernelDensity
  2. # an RDD of sample data
  3. data = sc.parallelize([1.0, 1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 5.0, 6.0, 7.0, 8.0, 9.0, 9.0])
  4. # Construct the density estimator with the sample data and a standard deviation for the Gaussian
  5. # kernels
  6. kd = KernelDensity()
  7. kd.setSample(data)
  8. kd.setBandwidth(3.0)
  9. # Find density estimates for the given values
  10. densities = kd.estimate([-1.0, 2.0, 5.0])
  11. print(densities)

8、分类和回归

spark.mllib包支持binary classification(二分类)multiclass classification(多分类)regression analysis(回归分析)的各种方法。

下表列出了每种类型问题支持的算法。

关于这些方法的更多细节可以在下面找到,内容参考http://spark.apache.org/docs/1.6.3/mllib-linear-methods.html

a)分类Classification

分类(Classification)的目标是把不同的数据项划分到不同的类别中。其中二元分类(binary classification),有正类和负类两种类别,是最常见的分类类型。如果多于两种类别,就是多元分类(multiclass classification.)。spark.mllib对分类有两种线性方法: linear Support Vector Machines (SVMs) and logistic regression。Linear SVMs 只支持二元分类。而logistic regression对二元和多元分类问题都支持。 对这种方法,spark.mllib都提供有 L1和 L2正则化下的两种变体。在MLlib中,测试数据集用RDD类型的LabeledPoint表示,其中labels从0开始索引:0,1,2……

线性支持向量机Linear Support Vector Machines(SVMs)

前提条件,上传文件sample_svm_data.txt到HDFS的/user/hdfs/data/mllib/

下面的示例展示了如何加载示例数据集,构建SVM模型,并使用结果模型进行预测,以计算训练错误。

示例来源https://github.com/apache/spark/blob/master/examples/src/main/python/mllib/svm_with_sgd_example.py

  1. from pyspark.mllib.classification import SVMWithSGD, SVMModel
  2. from pyspark.mllib.regression import LabeledPoint
  3. # Load and parse the data
  4. def parsePoint(line):
  5. values = [float(x) for x in line.split(' ')]
  6. return LabeledPoint(values[0], values[1:])
  7. data = sc.textFile("data/mllib/sample_svm_data.txt")
  8. parsedData = data.map(parsePoint)
  9. # Build the model
  10. model = SVMWithSGD.train(parsedData, iterations=100)
  11. # Evaluating the model on training data
  12. labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
  13. trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
  14. print("Training Error = " + str(trainErr))
  15. # Save and load model
  16. model.save(sc, "target/tmp/pythonSVMWithSGDModel")
  17. sameModel = SVMModel.load(sc, "target/tmp/pythonSVMWithSGDModel")

逻辑回归Logistic regressio

下面的例子展示了如何加载一个示例数据集,构建逻辑回归模型,并对结果模型进行预测,以计算训练错误。

注意,至spark2.2.0 Python API还不支持多类分类和模型保存/加载,但将来会支持。

  1. from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
  2. from pyspark.mllib.regression import LabeledPoint
  3. # Load and parse the data
  4. def parsePoint(line):
  5. values = [float(x) for x in line.split(' ')]
  6. return LabeledPoint(values[0], values[1:])
  7. data = sc.textFile("data/mllib/sample_svm_data.txt")
  8. parsedData = data.map(parsePoint)
  9. # Build the model
  10. model = LogisticRegressionWithLBFGS.train(parsedData)
  11. # Evaluating the model on training data
  12. labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
  13. trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
  14. print("Training Error = " + str(trainErr))
  15. # Save and load model
  16. model.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
  17. sameModel = LogisticRegressionModel.load(sc,"target/tmp/pythonLogisticRegressionWithLBFGSModel")

b)回归Regression

前提条件是上传lpsa.data文件到HDFS的/user/hdfs/data/mllib/ridge-data/目录下

Linear least squares, Lasso, and ridge regression

  1. from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
  2. # Load and parse the data
  3. def parsePoint(line):
  4. values = [float(x) for x in line.replace(',', ' ').split(' ')]
  5. return LabeledPoint(values[0], values[1:])
  6. data = sc.textFile("data/mllib/ridge-data/lpsa.data")
  7. parsedData = data.map(parsePoint)
  8. # Build the model
  9. model = LinearRegressionWithSGD.train(parsedData, iterations=100, step=0.00000001)
  10. # Evaluate the model on training data
  11. valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
  12. MSE = valuesAndPreds \
  13. .map(lambda vp: (vp[0] - vp[1])**2) \
  14. .reduce(lambda x, y: x + y) / valuesAndPreds.count()
  15. print("Mean Squared Error = " + str(MSE))
  16. # Save and load model
  17. model.save(sc, "target/tmp/pythonLinearRegressionWithSGDModel")
  18. sameModel = LinearRegressionModel.load(sc, "target/tmp/pythonLinearRegressionWithSGDModel")

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/568016
推荐阅读
相关标签
  

闽ICP备14008679号