当前位置:   article > 正文

Pipeline详解及Spark MLlib使用示例(Scala/Java/Python)_pipeline的建立和使用spark





       1.数据框:机器学习接口使用来自Spark SQL的数据框形式数据作为数据集,它可以处理多种数据类型。比如,一个数据框可以有不同的列存储文本、特征向量、标签值和预测值。






       机器学习算法可以应用于多种类型的数据,如向量、文本、图像和结构化数据。管道接口中采用来自Spark SQL的数据框来支持多种类型的数据。可以查看Spark SQL的数据类型参考(datatype reference)来了解数据框支持的基础和结构化数据类型。除了Spark SQL指南中提到的数据类型外,数据框还可以使用机器学习向量类型。可以显式地建立数据框,或者隐式地从常规的RDD建立数据框,下面的代码将会给出示例。数据框中的列需要命名。代码中的示例使用如“text”、“features”和“label”的名字。

































      参数属于特定的估计器和转换器实体(实例)。例如,如果我们有两个逻辑回归实体lr1和lr2,我们可以建立一个ParamMap来分别指定两个实体的最大迭代次数参数:ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)。当一个管道里有两个算法都带有最大迭代次数参数时,这一点非常有用。







import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Estimator / Transformer / Param walkthrough:
//   1. fit a LogisticRegression using parameters set through setters,
//   2. fit it again using parameters supplied through a ParamMap,
//   3. apply the second model to test data and print its predictions.
// Expects an active SparkSession named `spark` to be in scope.

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30)  // Specify 1 Param. This overwrites the original maxIter.
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // Change output column name.
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()  // Bring the rows to the driver so the output is printed there.
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }

  1. import java.util.Arrays;
  2. import java.util.List;
  3. import org.apache.spark.ml.classification.LogisticRegression;
  4. import org.apache.spark.ml.classification.LogisticRegressionModel;
  5. import org.apache.spark.ml.linalg.VectorUDT;
  6. import org.apache.spark.ml.linalg.Vectors;
  7. import org.apache.spark.ml.param.ParamMap;
  8. import org.apache.spark.sql.Dataset;
  9. import org.apache.spark.sql.Row;
  10. import org.apache.spark.sql.RowFactory;
  11. import org.apache.spark.sql.types.DataTypes;
  12. import org.apache.spark.sql.types.Metadata;
  13. import org.apache.spark.sql.types.StructField;
  14. import org.apache.spark.sql.types.StructType;
  15. // Prepare training data.
  16. List<Row> dataTraining = Arrays.asList(
  17. RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  18. RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  19. RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  20. RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
  21. );
  22. StructType schema = new StructType(new StructField[]{
  23. new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  24. new StructField("features", new VectorUDT(), false, Metadata.empty())
  25. });
  26. Dataset<Row> training = spark.createDataFrame(dataTraining, schema);
  27. // Create a LogisticRegression instance. This instance is an Estimator.
  28. LogisticRegression lr = new LogisticRegression();
  29. // Print out the parameters, documentation, and any default values.
  30. System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");
  31. // We may set parameters using setter methods.
  32. lr.setMaxIter(10).setRegParam(0.01);
  33. // Learn a LogisticRegression model. This uses the parameters stored in lr.
  34. LogisticRegressionModel model1 = lr.fit(training);
  35. // Since model1 is a Model (i.e., a Transformer produced by an Estimator),
  36. // we can view the parameters it used during fit().
  37. // This prints the parameter (name: value) pairs, where names are unique IDs for this
  38. // LogisticRegression instance.
  39. System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());
  40. // We may alternatively specify parameters using a ParamMap.
  41. ParamMap paramMap = new ParamMap()
  42. .put(lr.maxIter().w(20)) // Specify 1 Param.
  43. .put(lr.maxIter(), 30) // This overwrites the original maxIter.
  44. .put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.
  45. // One can also combine ParamMaps.
  46. ParamMap paramMap2 = new ParamMap()
  47. .put(lr.probabilityCol().w("myProbability")); // Change output column name
  48. ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
  49. // Now learn a new model using the paramMapCombined parameters.
  50. // paramMapCombined overrides all parameters set earlier via lr.set* methods.
  51. LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
  52. System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());
  53. // Prepare test documents.
  54. List<Row> dataTest = Arrays.asList(
  55. RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  56. RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
  57. RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
  58. );
  59. Dataset<Row> test = spark.createDataFrame(dataTest, schema);
  60. // Make predictions on test documents using the Transformer.transform() method.
  61. // LogisticRegression.transform will only use the 'features' column.
  62. // Note that model2.transform() outputs a 'myProbability' column instead of the usual
  63. // 'probability' column since we renamed the lr.probabilityCol parameter previously.
  64. Dataset<Row> results = model2.transform(test);
  65. Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
  66. for (Row r: rows.collectAsList()) {
  67. System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
  68. + ", prediction=" + r.get(3));
  69. }

  from pyspark.ml.linalg import Vectors
  from pyspark.ml.classification import LogisticRegression

  # Estimator / Transformer / Param walkthrough.
  # Expects an active SparkSession named `spark` to be in scope.

  # Prepare training data from a list of (label, features) tuples.
  training = spark.createDataFrame([
      (1.0, Vectors.dense([0.0, 1.1, 0.1])),
      (0.0, Vectors.dense([2.0, 1.0, -1.0])),
      (0.0, Vectors.dense([2.0, 1.3, 1.0])),
      (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

  # Create a LogisticRegression instance. This instance is an Estimator.
  lr = LogisticRegression(maxIter=10, regParam=0.01)
  # Print out the parameters, documentation, and any default values.
  print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

  # Learn a LogisticRegression model. This uses the parameters stored in lr.
  model1 = lr.fit(training)

  # Since model1 is a Model (i.e., a transformer produced by an Estimator),
  # we can view the parameters it used during fit().
  # This prints the parameter (name: value) pairs, where names are unique IDs for this
  # LogisticRegression instance.
  print("Model 1 was fit using parameters: ")
  print(model1.extractParamMap())

  # We may alternatively specify parameters using a Python dictionary as a paramMap
  paramMap = {lr.maxIter: 20}
  paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
  paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params.

  # You can combine paramMaps, which are python dictionaries.
  paramMap2 = {lr.probabilityCol: "myProbability"}  # Change output column name
  paramMapCombined = paramMap.copy()
  paramMapCombined.update(paramMap2)

  # Now learn a new model using the paramMapCombined parameters.
  # paramMapCombined overrides all parameters set earlier via lr.set* methods.
  model2 = lr.fit(training, paramMapCombined)
  print("Model 2 was fit using parameters: ")
  print(model2.extractParamMap())

  # Prepare test data
  test = spark.createDataFrame([
      (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
      (0.0, Vectors.dense([3.0, 2.0, -0.1])),
      (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

  # Make predictions on test data using the Transformer.transform() method.
  # LogisticRegression.transform will only use the 'features' column.
  # Note that model2.transform() outputs a "myProbability" column instead of the usual
  # 'probability' column since we renamed the lr.probabilityCol parameter previously.
  prediction = model2.transform(test)
  selected = prediction.select("features", "label", "myProbability", "prediction")
  for row in selected.collect():
      print(row)



import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Pipeline walkthrough: tokenize text, hash the tokens into feature vectors, train a
// logistic regression on them, persist the fitted and unfitted pipelines, then predict
// on unlabeled documents.
// Expects an active SparkSession named `spark` to be in scope.

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
// Each stage reads the column produced by the previous one: text -> words -> features.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()  // Bring the rows to the driver so the output is printed there.
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }

  1. import java.util.Arrays;
  2. import org.apache.spark.ml.Pipeline;
  3. import org.apache.spark.ml.PipelineModel;
  4. import org.apache.spark.ml.PipelineStage;
  5. import org.apache.spark.ml.classification.LogisticRegression;
  6. import org.apache.spark.ml.feature.HashingTF;
  7. import org.apache.spark.ml.feature.Tokenizer;
  8. import org.apache.spark.sql.Dataset;
  9. import org.apache.spark.sql.Row;
  10. // Prepare training documents, which are labeled.
  11. Dataset<Row> training = spark.createDataFrame(Arrays.asList(
  12. new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
  13. new JavaLabeledDocument(1L, "b d", 0.0),
  14. new JavaLabeledDocument(2L, "spark f g h", 1.0),
  15. new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
  16. ), JavaLabeledDocument.class);
  17. // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
  18. Tokenizer tokenizer = new Tokenizer()
  19. .setInputCol("text")
  20. .setOutputCol("words");
  21. HashingTF hashingTF = new HashingTF()
  22. .setNumFeatures(1000)
  23. .setInputCol(tokenizer.getOutputCol())
  24. .setOutputCol("features");
  25. LogisticRegression lr = new LogisticRegression()
  26. .setMaxIter(10)
  27. .setRegParam(0.01);
  28. Pipeline pipeline = new Pipeline()
  29. .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
  30. // Fit the pipeline to training documents.
  31. PipelineModel model = pipeline.fit(training);
  32. // Prepare test documents, which are unlabeled.
  33. Dataset<Row> test = spark.createDataFrame(Arrays.asList(
  34. new JavaDocument(4L, "spark i j k"),
  35. new JavaDocument(5L, "l m n"),
  36. new JavaDocument(6L, "mapreduce spark"),
  37. new JavaDocument(7L, "apache hadoop")
  38. ), JavaDocument.class);
  39. // Make predictions on test documents.
  40. Dataset<Row> predictions = model.transform(test);
  41. for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
  42. System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
  43. + ", prediction=" + r.get(3));
  44. }

  from pyspark.ml import Pipeline
  from pyspark.ml.classification import LogisticRegression
  from pyspark.ml.feature import HashingTF, Tokenizer

  # Pipeline walkthrough: tokenize text, hash the tokens into feature vectors, train a
  # logistic regression on them, then predict on unlabeled documents.
  # Expects an active SparkSession named `spark` to be in scope.

  # Prepare training documents from a list of (id, text, label) tuples.
  training = spark.createDataFrame([
      (0, "a b c d e spark", 1.0),
      (1, "b d", 0.0),
      (2, "spark f g h", 1.0),
      (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

  # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
  # Each stage reads the column produced by the previous one: text -> words -> features.
  tokenizer = Tokenizer(inputCol="text", outputCol="words")
  hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
  lr = LogisticRegression(maxIter=10, regParam=0.01)
  pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

  # Fit the pipeline to training documents.
  model = pipeline.fit(training)

  # Prepare test documents, which are unlabeled (id, text) tuples.
  test = spark.createDataFrame([
      (4, "spark i j k"),
      (5, "l m n"),
      (6, "mapreduce spark"),
      (7, "apache hadoop")], ["id", "text"])

  # Make predictions on test documents and print columns of interest.
  prediction = model.transform(test)
  selected = prediction.select("id", "text", "prediction")
  for row in selected.collect():
      print(row)

