赞
踩
如果您觉得本博客的内容对您有所帮助或启发,请关注我的博客,以便第一时间获取最新技术文章和教程。同时,也欢迎您在评论区留言,分享想法和建议。谢谢支持!
Apache Spark MLlib(Machine Learning library)是一个开源机器学习框架,建立在Apache Spark之上,支持分布式计算和大规模数据处理。它提供了许多经典机器学习算法和工具,如分类、回归、聚类、协同过滤、特征提取和数据预处理等。
Spark MLlib使用基于DataFrame的API,提供了一个易于使用的高级API,使得用户能够快速构建、训练和调整机器学习模型,而无需担心底层分布式计算的复杂性。它还支持分布式模型选择和调整,以及与其他Apache Spark组件的集成,如Spark SQL、Spark Streaming和GraphX。
Spark MLlib还提供了Python、Java和Scala等多种编程语言的API,使得不同的开发人员可以使用他们最喜欢的编程语言来开发机器学习应用程序。
总之,Spark MLlib是一个非常强大和灵活的机器学习框架,适用于处理大规模数据和需要分布式计算的场景。
总之,基础RDD更加灵活和可控,但需要开发人员自己掌握计算的方式和顺序;而DataFrame则更加易于使用和高效,适合快速开发和迭代。选择使用哪种数据结构,取决于具体的场景和需求。
在使用Spark MLlib进行机器学习之前,需要对原始数据进行预处理和准备。以下是一些常见的数据准备和预处理步骤:
在Spark MLlib中,可以使用各种预处理和数据准备工具,如:
总之,在使用Spark MLlib进行机器学习之前,需要对原始数据进行预处理和准备。Spark MLlib提供了许多工具和功能,可以帮助我们轻松地完成这些任务。
在Spark MLlib中,有许多常用的特征提取和转换工具,包括:
以上这些工具都可以用于特征提取和转换,帮助我们将原始数据转换为模型可以处理的格式。我们可以根据具体的任务和数据类型选择适当的工具,以获得更好的结果。值得注意的是,这些工具的使用通常需要进行适当的参数设置和调整,以达到最佳的效果。
逻辑回归是一种二元分类模型,它的目标是根据已知数据对一个事物进行分类。逻辑回归的输出是一个概率值,代表该事物属于某个类别的概率。如果概率值大于阈值,则将其分类为正类,否则分类为负类。
在 Spark MLlib 中,可以使用 LogisticRegression 类来实现逻辑回归。下面是一个 Java 版本的示例代码:
pom引用:
- <dependencies>
- <!-- Spark core dependencies -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.12</artifactId>
- <version>3.2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.12</artifactId>
- <version>3.2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib_2.12</artifactId>
- <version>3.2.0</version>
- </dependency>
-
- <!-- Spark testing dependencies (optional) -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.12</artifactId>
- <version>3.2.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
- <version>3.2.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
- <version>3.2.0</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- import org.apache.spark.ml.classification.LogisticRegression;
- import org.apache.spark.ml.classification.LogisticRegressionModel;
- import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator;
- import org.apache.spark.ml.feature.VectorAssembler;
- import org.apache.spark.ml.linalg.Vector;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
-
- public class LogisticRegressionDemo {
-
- public static void main(String[] args) {
- SparkSession spark = SparkSession
- .builder()
- .appName("LogisticRegressionDemo")
- .master("local[*]")
- .getOrCreate();
-
- // 加载数据
- Dataset<Row> data = spark.read().format("libsvm").load("data/sample_libsvm_data.txt");
-
- // 将特征向量转换成一列
- VectorAssembler assembler = new VectorAssembler()
- .setInputCols(new String[]{"features"})
- .setOutputCol("feature");
-
- Dataset<Row> newData = assembler.transform(data).select("label", "feature");
-
- // 将数据集分为训练集和测试集
- Dataset<Row>[] splits = newData.randomSplit(new double[]{0.7, 0.3});
- Dataset<Row> trainData = splits[0];
- Dataset<Row> testData = splits[1];
-
- // 创建逻辑回归模型
- LogisticRegression lr = new LogisticRegression();
-
- // 训练模型
- LogisticRegressionModel lrModel = lr.fit(trainData);
-
- // 在测试集上进行预测
- Dataset<Row> predictions = lrModel.transform(testData);
-
- // 计算模型评估指标
- BinaryClassificationEvaluator evaluator = new BinaryClassificationEvaluator();
- double auc = evaluator.evaluate(predictions);
-
- System.out.println("Area under ROC curve = " + auc);
-
- spark.stop();
- }
- }
这个示例代码首先加载了一个 libsvm 格式的数据集,然后将特征向量转换成一列,将数据集分为训练集和测试集,创建逻辑回归模型并训练模型,最后在测试集上进行预测并计算模型评估指标。在这个例子中,我们使用了 BinaryClassificationEvaluator 来计算模型的 AUC 指标,它是评估二元分类器性能的一种常用指标。
需要注意的是,以上代码仅供参考,实际情况可能需要根据数据集的特点和任务的要求进行相应的修改。
Spark MLlib 分类决策树是一种基于树结构的分类算法,通过一系列特征对数据进行划分和分类。该算法在 Spark MLlib 中的实现采用 CART(Classification And Regression Tree)算法,使用信息熵或 Gini 系数等指标进行特征选择和划分。Spark MLlib 分类决策树可用于二分类、多分类和概率预测问题。
- import org.apache.spark.ml.Pipeline;
- import org.apache.spark.ml.PipelineModel;
- import org.apache.spark.ml.PipelineStage;
- import org.apache.spark.ml.classification.DecisionTreeClassificationModel;
- import org.apache.spark.ml.classification.DecisionTreeClassifier;
- import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
- import org.apache.spark.ml.feature.IndexToString;
- import org.apache.spark.ml.feature.StringIndexer;
- import org.apache.spark.ml.feature.StringIndexerModel;
- import org.apache.spark.ml.feature.VectorAssembler;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
-
- public class DecisionTreeClassificationExample {
- public static void main(String[] args) {
- SparkSession spark = SparkSession.builder()
- .appName("DecisionTreeClassificationExample")
- .master("local[*]")
- .getOrCreate();
-
- // 读取数据集
- Dataset<Row> data = spark.read().format("csv")
- .option("header", "true")
- .option("inferSchema", "true")
- .load("path/to/data.csv");
-
- // 将标签列转换为数值类型
- StringIndexerModel labelIndexer = new StringIndexer()
- .setInputCol("label")
- .setOutputCol("indexedLabel")
- .fit(data);
- data = labelIndexer.transform(data);
-
- // 将特征列转换为特征向量
- VectorAssembler featureAssembler = new VectorAssembler()
- .setInputCols(new String[]{"feature1", "feature2", "feature3"})
- .setOutputCol("features");
- data = featureAssembler.transform(data);
-
- // 将数据集分为训练集和测试集
- Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3}, 12345);
- Dataset<Row> trainData = splits[0];
- Dataset<Row> testData = splits[1];
-
- // 创建决策树分类器
- DecisionTreeClassifier dt = new DecisionTreeClassifier()
- .setLabelCol("indexedLabel")
- .setFeaturesCol("features");
-
- // 将标签数值转换回原始标签
- IndexToString labelConverter = new IndexToString()
- .setInputCol("prediction")
- .setOutputCol("predictedLabel")
- .setLabels(labelIndexer.labels());
-
- // 创建管道并拟合模型
- Pipeline pipeline = new Pipeline()
- .setStages(new PipelineStage[]{labelIndexer, featureAssembler, dt, labelConverter});
- PipelineModel model = pipeline.fit(trainData);
-
- // 在测试集上进行预测和评估
- Dataset<Row> predictions = model.transform(testData);
- MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
- .setLabelCol("indexedLabel")
- .setPredictionCol("prediction")
- .setMetricName("accuracy");
- double accuracy = evaluator.evaluate(predictions);
- System.out.println("Test Error = " + (1.0 - accuracy));
- // 输出决策树结构
- DecisionTreeClassificationModel treeModel =
- (DecisionTreeClassificationModel) (model.stages()[2]);
- System.out.println("Learned classification tree model:\n" + treeModel.toDebugString());
-
- spark.stop();
- }
- }
以上示例中,我们首先使用 SparkSession 读取 CSV 格式的数据集。然后,使用 StringIndexer 将标签列转换为数值类型,并使用 VectorAssembler 将特征列转换为特征向量。接着,将数据集分为训练集和测试集,并创建 DecisionTreeClassifier 决策树分类器。最后,将管道中的各个阶段组合在一起,拟合模型并在测试集上进行预测和评估。
随机森林是一种集成学习算法,它将多棵决策树组合起来,通过投票或平均来决定分类结果。该算法在 Spark MLlib 中的实现使用基于 CART(Classification And Regression Tree)算法的决策树作为基分类器,可以用于二分类、多分类和概率预测问题。
以下是一个基于 Java 的 Spark MLlib 分类随机森林示例:
- import org.apache.spark.ml.Pipeline;
- import org.apache.spark.ml.PipelineModel;
- import org.apache.spark.ml.PipelineStage;
- import org.apache.spark.ml.classification.RandomForestClassificationModel;
- import org.apache.spark.ml.classification.RandomForestClassifier;
- import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
- import org.apache.spark.ml.feature.IndexToString;
- import org.apache.spark.ml.feature.StringIndexer;
- import org.apache.spark.ml.feature.StringIndexerModel;
- import org.apache.spark.ml.feature.VectorAssembler;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
-
- public class RandomForestClassificationExample {
- public static void main(String[] args) {
- SparkSession spark = SparkSession.builder()
- .appName("RandomForestClassificationExample")
- .master("local[*]")
- .getOrCreate();
-
- // 读取数据集
- Dataset<Row> data = spark.read().format("csv")
- .option("header", "true")
- .option("inferSchema", "true")
- .load("path/to/data.csv");
-
- // 将标签列转换为数值类型
- StringIndexerModel labelIndexer = new StringIndexer()
- .setInputCol("label")
- .setOutputCol("indexedLabel")
- .fit(data);
- data = labelIndexer.transform(data);
-
- // 将特征列转换为特征向量
- VectorAssembler featureAssembler = new VectorAssembler()
- .setInputCols(new String[]{"feature1", "feature2", "feature3"})
- .setOutputCol("features");
- data = featureAssembler.transform(data);
-
- // 将数据集分为训练集和测试集
- Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3}, 12345);
- Dataset<Row> trainData = splits[0];
- Dataset<Row> testData = splits[1];
-
- // 创建随机森林分类器
- RandomForestClassifier rf = new RandomForestClassifier()
- .setLabelCol("indexedLabel")
- .setFeaturesCol("features")
- .setNumTrees(10);
-
- // 将标签数值转换回原始标签
- IndexToString labelConverter = new IndexToString()
- .setInputCol("prediction")
- .setOutputCol("predictedLabel")
- .setLabels(labelIndexer.labels());
-
- // 创建管道并拟合模型
- Pipeline pipeline = new Pipeline()
- .setStages(new PipelineStage[]{labelIndexer, featureAssembler, rf, labelConverter});
- PipelineModel model = pipeline.fit(trainData);
-
- // 在测试集上进行预测和评估
- Dataset<Row> predictions = model.transform(testData);
- MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
- .setLabelCol("indexedLabel")
- .setPredictionCol("prediction")
- .setMetricName("accuracy");
- double accuracy = evaluator.evaluate(predictions);
- System.out.println("Test Error = " + (1.0 - accuracy));
- // 获取训练好的随机森林模型并打印树的重要性
- RandomForestClassificationModel rfModel = (RandomForestClassificationModel) model.stages()[2];
- System.out.println("Learned classification forest model:\n" + rfModel.toDebugString());
-
- spark.stop();
- }
- }
该示例代码首先使用 SparkSession 读取 CSV 格式的数据集。接下来,使用 StringIndexer 将标签列转换为数值类型,并使用 VectorAssembler 将特征列转换为特征向量。然后,将数据集分为训练集和测试集。创建 RandomForestClassifier,并将其作为管道的一部分进行拟合。拟合后,使用 MulticlassClassificationEvaluator 对测试集进行预测和评估。最后,获取训练好的随机森林模型并打印树的重要性。
请注意,上面的示例中,数据集的路径应该被替换为实际数据集的路径,特征列的名称也应该被替换为实际特征列的名称。
Spark MLlib 提供了一个强大的算法——分类梯度提升树(Gradient-Boosted Trees, GBT),它可以用于二元分类和多类分类。GBT 是一种集成学习算法,它通过在先前树的残差上逐步拟合一系列决策树来提高模型的准确性。
在 Spark MLlib 中,可以使用 GBTClassifier
类来构建分类 GBT 模型。GBT 分类器使用一系列决策树来逐步提高模型的准确性,每个决策树都是在之前决策树的残差上训练得到的。通过这种方式,GBT 可以在更少的迭代次数下得到比随机森林更准确的模型。
与其他 Spark MLlib 分类器类似,GBT 分类器也使用管道(Pipeline)来处理数据。管道通常包括以下几个步骤:
在使用 GBT 分类器时,你需要指定以下参数:
featuresCol
:特征列的名称。labelCol
:标签列的名称。maxIter
:训练迭代次数。maxDepth
:决策树的最大深度。minInstancesPerNode
:每个节点上的最小实例数。stepSize
:每个迭代步骤的步长。subsamplingRate
:用于训练每棵树的数据子样本的比例。- import org.apache.spark.ml.Pipeline;
- import org.apache.spark.ml.PipelineModel;
- import org.apache.spark.ml.PipelineStage;
- import org.apache.spark.ml.classification.GBTClassificationModel;
- import org.apache.spark.ml.classification.GBTClassifier;
- import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
- import org.apache.spark.ml.feature.IndexToString;
- import org.apache.spark.ml.feature.StringIndexer;
- import org.apache.spark.ml.feature.StringIndexerModel;
- import org.apache.spark.ml.feature.VectorIndexer;
- import org.apache.spark.ml.feature.VectorIndexerModel;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
-
-
- public class GBTExample {
- public static void main(String[] args) {
- // 创建一个 SparkSession
- SparkSession spark = SparkSession
- .builder()
- .appName("GBTExample")
- .getOrCreate();
-
- // 读取数据集
- Dataset<Row> data = spark.read()
- .format("libsvm")
- .load("data/mllib/sample_libsvm_data.txt");
-
- // 对标签列进行索引
- StringIndexerModel labelIndexer = new StringIndexer()
- .setInputCol("label")
- .setOutputCol("indexedLabel")
- .fit(data);
-
- // 对特征列进行索引
- VectorIndexerModel featureIndexer = new VectorIndexer()
- .setInputCol("features")
- .setOutputCol("indexedFeatures")
- .setMaxCategories(4) // 特征具有少于 4 个不同的值
- .fit(data);
-
- // 将数据集拆分为训练集和测试集
- Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
- Dataset<Row> trainingData = splits[0];
- Dataset<Row> testData = splits[1];
-
- // 定义 GBT 分类器
- GBTClassifier gbt = new GBTClassifier()
- .setLabelCol("indexedLabel")
- .setFeaturesCol("indexedFeatures")
- .setMaxIter(10)
- .setFeatureSubsetStrategy("auto");
-
- // 将索引的标签转换回原始标签
- IndexToString labelConverter = new IndexToString()
- .setInputCol("prediction")
- .setOutputCol("predictedLabel")
- .setLabels(labelIndexer.labels());
-
- // 创建管道
- Pipeline pipeline = new Pipeline()
- .setStages(new PipelineStage[]{
- labelIndexer,
- featureIndexer,
- gbt,
- labelConverter
- });
-
- // 训练模型
- PipelineModel model = pipeline.fit(trainingData);
-
- // 进行预测
- Dataset<Row> predictions = model.transform(testData);
-
- // 选择样例行显示
- predictions.select("predictedLabel", "label", "features").show(5);
-
- // 评估模型
- MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
- .setLabelCol("indexedLabel")
- .setPredictionCol("prediction")
- .setMetricName("accuracy");
- double accuracy = evaluator.evaluate(predictions);
- System.out.println("Test Error = " + (1.0 - accuracy));
-
- // 获取训练得到的 GBT 模型
- GBTClassificationModel gbtModel = (GBTClassificationModel) (model.stages()[2]);
- System.out.println("Learned classification GBT model:\n" + gbtModel.toDebugString());
-
- spark.stop();
- }
- }
该示例使用了 Spark MLlib 内置的 sample_libsvm_data.txt
数据集。首先,将数据集加载到 DataFrame
中。接下来,对标签列和特征列进行索引。然后,将数据集拆分为训练集和测试集。接下来,创建 GBT 分类器,并使用管道将标签转换回原始标签。最后,使用训练数据拟合管道并进行预测。最终评估模型并输出模型学习到的 GBT 分类模型的调试字符串。该字符串显示了树的结构和分裂标准,以及在每个节点处对特征的使用情况和分裂点。
Spark MLlib 的线性回归算法是一种广泛使用的预测模型。它将输入特征映射到连续的输出值。这是通过训练模型来确定最佳拟合线性函数的系数完成的。
以下是一个使用 Spark MLlib 实现线性回归的 Java 示例程序。
- import org.apache.spark.ml.feature.VectorAssembler;
- import org.apache.spark.ml.regression.LinearRegression;
- import org.apache.spark.ml.regression.LinearRegressionModel;
- import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
-
- public class LinearRegressionExample {
-
- public static void main(String[] args) {
- // 创建一个 SparkSession
- SparkSession spark = SparkSession
- .builder()
- .appName("LinearRegressionExample")
- .getOrCreate();
-
- // 读取数据集
- Dataset<Row> data = spark.read()
- .format("libsvm")
- .load("data/mllib/sample_linear_regression_data.txt");
-
- // 将数据集拆分为训练集和测试集
- Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
- Dataset<Row> trainingData = splits[0];
-
- // 将特征列合并到一个向量列中
- VectorAssembler assembler = new VectorAssembler()
- .setInputCols(new String[]{"features"})
- .setOutputCol("featuresVector");
-
- // 定义线性回归模型
- LinearRegression lr = new LinearRegression()
- .setMaxIter(10)
- .setRegParam(0.3)
- .setElasticNetParam(0.8);
-
- // // 将数据集拟合到线性回归模型中
- Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
-
- //训练模型
- LinearRegressionModel lrModel = lr.fit(trainingDataWithFeatures);
- //打印线性回归的系数和截距
- System.out.println("系数Coefficients: "+lrModel.coefficients() + "");
- System.out.println(" 截距Intercept: " + lrModel.intercept()+ "");
- //总结训练集上的模型并打印出一些指标。
- LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
- Dataset<Row> dataset = trainingSummary.predictions().select("prediction", "label", "featuresVector");
- dataset.show(5);
- spark.stop();
- }
- }
Spark MLlib 提供了决策树回归(Decision Tree Regression)算法来解决回归问题。决策树回归是一种基于树结构的非参数统计方法,能够处理多维输入和输出,并且具有良好的可解释性和鲁棒性。
下面是一个简单的 Java 代码示例,演示如何使用 Spark MLlib 的决策树回归算法对数据进行训练和预测:
- import org.apache.spark.ml.Pipeline;
- import org.apache.spark.ml.PipelineModel;
- import org.apache.spark.ml.PipelineStage;
- import org.apache.spark.ml.feature.VectorAssembler;
- import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
- import org.apache.spark.ml.regression.DecisionTreeRegressor;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
- public class DecisionTreeRegressionExample {
- public static void main(String[] args) {
- // 创建 SparkSession
- SparkSession spark = SparkSession.builder()
- .appName("DecisionTreeRegressionExample")
- .master("local[*]")
- .getOrCreate();
-
- // 读取数据集
- Dataset<Row> data = spark.read().format("csv")
- .option("header", "true")
- .option("inferSchema", "true")
- .load("path/to/your/data.csv");
-
- // 定义特征列和标签列
- String[] featureCols = data.columns();
- String labelCol = "label";
-
- // 将特征列转换为向量
- VectorAssembler assembler = new VectorAssembler()
- .setInputCols(featureCols)
- .setOutputCol("features");
-
- Dataset<Row> dataWithFeatures = assembler.transform(data).select("features", labelCol);
-
- // 将数据集拆分为训练集和测试集
- double[] weights = {0.7, 0.3};
- Dataset<Row>[] datasets = dataWithFeatures.randomSplit(weights);
- Dataset<Row> trainData = datasets[0];
- Dataset<Row> testData = datasets[1];
-
- // 创建决策树回归器
- DecisionTreeRegressor dt = new DecisionTreeRegressor()
- .setLabelCol(labelCol)
- .setFeaturesCol("features");
-
- // 创建 Pipeline
- Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { dt });
-
- // 训练模型
- PipelineModel model = pipeline.fit(trainData);
-
- // 预测测试集
- Dataset<Row> predictions = model.transform(testData);
-
- // 显示预测结果
- predictions.show();
-
- // 获取训练好的决策树模型
- DecisionTreeRegressionModel dtModel = (DecisionTreeRegressionModel) model.stages()[0];
- System.out.println("Learned regression tree model:\n" + dtModel.toDebugString());
-
- // 停止 SparkSession
- spark.stop();
- }
- }
这只是一个简单的示例,实际应用中可能需要更复杂的特征工程和模型调整。另外,如果数据集过大,可能需要在集群上运行以获得更好的性能。
Spark MLlib提供了随机森林回归算法,可以用于预测连续的数值型数据。随机森林是一种集成学习算法,它基于决策树,通过随机选择样本和特征来减少过拟合的风险。随机森林回归使用多个决策树对数据进行拟合和预测,并取这些决策树的平均值作为最终预测结果。本文将介绍如何使用Spark MLlib中的随机森林回归算法,并提供一个完整可运行的Java示例。
示例说明: 在这个示例中,我们将使用Spark MLlib中的随机森林回归算法,对一组汽车数据进行建模,然后使用模型来预测汽车的燃油效率(MPG)。我们将使用UCI Machine Learning Repository中的Auto MPG数据集。该数据集包含8个输入特征,如汽车的气缸数、排量、马力、重量等,以及一个输出特征MPG,表示汽车的燃油效率。我们将使用70%的数据来训练模型,30%的数据用于测试模型性能。
1.准备数据 我们需要下载Auto MPG数据集,将其保存为CSV文件,并将其加载到Spark DataFrame中。
数据集下载地址:https://archive.ics.uci.edu/ml/datasets/auto+mpg
CSV文件格式如下:
- mpg,cylinders,displacement,horsepower,weight,acceleration,modelyear,origin
- 18.0,8,307.0,130.0,3504.0,12.0,70,1
- 15.0,8,350.0,165.0,3693.0,11.5,70,1
其中,第一列为输出特征MPG,后面的列为输入特征。
2.构建随机森林回归模型
- // 创建随机森林回归模型
- RandomForestRegressor rf = new RandomForestRegressor()
- .setLabelCol("label")
- .setFeaturesCol("features")
- .setNumTrees(10);
-
- // 训练模型
- RandomForestRegressionModel model = rf.fit(trainingData);
3.使用模型进行预测
- // 使用模型进行预测
- Dataset<Row> predictions = model.transform(testData);
4.评估模型性能
- // 评估模型性能
- RegressionEvaluator evaluator = new RegressionEvaluator()
- .setLabelCol("label")
- .setPredictionCol("prediction")
- .setMetricName("rmse");
- double rmse = evaluator.evaluate(predictions);
下面是完整 Java 代码示例:
- import org.apache.spark.SparkConf;
- import org.apache.spark.ml.Pipeline;
- import org.apache.spark.ml.evaluation.RegressionEvaluator;
- import org.apache.spark.ml.feature.VectorAssembler;
- import org.apache.spark.ml.regression.RandomForestRegressor;
- import org.apache.spark.ml.tuning.CrossValidator;
- import org.apache.spark.ml.tuning.CrossValidatorModel;
- import org.apache.spark.ml.tuning.ParamGridBuilder;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
- public class RandomForestRegressionExample {
- public static void main(String[] args) {
-
- // Create a Spark session
- SparkConf conf = new SparkConf().setAppName("RandomForestRegressionExample").setMaster("local[*]");
- SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
-
- // Load data
- Dataset<Row> data = spark.read().format("libsvm").load("data/sample_libsvm_data.txt");
-
- // Split the data into training and test sets
- Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
- Dataset<Row> trainingData = splits[0];
- Dataset<Row> testData = splits[1];
-
- // Define the feature column names
- String[] featureCols = new String[data.schema().fieldNames().length - 1];
- for (int i = 0; i < featureCols.length; i++) {
- featureCols[i] = "feature" + (i + 1);
- }
-
- // Assemble features into a vector
- VectorAssembler assembler = new VectorAssembler()
- .setInputCols(featureCols)
- .setOutputCol("features");
-
- Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
- Dataset<Row> testDataWithFeatures = assembler.transform(testData);
-
- // Create a RandomForestRegressor model
- RandomForestRegressor rf = new RandomForestRegressor()
- .setLabelCol("label")
- .setFeaturesCol("features")
- .setMaxDepth(5)
- .setNumTrees(20);
-
- // Set up a pipeline
- Pipeline pipeline = new Pipeline().setStages(new RandomForestRegressor[]{rf});
-
- // Set up a grid of hyperparameters to search over using 3-fold cross validation
- ParamGridBuilder paramGridBuilder = new ParamGridBuilder()
- .addGrid(rf.maxDepth(), new int[]{5, 10})
- .addGrid(rf.numTrees(), new int[]{20, 50});
- CrossValidator crossValidator = new CrossValidator()
- .setEstimator(pipeline)
- .setEvaluator(new RegressionEvaluator())
- .setEstimatorParamMaps(paramGridBuilder.build())
- .setNumFolds(3);
-
- // Train the model using cross-validation
- crossValidator.setSeed(12345);
- CrossValidatorModel crossValidatorModel = crossValidator.fit(trainingDataWithFeatures);
-
- // Evaluate the model on the test set
- Dataset<Row> predictions = crossValidatorModel.transform(testDataWithFeatures);
- RegressionEvaluator evaluator = new RegressionEvaluator()
- .setLabelCol("label")
- .setPredictionCol("prediction")
- .setMetricName("rmse");
- double rmse = evaluator.evaluate(predictions);
- System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
-
- // Stop the Spark session
- spark.stop();
- }
- }
Spark MLlib提供了梯度提升回归树(Gradient-Boosted Trees,GBT)的算法,它是一种强大的回归模型,可以用于连续的数值预测。GBT在每一次迭代中,使用决策树模型去拟合残差值,然后将所有的模型的预测结果相加,得到最终的预测结果。
以下是一个使用Spark MLlib进行梯度提升回归树的示例Java程序。这个程序将使用一个数据集,该数据集包含了关于自行车租赁量的信息。它将使用梯度提升回归树来预测一天中自行车的租赁量。
- import org.apache.spark.ml.evaluation.RegressionEvaluator;
- import org.apache.spark.ml.feature.VectorAssembler;
- import org.apache.spark.ml.regression.GBTRegressionModel;
- import org.apache.spark.ml.regression.GBTRegressor;
- import org.apache.spark.sql.*;
-
- public class GBTRegressionDemo {
- public static void main(String[] args) {
-
- // 创建 SparkSession
- SparkSession spark = SparkSession.builder()
- .appName("GradientBoostedTreeRegressionDemo")
- .master("local[*]")
- .getOrCreate();
-
- // 读取数据集
- Dataset<Row> data = spark.read().format("libsvm")
- .load("data/sample_libsvm_data.txt");
-
- // 将数据集划分为训练集和测试集
- Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
- Dataset<Row> trainingData = splits[0];
- Dataset<Row> testData = splits[1];
-
- // 将特征向量合并为一个向量
- VectorAssembler assembler = new VectorAssembler()
- .setInputCols(trainingData.columns())
- .setOutputCol("features");
- Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
- Dataset<Row> testDataWithFeatures = assembler.transform(testData);
-
- // 创建梯度提升回归树模型
- GBTRegressor gbt = new GBTRegressor()
- .setLabelCol("label")
- .setFeaturesCol("features")
- .setMaxIter(10);
-
- // 训练模型
- GBTRegressionModel model = gbt.fit(trainingDataWithFeatures);
-
- // 在测试集上进行预测
- Dataset<Row> predictions = model.transform(testDataWithFeatures);
-
- // 评估模型
- RegressionEvaluator evaluator = new RegressionEvaluator()
- .setLabelCol("label")
- .setPredictionCol("prediction")
- .setMetricName("rmse");
- double rmse = evaluator.evaluate(predictions);
- System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
-
- // 输出模型的节点信息
- System.out.println("Learned regression GBT model:\n" + model.toDebugString());
-
- // 关闭 SparkSession
- spark.close();
- }
- }
该程序与之前的程序类似,只是将 DecisionTreeRegressor
类替换为 GBTRegressor
类。需要注意的是,GBTRegressor
类在设置参数时需要设置 maxIter
参数,表示最大迭代次数。同样需要用 setFeaturesCol
方法设置特征列,用 setLabelCol
方法设置标签列。最后需要调用 fit
方法训练模型,然后用 transform
方法在测试集上进行预测。最后用 RegressionEvaluator
类评估模型,并输出模型的节点信息。
关于无监督学习,模型评估和调优,模型部署的内容请看下篇《Java应用|使用Apache Spark MLlib构建机器学习模型【下】》。
如果您觉得本博客的内容对您有所帮助或启发,请关注我的博客,以便第一时间获取最新技术文章和教程。同时,也欢迎您在评论区留言,分享想法和建议。谢谢支持!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。