当前位置:   article > 正文

pyspark使用XGboost训练模型实例_pyspark xgboost

pyspark xgboost

0、获取下载对应spark版本Xgoobst Jar的方法

 使用xgboost模型有三个依赖需要添加或配置:

(1)xgboost4j.jar https://mvnrepository.com/artifact/ml.dmlc/xgboost4j

(2)xgboost4j-spark.jar https://mvnrepository.com/artifact/ml.dmlc/xgboost4j-spark

(3) sparkxgb.zip:这个 GitHub 仓库不错,里面也有代码实践例子:https://github.com/MachineLP/Spark-/tree/master/pyspark-xgboost

其他参考:去年同月份(2022-08)有一个xgboost的博文,记录了下载最新xgboost.jar的方法,

pyspark使用xgboost做模型训练_sparkxbg包_Just Jump的博客-CSDN博客

还有scala版本,配置pom的方法:Xgboost安装、使用和算法原理理解_xgboost 文档_Just Jump的博客-CSDN博客

1、这是一个跑通的代码实例,使用的是泰坦尼克生还数据,分类模型。

这里使用了Pipeline来封装特征处理和模型训练步骤,保存为pipelineModel

注意这里加载xgboost依赖的jar包和zip包的方法。

  1. #这是用 pipeline 包装了XGBOOST的例子。 此路通!
  2. import os
  3. import sys
  4. import time
  5. import pandas as pd
  6. import numpy as np
  7. import pyspark.sql.types as typ
  8. import pyspark.ml.feature as ft
  9. from pyspark.sql.functions import isnan, isnull
  10. from pyspark.sql.types import StructType, StructField
  11. from pyspark.sql.types import *
  12. from pyspark.ml.feature import StringIndexer, VectorAssembler
  13. from pyspark.ml import Pipeline
  14. from pyspark.sql.functions import col
  15. from pyspark.sql import SparkSession
  16. os.environ['PYSPARK_PYTHON'] = 'Python3.7/bin/python'
  17. os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.90.jar,xgboost4j-0.90.jar pyspark-shell'
  18. spark = SparkSession \
  19. .builder \
  20. .appName("PySpark XGBOOST Titanic") \
  21. .config('spark.driver.allowMultipleContexts', 'true') \
  22. .config('spark.pyspark.python', 'Python3.7/bin/python') \
  23. .config('spark.yarn.dist.archives', 'hdfs://ns62007/user/dmc_adm/_PYSPARK_ENV/Python3.7.zip#Python3.7') \
  24. .config('spark.executorEnv.PYSPARK_PYTHON', 'Python3.7/bin/python') \
  25. .config('spark.sql.autoBroadcastJoinThreshold', '-1') \
  26. .enableHiveSupport() \
  27. .getOrCreate()
  28. spark.sparkContext.addPyFile("sparkxgb.zip")
  29. schema = StructType(
  30. [StructField("PassengerId", DoubleType()),
  31. StructField("Survived", DoubleType()),
  32. StructField("Pclass", DoubleType()),
  33. StructField("Name", StringType()),
  34. StructField("Sex", StringType()),
  35. StructField("Age", DoubleType()),
  36. StructField("SibSp", DoubleType()),
  37. StructField("Parch", DoubleType()),
  38. StructField("Ticket", StringType()),
  39. StructField("Fare", DoubleType()),
  40. StructField("Cabin", StringType()),
  41. StructField("Embarked", StringType())
  42. ])
  43. upload_file = "titanic/train.csv"
  44. hdfs_path = "hdfs://tmp/gao/dev_data/dmb_upload_data/"
  45. file_path = os.path.join(hdfs_path, upload_file.split("/")[-1])
  46. df_raw = spark\
  47. .read\
  48. .option("header", "true")\
  49. .schema(schema)\
  50. .csv(file_path)
  51. df_raw.show(20)
  52. df = df_raw.na.fill(0)
  53. sexIndexer = StringIndexer()\
  54. .setInputCol("Sex")\
  55. .setOutputCol("SexIndex")\
  56. .setHandleInvalid("keep")
  57. cabinIndexer = StringIndexer()\
  58. .setInputCol("Cabin")\
  59. .setOutputCol("CabinIndex")\
  60. .setHandleInvalid("keep")
  61. embarkedIndexer = StringIndexer()\
  62. .setInputCol("Embarked")\
  63. .setHandleInvalid("keep")
  64. # .setOutputCol("EmbarkedIndex")\
  65. vectorAssembler = VectorAssembler()\
  66. .setInputCols(["Pclass", "Age", "SibSp", "Parch", "Fare"])\
  67. .setOutputCol("features")
  68. from sparkxgb import XGBoostClassifier
  69. xgboost = XGBoostClassifier(
  70. maxDepth=3,
  71. missing=float(0.0),
  72. featuresCol="features",
  73. labelCol="Survived"
  74. )
  75. pipeline = Pipeline(stages=[vectorAssembler, xgboost])
  76. trainDF, testDF = df.randomSplit([0.8, 0.2], seed=24)
  77. trainDF.show(2)
  78. model = pipeline.fit(trainDF)
  79. print (88888888888888888888)
  80. model.transform(testDF).select(col("PassengerId"), col("Survived"), col("prediction")).show()
  81. print (9999999999999999999)
  82. # Write model/classifier
  83. model.write().overwrite().save(os.path.join(hdfs_path,"xgboost_class_test"))
  84. from pyspark.ml import PipelineModel
  85. model1 = PipelineModel.load(os.path.join(hdfs_path,"xgboost_class_test"))
  86. model1.transform(testDF).show()

