赞
踩
使用xgboost模型有三个依赖需要添加或配置:
(1)xgboost4j.jar https://mvnrepository.com/artifact/ml.dmlc/xgboost4j
(2)xgboost4j-spark.jar https://mvnrepository.com/artifact/ml.dmlc/xgboost4j-spark
(3) sparkxgb.zip 这个github项目还不错,里面也有代码实践例子:https://github.com/MachineLP/Spark-/tree/master/pyspark-xgboost
其他参考:去年同月份(2022-08)有一个xgboost的博文,记录了下载最新xgboost.jar的方法,
pyspark使用xgboost做模型训练_sparkxbg包_Just Jump的博客-CSDN博客
还有scala版本,配置pom的方法:Xgboost安装、使用和算法原理理解_xgboost 文档_Just Jump的博客-CSDN博客
这里使用了Pipeline来封装特征处理和模型训练步骤,保存为pipelineModel。
注意这里加载xgboost依赖的jar包和zip包的方法。
#!/usr/bin/env python
# Titanic example: wrap an XGBoost classifier in a Spark ML Pipeline,
# train it, score the held-out split, then persist and reload the
# PipelineModel.  This path works ("此路通") provided xgboost4j.jar and
# xgboost4j-spark.jar are on the classpath and sparkxgb.zip is shipped
# to the executors.

import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler

# Point PySpark at the bundled Python 3.7 runtime and the XGBoost jars.
os.environ['PYSPARK_PYTHON'] = 'Python3.7/bin/python'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.90.jar,xgboost4j-0.90.jar pyspark-shell'

spark = SparkSession \
    .builder \
    .appName("PySpark XGBOOST Titanic") \
    .config('spark.driver.allowMultipleContexts', 'true') \
    .config('spark.pyspark.python', 'Python3.7/bin/python') \
    .config('spark.yarn.dist.archives', 'hdfs://ns62007/user/dmc_adm/_PYSPARK_ENV/Python3.7.zip#Python3.7') \
    .config('spark.executorEnv.PYSPARK_PYTHON', 'Python3.7/bin/python') \
    .config('spark.sql.autoBroadcastJoinThreshold', '-1') \
    .enableHiveSupport() \
    .getOrCreate()

# Ship the sparkxgb Python wrapper to every executor; the `from sparkxgb
# import ...` below only works after this call.
spark.sparkContext.addPyFile("sparkxgb.zip")

# Explicit schema: numeric columns as DoubleType so VectorAssembler accepts
# them without extra casting.
schema = StructType(
    [StructField("PassengerId", DoubleType()),
     StructField("Survived", DoubleType()),
     StructField("Pclass", DoubleType()),
     StructField("Name", StringType()),
     StructField("Sex", StringType()),
     StructField("Age", DoubleType()),
     StructField("SibSp", DoubleType()),
     StructField("Parch", DoubleType()),
     StructField("Ticket", StringType()),
     StructField("Fare", DoubleType()),
     StructField("Cabin", StringType()),
     StructField("Embarked", StringType())
     ])

upload_file = "titanic/train.csv"
hdfs_path = "hdfs://tmp/gao/dev_data/dmb_upload_data/"  # NOTE(review): "tmp" sits in the authority slot of the URI — confirm the namenode address
file_path = os.path.join(hdfs_path, upload_file.split("/")[-1])

df_raw = spark\
    .read\
    .option("header", "true")\
    .schema(schema)\
    .csv(file_path)

df_raw.show(20)
# Replace nulls with 0 before training; XGBoostClassifier below is
# configured with missing=0.0 to match.
df = df_raw.na.fill(0)

# NOTE(review): the original listing also built StringIndexer stages for
# Sex/Cabin/Embarked but never added them to the Pipeline, so only the
# numeric columns below are used as features.  The dead stages are removed.
vectorAssembler = VectorAssembler()\
    .setInputCols(["Pclass", "Age", "SibSp", "Parch", "Fare"])\
    .setOutputCol("features")


from sparkxgb import XGBoostClassifier
xgboost = XGBoostClassifier(
    maxDepth=3,
    missing=float(0.0),
    featuresCol="features",
    labelCol="Survived"
)

# Feature assembly + model training packaged as one reusable Pipeline.
pipeline = Pipeline(stages=[vectorAssembler, xgboost])


trainDF, testDF = df.randomSplit([0.8, 0.2], seed=24)
trainDF.show(2)
model = pipeline.fit(trainDF)

print("=== scoring the test split ===")
model.transform(testDF).select(col("PassengerId"), col("Survived"), col("prediction")).show()
print("=== scoring done ===")

# Persist the whole PipelineModel (assembler + trained booster) to HDFS.
model.write().overwrite().save(os.path.join(hdfs_path, "xgboost_class_test"))

# Reload and verify the persisted model still scores.
model1 = PipelineModel.load(os.path.join(hdfs_path, "xgboost_class_test"))
model1.transform(testDF).show()

