
Spark Machine Learning Basics: Feature Engineering


Processing continuous values

0. Binarizer / binarization

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import Binarizer  # the ml package is newer and more complete than the older mllib

spark = SparkSession\
    .builder\
    .appName("BinarizerExample")\
    .getOrCreate()
continuousDataFrame = spark.createDataFrame([
    (0, 1.1),
    (1, 8.5),
    (2, 5.2)
], ["id", "feature"])
# values above the threshold become 1.0, values at or below it become 0.0
binarizer = Binarizer(threshold=5.1, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
spark.stop()

Result:

Binarizer output with Threshold = 5.100000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    1.1|              0.0|
|  1|    8.5|              1.0|
|  2|    5.2|              1.0|
+---+-------+-----------------+

1. Bucketizer / discretization with given boundaries

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer

spark = SparkSession\
    .builder\
    .appName("BucketizerExample")\
    .getOrCreate()
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]  # -float("inf") is negative infinity
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
# bucketize according to the given boundaries
bucketedData = bucketizer.transform(dataFrame)
print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()
spark.stop()

Result:

Bucketizer output with 4 buckets
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|  -999.9|             0.0|
|    -0.5|             1.0|
|    -0.3|             1.0|
|     0.0|             2.0|
|     0.2|             2.0|
|   999.9|             3.0|
+--------+----------------+
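
If the input column can contain NaN values, Bucketizer also exposes a handleInvalid parameter that decides what happens to them. A minimal, self-contained sketch (the toy data is made up for illustration): "keep" routes NaN rows into an extra bucket after the regular ones, "skip" drops them, and "error" (the default) raises an exception.

from pyspark.ml.feature import Bucketizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BucketizerHandleInvalidSketch").getOrCreate()
nan_data = spark.createDataFrame([(-0.3,), (float("nan"),), (0.7,)], ["features"])
# "keep" places NaN values in an additional bucket after the regular ones
bucketizer = Bucketizer(splits=[-float("inf"), 0.0, float("inf")],
                        inputCol="features", outputCol="bucketedFeatures",
                        handleInvalid="keep")
bucketizer.transform(nan_data).show()
spark.stop()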

2. QuantileDiscretizer / discretization by quantiles

from __future__ import print_function
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("QuantileDiscretizerExample")\
    .getOrCreate()
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
df = spark.createDataFrame(data, ["id", "hour"])
# with such a small dataset, use a single partition so the quantiles come out consistently;
# larger datasets can keep multiple partitions
df = df.repartition(1)
# discretize into 3 buckets
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
result = discretizer.fit(df).transform(df)
result.show()
spark.stop()

Result:

+---+----+------+
| id|hour|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   0.0|
|  4| 2.2|   0.0|
|  5| 9.2|   1.0|
|  6|14.4|   2.0|
+---+----+------+
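
The split points are computed with an approximate quantile algorithm; the relativeError parameter controls its accuracy, and setting it to 0.0 requests exact quantiles at extra cost. A minimal, self-contained sketch on the same toy data:

from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("QuantileDiscretizerExactSketch").getOrCreate()
df = spark.createDataFrame(
    [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)],
    ["id", "hour"])
# relativeError=0.0 computes exact quantiles instead of approximate ones
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result",
                                  relativeError=0.0)
discretizer.fit(df).transform(df).show()
spark.stop()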

3. MaxAbsScaler / scaling by maximum absolute value

from __future__ import print_function
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("MaxAbsScalerExample")\
    .getOrCreate()
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),  # Vectors.dense builds a dense vector
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
# compute the maximum absolute value of each feature, used for scaling;
# fit and transform are kept separate because the fitted statistics are also applied to the test set
scalerModel = scaler.fit(dataFrame)
# rescale each feature to [-1, 1]
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
spark.stop()

Result:

+--------------+----------------+
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]|  [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]|   [1.0,1.0,1.0]|
+--------------+----------------+
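
MaxAbsScaler only divides each feature by its largest absolute value. When the goal is to squeeze every feature into a fixed range such as [0, 1], MinMaxScaler is the usual alternative; here is a minimal, self-contained sketch on the same toy vectors:

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MinMaxScalerSketch").getOrCreate()
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
# min/max define the target range; each feature is rescaled independently
scaler = MinMaxScaler(min=0.0, max=1.0, inputCol="features", outputCol="scaledFeatures")
scaler.fit(dataFrame).transform(dataFrame).show(truncate=False)
spark.stop()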

4. StandardScaler / standardization

from __future__ import print_function
from pyspark.ml.feature import StandardScaler
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("StandardScalerExample")\
    .getOrCreate()
# libsvm is a text format suited to sparse data: [label] [index1]:[value1] [index2]:[value2] ...
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
# compute the mean, standard deviation and other statistics
scalerModel = scaler.fit(dataFrame)
# standardize
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
spark.stop()

Result:
+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
+-----+--------------------+--------------------+
only showing top 20 rows

 

The same standardization applied to a small DataFrame of dense vectors:

from __future__ import print_function
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("StandardScalerExample")\
    .getOrCreate()
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
# compute the mean, standard deviation and other statistics
scalerModel = scaler.fit(dataFrame)
# standardize
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
spark.stop()

Result (with withMean=False each feature is only divided by its sample standard deviation, e.g. 1.0 / 1.528 ≈ 0.6547):

+---+--------------+--------------------+
| id|      features|      scaledFeatures|
+---+--------------+--------------------+
|  0|[1.0,0.1,-8.0]|[0.65465367070797...|
|  1|[2.0,1.0,-4.0]|[1.30930734141595...|
|  2|[4.0,10.0,8.0]|[2.61861468283190...|
+---+--------------+--------------------+ 
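
With withMean=True the features are also centered, giving full z-scores; note that centering densifies sparse input, so it is normally used only with dense vectors. A minimal, self-contained sketch:

from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ZScoreSketch").getOrCreate()
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
# withMean=True subtracts the per-feature mean before dividing by the standard deviation
scaler = StandardScaler(inputCol="features", outputCol="zscores", withStd=True, withMean=True)
scaler.fit(dataFrame).transform(dataFrame).show(truncate=False)
spark.stop()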

5. PolynomialExpansion / adding polynomial features

from __future__ import print_function
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("PolynomialExpansionExample")\
    .getOrCreate()
df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])
# expand the feature vector into all monomials up to the given degree
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)
polyDF.show(truncate=False)
spark.stop()

Result (for an input [x, y] with degree=3 the expansion is [x, x², x³, y, xy, x²y, y², xy², y³]):

+----------+------------------------------------------+
|features  |polyFeatures                              |
+----------+------------------------------------------+
|[2.0,1.0] |[2.0,4.0,8.0,1.0,2.0,4.0,1.0,2.0,1.0]     |
|[0.0,0.0] |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]     |
|[3.0,-1.0]|[3.0,9.0,27.0,-1.0,-3.0,-9.0,1.0,3.0,-1.0]|
+----------+------------------------------------------+

Processing categorical values

0. OneHotEncoder / one-hot encoding

from __future__ import print_function
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("OneHotEncoderExample")\
    .getOrCreate()
df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])
# StringIndexer encodes categories by frequency: the most frequent category gets index 0,
# the least frequent gets the largest index
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()
spark.stop()

Result (with the default dropLast=True the highest index, here "b", is encoded as the all-zero vector):

+---+--------+-------------+-------------+
| id|category|categoryIndex|  categoryVec|
+---+--------+-------------+-------------+
|  0|       a|          0.0|(2,[0],[1.0])|
|  1|       b|          2.0|    (2,[],[])|
|  2|       c|          1.0|(2,[1],[1.0])|
|  3|       a|          0.0|(2,[0],[1.0])|
|  4|       a|          0.0|(2,[0],[1.0])|
|  5|       c|          1.0|(2,[1],[1.0])|
+---+--------+-------------+-------------+
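
The code above uses the Spark 2.x API, where OneHotEncoder is a plain transformer. In Spark 3.x, OneHotEncoder became an estimator that must be fitted first and takes lists of input/output columns; a minimal sketch of the same example under that assumption:

from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("OneHotEncoderSpark3Sketch").getOrCreate()
df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c"),
                            (3, "a"), (4, "a"), (5, "c")], ["id", "category"])
indexed = StringIndexer(inputCol="category", outputCol="categoryIndex").fit(df).transform(df)
# Spark 3.x: OneHotEncoder is fitted like any other estimator before it can transform
encoder = OneHotEncoder(inputCols=["categoryIndex"], outputCols=["categoryVec"])
encoder.fit(indexed).transform(indexed).show()
spark.stop()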

 

Processing text

0. StopWordsRemover / removing stop words

from __future__ import print_function
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("StopWordsRemoverExample")\
    .getOrCreate()
sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
# truncate=False prints full cell contents; for long text, letting show() truncate is easier to read
remover.transform(sentenceData).show(truncate=False)
spark.stop()

Result:

+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+
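
StopWordsRemover ships language-specific default lists, and the stopWords parameter accepts a custom list. A minimal, self-contained sketch that starts from the built-in English list and appends a couple of extra words (chosen purely for illustration):

from pyspark.ml.feature import StopWordsRemover
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CustomStopWordsSketch").getOrCreate()
sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])
# extend the default English stop-word list with domain-specific words
custom_words = StopWordsRemover.loadDefaultStopWords("english") + ["red", "little"]
remover = StopWordsRemover(inputCol="raw", outputCol="filtered", stopWords=custom_words)
remover.transform(sentenceData).show(truncate=False)
spark.stop()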

1. Tokenizer

from __future__ import print_function
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("TokenizerExample")\
    .getOrCreate()
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# pattern="\\W" splits on non-word characters, keeping only the word parts
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
spark.stop()

Result:

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+-----------------------------------+------------------------------------------+------+
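
By default RegexTokenizer treats the pattern as the separator between tokens (gaps=True); with gaps=False the pattern instead describes the tokens to keep. A minimal, self-contained sketch that matches alphabetic tokens directly:

from pyspark.ml.feature import RegexTokenizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RegexTokenizerGapsSketch").getOrCreate()
df = spark.createDataFrame([(0, "Logistic,regression,models,are,neat")], ["id", "sentence"])
# gaps=False: the pattern matches the tokens themselves rather than the gaps between them
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words",
                                pattern="[a-zA-Z]+", gaps=False)
regexTokenizer.transform(df).show(truncate=False)
spark.stop()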

2. CountVectorizer

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer

spark = SparkSession\
    .builder\
    .appName("CountVectorizerExample")\
    .getOrCreate()
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])
# vocabSize caps the vocabulary size; minDF=2.0 keeps only terms that appear in at least 2 documents
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
result = model.transform(df)
result.show(truncate=False)
spark.stop()

Result:

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+
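
The fitted CountVectorizerModel keeps the learned vocabulary, which maps each vector position back to a term; a minimal, self-contained sketch that prints it:

from pyspark.ml.feature import CountVectorizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CountVectorizerVocabSketch").getOrCreate()
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])
model = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0).fit(df)
# vocabulary[i] is the term counted at position i of the output vectors
print(model.vocabulary)
spark.stop()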

3. TF-IDF weighting

from __future__ import print_function
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("TfIdfExample")\
    .getOrCreate()
sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])
# Tokenizer suits whitespace-separated languages such as English; for Chinese word segmentation
# in Spark, NLPIR tends to give better results than jieba
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
spark.stop()

Result:

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[0,5,9,17],[0...|
|  0.0|(20,[2,7,9,13,15]...|
|  1.0|(20,[4,6,13,15,18...|
+-----+--------------------+
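
HashingTF is fast but can hash different words into the same bucket and cannot map features back to terms. Replacing it with CountVectorizer avoids collisions and keeps an interpretable vocabulary; a minimal, self-contained sketch of that variant, chained as a Pipeline:

from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TfIdfCountVectorizerSketch").getOrCreate()
sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])
# Tokenizer -> CountVectorizer (term counts) -> IDF (reweighting)
pipeline = Pipeline(stages=[
    Tokenizer(inputCol="sentence", outputCol="words"),
    CountVectorizer(inputCol="words", outputCol="rawFeatures"),
    IDF(inputCol="rawFeatures", outputCol="features")
])
pipeline.fit(sentenceData).transform(sentenceData).select("label", "features").show()
spark.stop()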

4. NGram / n-grams

from __future__ import print_function
from pyspark.ml.feature import NGram
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("NGramExample")\
    .getOrCreate()
# e.g. n-grams distinguish "Hanmeimei loves LiLei" from "LiLei loves Hanmeimei"
wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
spark.stop()

Result:

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+
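
NGram expects already-tokenized input, so in practice it is usually chained after a Tokenizer. A minimal, self-contained sketch of that chain applied to raw sentences:

from pyspark.ml import Pipeline
from pyspark.ml.feature import NGram, Tokenizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TokenizerNGramSketch").getOrCreate()
df = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "Logistic regression models are neat")
], ["id", "sentence"])
# tokenize each sentence, then build bigrams from the resulting word arrays
pipeline = Pipeline(stages=[
    Tokenizer(inputCol="sentence", outputCol="words"),
    NGram(n=2, inputCol="words", outputCol="ngrams")
])
pipeline.fit(df).transform(df).select("ngrams").show(truncate=False)
spark.stop()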

Advanced transformations

0. SQLTransformer / SQL transformation

from __future__ import print_function
from pyspark.ml.feature import SQLTransformer
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("SQLTransformerExample")\
    .getOrCreate()
df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
# __THIS__ is a placeholder that stands for the input DataFrame
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
spark.stop()

Result:

+---+---+---+---+----+
| id| v1| v2| v3|  v4|
+---+---+---+---+----+
|  0|1.0|3.0|4.0| 3.0|
|  2|2.0|5.0|7.0|10.0|
+---+---+---+---+----+

1. RFormula / R formula transformation

from __future__ import print_function
from pyspark.ml.feature import RFormula
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("RFormulaExample")\
    .getOrCreate()
dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])
# "clicked ~ country + hour": clicked is the label, country and hour are the features
formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
spark.stop()

Result (country is string-indexed and dummy-coded into two indicator columns, hour is kept as a numeric feature, and clicked becomes the label):

+--------------+-----+
|      features|label|
+--------------+-----+
|[0.0,0.0,18.0]|  1.0|
|[1.0,0.0,12.0]|  0.0|
|[0.0,1.0,15.0]|  0.0|
+--------------+-----+
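
Under the hood RFormula behaves roughly like string-indexing and one-hot encoding the categorical terms and then assembling everything with VectorAssembler. A minimal, self-contained sketch of that manual pipeline, written against the same Spark 2.x API as the rest of this post (the exact encoding, e.g. which level ends up as the all-zero reference, may differ slightly from RFormula's output):

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ManualRFormulaSketch").getOrCreate()
dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0), (8, "CA", 12, 0.0), (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])
# index the categorical column, one-hot encode it, then assemble all features into one vector
pipeline = Pipeline(stages=[
    StringIndexer(inputCol="country", outputCol="countryIndex"),
    OneHotEncoder(inputCol="countryIndex", outputCol="countryVec"),
    VectorAssembler(inputCols=["countryVec", "hour"], outputCol="features")
])
pipeline.fit(dataset).transform(dataset).select("features", "clicked").show()
spark.stop()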

 



Reposted from: https://www.cnblogs.com/tianqizhi/p/11567783.html