这是执行结果:

2、当然也可以不用pipeline封装,直接训练xgboost模型并保存。这也是跑通的例子。

  1. # Train a xgboost model
  2. from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
  3. from pyspark.ml import Pipeline
  4. # spark.sparkContext.addPyFile("sparkxgb.zip") # read xgboost pyspark client lib
  5. from sparkxgb import XGBoostClassifier
  6. assembler = VectorAssembler(
  7. inputCols=[ 'Pclass',
  8. 'Age',
  9. 'SibSp',
  10. 'Parch',
  11. 'Fare' ],
  12. outputCol="features", handleInvalid="skip")
  13. xgboost = XGBoostClassifier(
  14. maxDepth=3,
  15. missing=float(0.0),
  16. featuresCol="features",
  17. labelCol="Survived")
  18. # pipeline = Pipeline(stages=[assembler, xgboost])
  19. # trained_model = pipeline.fit(data)
  20. trainDF, testDF = data.randomSplit([0.8, 0.2], seed=24)
  21. trainDF.show(2)
  22. td = assembler.transform(trainDF)
  23. td.cache()
  24. td.show()
  25. trained_raw_model = xgboost.fit(td)
  26. # save distribute and native model
  27. distrib_model_path = os.path.join(hdfs_path, 'distribute_xgboost_model')
  28. native_model_path = "./native_xgboost_model.xgboost"
  29. # 保存分布式的训练模型
  30. trained_raw_model.write().overwrite().save(distrib_model_path)
  31. # 保存模型到本地磁盘 # save trained model to local disk
  32. trained_raw_model.nativeBooster.saveModel(native_model_path)
  33. print(trained_raw_model.nativeBooster.gamma)
  34. # model predict
  35. result = trained_raw_model.transform(assembler.transform(testDF))
  36. # result.select(["Survived", "rawPrediction", "probability", "prediction"]).show()
  37. result.show()
  38. # evaluate model
  39. from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
  40. evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")\
  41. .setLabelCol("Survived")
  42. print(evaluator.evaluate(result))
  43. # 加载已经训练好的XGB模型(分布式的)
  44. from sparkxgb import XGBoostClassifier,XGBoostClassificationModel
  45. model1= XGBoostClassificationModel().load(distrib_model_path)
  46. model1.transform(td).show()

这是运行结果:

3、也可将Xgboost的相关jar包存放到HDFS中加载

参考:https://baijiahao.baidu.com/s?id=1764030284066757768&wfr=spider&for=pc ,可将单机训练版本的Xgboost改成分布式训练版本加载。

  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # @Time : 2022/9/12 13:52
  4. from pyspark.sql import SparkSession
  5. from pyspark.ml.feature import VectorAssembler, StringIndexer
  6. from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  7. from sparkxgb import XGBoostClassifier, XGBoostClassificationModel
  8. model_path = "hdfs:///user/hive/models/iris_model"
  9. model_native_path = "./iris_native_model"
  10. py_file = "hdfs:///user/hive/python/sparkxgb.zip"
  11. xgboost4j = "hdfs:///user/hive/jar/xgboost4j_2.11-1.0.0.jar"
  12. xgboost4j_spark = "hdfs:///user/hive/jar/xgboost4j-spark_2.11-1.0.0.jar"
  13. jar_files = ",".join([xgboost4j, xgboost4j_spark])
  14. spark = SparkSession.builder \
  15. .appName("spark-xgboost-iris-train") \
  16. .master("yarn") \
  17. .config("spark.jars", jar_files) \
  18. .getOrCreate()
  19. spark.sparkContext.addPyFile(py_file)
  20. # load data and feature process
  21. iris_data = spark.table("tmp_db.iris_data")
  22. iris_data.fillna(0.0)
  23. stringIndex = StringIndexer(inputCols="target", outputCol="label")
  24. vectorAssembler = VectorAssembler(inputCols=["feature_1", "feature_2", "feature_3", "feature_4"],
  25. outputCol="features")
  26. iris_data = stringIndex.fit(iris_data).transform(iris_data)
  27. iris_data = vectorAssembler.transform(iris_data)
  28. train, test = iris_data.randomSplit([0.8, 0.2], seed=24)
  29. # build and train model
  30. xgboost = XGBoostClassifier(
  31. numClass=3,
  32. featuresCol="features",
  33. labelCol="label",
  34. objective="multi:softmax",
  35. missing=0.0
  36. )
  37. model = xgboost.fit(train)
  38. # save distribute and native model
  39. model.write().overwrite().save(model_path)
  40. model.nativeBooster.saveModel(model_native_path)
  41. # model predict
  42. test_result = model.transform(test)
  43. # evaluate model
  44. evaluator = MulticlassClassificationEvaluator(labelCol="label")
  45. evaluator.evaluate(test_result)
  46. test_result.show()
  47. # new a model
  48. model1 = XGBoostClassificationModel().load(model_path)
  49. # model predict
  50. predict_result = model1.transform(test)

Done.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/955400
推荐阅读
相关标签
  

闽ICP备14008679号