下面是不使用Pipeline、直接训练并保存XGBoost模型的代码:
# Train XGBoost directly (without a Pipeline): assemble features manually,
# fit the classifier, save both the distributed Spark model and the native
# booster, then score and evaluate on the held-out split.
# Relies on an active `spark` session, the Titanic DataFrame `data`, and
# `hdfs_path` being defined by the surrounding session.
import os

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# spark.sparkContext.addPyFile("sparkxgb.zip")  # ship the xgboost pyspark wrapper first
from sparkxgb import XGBoostClassifier, XGBoostClassificationModel

assembler = VectorAssembler(
    inputCols=['Pclass',
               'Age',
               'SibSp',
               'Parch',
               'Fare'],
    outputCol="features",
    handleInvalid="skip")  # drop rows whose feature columns contain null/NaN

xgboost = XGBoostClassifier(
    maxDepth=3,
    missing=float(0.0),
    featuresCol="features",
    labelCol="Survived")

trainDF, testDF = data.randomSplit([0.8, 0.2], seed=24)
trainDF.show(2)

td = assembler.transform(trainDF)
td.cache()  # reused below for training and for scoring the reloaded model
td.show()
trained_raw_model = xgboost.fit(td)

# Persist both formats of the trained model.
distrib_model_path = os.path.join(hdfs_path, 'distribute_xgboost_model')
native_model_path = "./native_xgboost_model.xgboost"
# 保存分布式的训练模型 -> save the distributed Spark model to HDFS
trained_raw_model.write().overwrite().save(distrib_model_path)
# save trained model to local disk as a native booster file
trained_raw_model.nativeBooster.saveModel(native_model_path)
print(trained_raw_model.nativeBooster.gamma)

# Score the test split (features must be assembled the same way as training).
result = trained_raw_model.transform(assembler.transform(testDF))
# result.select(["Survived", "rawPrediction", "probability", "prediction"]).show()
result.show()

# Evaluate with area under the ROC curve.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")\
    .setLabelCol("Survived")
print(evaluator.evaluate(result))

# Reload the distributed model and verify it still scores.
# FIX: load() is a class-level loader — call it on the class instead of
# instantiating a throwaway XGBoostClassificationModel() first.
model1 = XGBoostClassificationModel.load(distrib_model_path)
model1.transform(td).show()

参考:https://baijiahao.baidu.com/s?id=1764030284066757768&wfr=spider&for=pc
下面的例子把单机训练版本的Xgboost改成分布式训练版本并加载。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/12 13:52
# Iris example: multiclass XGBoost on Spark.  Trains on a Hive table,
# saves both the distributed Spark model and the native booster, then
# reloads the distributed model and scores the test split with it.

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sparkxgb import XGBoostClassifier, XGBoostClassificationModel

model_path = "hdfs:///user/hive/models/iris_model"
model_native_path = "./iris_native_model"
py_file = "hdfs:///user/hive/python/sparkxgb.zip"
xgboost4j = "hdfs:///user/hive/jar/xgboost4j_2.11-1.0.0.jar"
xgboost4j_spark = "hdfs:///user/hive/jar/xgboost4j-spark_2.11-1.0.0.jar"
jar_files = ",".join([xgboost4j, xgboost4j_spark])

spark = SparkSession.builder \
    .appName("spark-xgboost-iris-train") \
    .master("yarn") \
    .config("spark.jars", jar_files) \
    .getOrCreate()

# Ship the sparkxgb Python wrapper so the imports above resolve on executors.
spark.sparkContext.addPyFile(py_file)

# Load data and build features.
iris_data = spark.table("tmp_db.iris_data")
# FIX: DataFrames are immutable — fillna returns a new DataFrame; the
# original discarded the result, so nulls were never filled.
iris_data = iris_data.fillna(0.0)
# FIX: the single-column form of StringIndexer is inputCol/outputCol;
# the original mixed plural inputCols= (which expects a list and pairs
# with outputCols) with a bare string and singular outputCol.
stringIndex = StringIndexer(inputCol="target", outputCol="label")
vectorAssembler = VectorAssembler(inputCols=["feature_1", "feature_2", "feature_3", "feature_4"],
                                  outputCol="features")
iris_data = stringIndex.fit(iris_data).transform(iris_data)
iris_data = vectorAssembler.transform(iris_data)

train, test = iris_data.randomSplit([0.8, 0.2], seed=24)

# Build and train a 3-class softmax model.
xgboost = XGBoostClassifier(
    numClass=3,
    featuresCol="features",
    labelCol="label",
    objective="multi:softmax",
    missing=0.0
)
model = xgboost.fit(train)

# Save both the distributed Spark model and the native booster file.
model.write().overwrite().save(model_path)
model.nativeBooster.saveModel(model_native_path)

# Score and evaluate the test split (FIX: the original computed the metric
# but never surfaced it — print it).
test_result = model.transform(test)
evaluator = MulticlassClassificationEvaluator(labelCol="label")
print(evaluator.evaluate(test_result))
test_result.show()

# Reload the distributed model and score with it.
model1 = XGBoostClassificationModel.load(model_path)
predict_result = model1.transform(test)

Done.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。