当前位置:   article > 正文

Pyspark一个完整建模流程Demo_pyspark demo

pyspark demo

本文通过pyspark构建一个树模型,展示完整的建模流程。

  1. 数据加载
  2. 数据预处理
  3. 特征工程
  4. 划分数据集
  5. 训练模型
  6. 模型评估
  7. 模型优化
  8. 模型应用

以下是基于 PySpark 构建树模型的完整流程:

  1. 数据加载

使用 PySpark 中的 SparkSession 对象加载数据集,如 CSV 文件或数据库表等。

from pyspark.sql import SparkSession

# 创建 SparkSession 对象
spark = SparkSession.builder.appName("Tree Model").getOrCreate()

# 加载 CSV 文件
df = spark.read.csv("data.csv", header=True, inferSchema=True)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  1. 数据预处理

对数据进行清洗、处理缺失值、特征选择等预处理操作。

# 清除空行和重复项
df = df.dropna().dropDuplicates()

# 处理缺失值
df = df.fillna(0)

# 特征选择
selected_cols = ["feature1", "feature2", "feature3"]
df = df.select(selected_cols)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  1. 特征工程

对选定的特征进行转换、缩放、编码等特征工程操作。

from pyspark.ml.feature import VectorAssembler, StandardScaler

# 将多个特征合并为一个向量
assembler = VectorAssembler(inputCols=selected_cols, outputCol="features")
df = assembler.transform(df)

# 缩放特征
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
scalerModel = scaler.fit(df)
df = scalerModel.transform(df)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. 划分数据集

将数据集分成训练集和测试集,并将其转换为 PySpark 的 DataFrame 对象。

# 随机划分训练集和测试集
train, test = df.randomSplit([0.7, 0.3], seed=123)

# 转换成 DataFrame 对象
train = train.select("label", "scaledFeatures")
test = test.select("label", "scaledFeatures")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 训练模型

使用 PySpark 中的 DecisionTreeClassifier 类训练决策树模型。

from pyspark.ml.classification import DecisionTreeClassifier

# 创建模型并拟合数据
dt = DecisionTreeClassifier(featuresCol="scaledFeatures", labelCol="label", maxDepth=3)
dtModel = dt.fit(train)
  • 1
  • 2
  • 3
  • 4
  • 5
  1. 模型评估

使用 PySpark 中的 BinaryClassificationEvaluatorMulticlassClassificationEvaluator 对模型进行性能评估。也可以使用其他评估器根据需求进行修改。

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# 二分类评估
evaluator = BinaryClassificationEvaluator(labelCol="label")
bce = evaluator.evaluate(dtModel.transform(test))
print("AUC: ", bce)

# 多分类评估
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
mce = evaluator.evaluate(dtModel.transform(test))
print("Accuracy: ", mce)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  1. 模型优化

可以通过调整树的深度、最大叶子节点数、最小信息增益等参数来优化模型性能。

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# 创建参数网格
paramGrid = ParamGridBuilder().\
    addGrid(dt.maxDepth, [3, 5, 7]).\
    addGrid(dt.maxBins, [24, 28, 32]).\
    addGrid(dt.minInfoGain, [0.0, 0.1]).\
    build()

# 创建交叉验证器
cv = CrossValidator(estimator=dt, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=3)

# 训练模型
cvModel = cv.fit(train)

# 评估优化后的模型
bce = evaluator.evaluate(cvModel.transform(test))
print("AUC: ", bce)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  1. 模型应用

使用训练好的模型进行预测或分类任务。

# 使用优化后的模型进行预测
prediction = cvModel.transform(test)

# 显示预测结果
prediction.select("label", "prediction").show()
  • 1
  • 2
  • 3
  • 4
  • 5
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/955402
推荐阅读
相关标签
  

闽ICP备14008679号