赞
踩
如果样本较少,可以直接使用python对样本进行ML建模,但当需要大规模数据集时,可以使用spark进行分布式内存计算,虽然spark的原生语言是scala,但如果用python写可以用pyspark进行机器学习的pipeline链路建立。
spark有MLlib机器学习库,比ML Pipeline复杂,先来大概看下ML Pipeline构建机器学习流程:
StringIndexer
:将文字分类特征转化为数字OneHotEncoder
:将数字分类特征转化为稀疏向量VectorAssembler
:将所有特征字段整合成一个Vector字段DecisionTreeClassfier
:训练生成模型pipeline.fit()
进行训练,产生pipelineModelpipelineModel.transform()
预测测试集,产生预测结果注意:pyspark的一些组件和python中的同名组件不完全一样:
DataFrame
: 是Spark ML机器学习API处理的数据格式,可以由文本文件、RDD、或者Spark SQL创建,与python 的Dataframe
概念相近但是方法完全不同。Transformer
:可以使用.transform方法将一个DataFrame转换成另一个DataFrame。Estimator
:可以使用.fit方法传入DataFrame,生成一个Transformer。pipeline
:可以串联多个Transformer和Estimator建立ML机器学习的工作流。Parameter
:以上Transformer和Estimator都可以共享的参数API。Spark中的GBDT较GBTs——梯度提升树,因为其是基于决策树(Decision Tree,DT)实现的,所以叫GBDT。Spark 中的GBDT算法存在于ml包和mllib包中:
由于在实际生产环境中使用基于RDD的较多,所以直接使用MLLib包中的GBTs,ML包中的则进行简单说明。
# gbdt_test import findspark findspark.init() import pyspark from pyspark import SparkConf from pyspark.ml import Pipeline from pyspark.context import SparkContext from pyspark.sql.session import SparkSession from pyspark.ml.classification import DecisionTreeClassifier from pyspark.ml.evaluation import MulticlassClassificationEvaluator from pyspark.ml.feature import StringIndexer, VectorIndexer,IndexToString from pyspark.ml.classification import GBTClassifier file_path = "file:///home/hadoop/development/RecSys/data" # def gradientBoostedTreeClassifier(data="data/sample_libsvm_data.txt"): # def gradientBoostedTreeClassifier(data): ''' GBDT分类器 ''' #加载LIBSVM格式的数据集 data = spark.read.format("libsvm").load(data) labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data) data.show(n = 3) """ +-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|(692,[127,128,129...| | 1.0|(692,[158,159,160...| | 1.0|(692,[124,125,126...| +-----+--------------------+ only showing top 3 rows """
#训练集、测试集划分 (trainingData, testData) = data.randomSplit([0.7, 0.3]) #print("训练集:\n", trainingData.show(n = 1), "\n") #print("测试集:\n", testData.show(n = 1)) # 使用10个基分类器 gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10) print("gbt_test:\n", gbt, "\n") # 建立模型的pipeline pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt]) print("pipeline:\n", type(pipeline), "\n") model = pipeline.fit(trainingData) # 做预测 predictions = model.transform(testData) #展示前5行数据 predictions.select("prediction", "indexedLabel", "features").show(5) #展示预测标签与真实标签,计算测试误差 fit part evaluator = MulticlassClassificationEvaluator( labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy") # predict accuracy = evaluator.evaluate(predictions) print("Test Error = %g" % (1.0 - accuracy)) gbtModel = model.stages[2] print('gbtModelSummary: ',gbtModel) #模型摘要
结果如下,从Test Error = 0.12
看,即accuracy
为98%的效果,以上即一个简单的GBDT分类任务,通过10个基分类器,根据boosting策略根据负梯度的优化:
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|(692,[95,96,97,12...| +-----+--------------------+ only showing top 1 row 训练集: None +-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|(692,[123,124,125...| +-----+--------------------+ only showing top 1 row 测试集: None gbt_test: GBTClassifier_eafe5d3c8749 pipeline: <class 'pyspark.ml.pipeline.Pipeline'> +----------+------------+--------------------+ |prediction|indexedLabel| features| +----------+------------+--------------------+ | 1.0| 1.0|(692,[123,124,125...| | 1.0| 1.0|(692,[124,125,126...| | 1.0| 1.0|(692,[126,127,128...| | 1.0| 1.0|(692,[129,130,131...| | 1.0| 1.0|(692,[150,151,152...| +----------+------------+--------------------+ only showing top 5 rows Test Error = 0.12 gbtModelSummary: GBTClassificationModel: uid = GBTClassifier_eafe5d3c8749, numTrees=10, numClasses=2, numFeatures=692
[1] Spark 排序算法系列之 GBTs 使用方式介绍
[2] MLlib:https://www.jianshu.com/p/4d7003182398
[3] pyspark学习之——流水线Pipeline
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。