Processing continuous values
0. Binarizer / binarization
- from __future__ import print_function
- from pyspark.sql import SparkSession
- from pyspark.ml.feature import Binarizer  # the ml package is newer and more complete than the older mllib package
- spark = SparkSession\
- .builder\
- .appName("BinarizerExample")\
- .getOrCreate()
-
- continuousDataFrame = spark.createDataFrame([
- (0, 1.1),
- (1, 8.5),
- (2, 5.2)
- ], ["id", "feature"])
-
- binarizer = Binarizer(threshold=5.1, inputCol="feature", outputCol="binarized_feature")
-
- binarizedDataFrame = binarizer.transform(continuousDataFrame)
-
- print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
- binarizedDataFrame.show()
-
- spark.stop()
Result:
Binarizer output with Threshold = 5.100000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    1.1|              0.0|
|  1|    8.5|              1.0|
|  2|    5.2|              1.0|
+---+-------+-----------------+
1. Bucketizer / discretization with given bucket boundaries
- from __future__ import print_function
- from pyspark.sql import SparkSession
- from pyspark.ml.feature import Bucketizer
-
- spark = SparkSession\
- .builder\
- .appName("BucketizerExample")\
- .getOrCreate()
-
- splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]  # -float("inf") is negative infinity
-
- data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
- dataFrame = spark.createDataFrame(data, ["features"])
-
- bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
-
- # bucketize the values according to the given boundaries
- bucketedData = bucketizer.transform(dataFrame)
-
- print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
- bucketedData.show()
-
- spark.stop()
Result:
Bucketizer output with 4 buckets
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|  -999.9|             0.0|
|    -0.5|             1.0|
|    -0.3|             1.0|
|     0.0|             2.0|
|     0.2|             2.0|
|   999.9|             3.0|
+--------+----------------+
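Because the splits above run from -inf to +inf, every finite value lands in some bucket, but a NaN would still raise an error by default. A hedged sketch (the variable names are made up) of keeping NaN rows in an extra bucket via handleInvalid="keep":

- # Hypothetical variation: handleInvalid="keep" sends NaN values to an extra bucket at the end instead of raising an error
- nanBucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures", handleInvalid="keep")
- nanData = spark.createDataFrame([(float("nan"),), (0.2,)], ["features"])
- nanBucketizer.transform(nanData).show()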
2. QuantileDiscretizer / discretization by quantiles
- from __future__ import print_function
- from pyspark.ml.feature import QuantileDiscretizer
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("QuantileDiscretizerExample")\
- .getOrCreate()
-
- data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
- df = spark.createDataFrame(data, ["id", "hour"])
- df = df.repartition(1)  # with this tiny dataset, use a single partition so the quantile computation is stable; larger datasets can keep multiple partitions
-
- # discretize into 3 buckets
- discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
-
- result = discretizer.fit(df).transform(df)
- result.show()
-
- spark.stop()
Result:
+---+----+------+
| id|hour|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   0.0|
|  4| 2.2|   0.0|
|  5| 9.2|   1.0|
|  6|14.4|   2.0|
+---+----+------+
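The bucket boundaries come from Spark's approximate quantile computation, which is why the tiny dataset above is squeezed into a single partition. A hedged alternative sketch (illustrative variable name) that uses the relativeError parameter to request exact quantiles instead:

- # relativeError=0.0 asks for exact quantiles, so the result no longer depends on how the data happens to be partitioned
- exactDiscretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result", relativeError=0.0)
- exactDiscretizer.fit(df).transform(df).show()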
3. MaxAbsScaler / scaling by maximum absolute value
- from __future__ import print_function
- from pyspark.ml.feature import MaxAbsScaler
- from pyspark.ml.linalg import Vectors
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("MaxAbsScalerExample")\
- .getOrCreate()
-
- dataFrame = spark.createDataFrame([
- (0, Vectors.dense([1.0, 0.1, -8.0]),),  # Vectors.dense builds a dense vector
- (1, Vectors.dense([2.0, 1.0, -4.0]),),
- (2, Vectors.dense([4.0, 10.0, 8.0]),)
- ], ["id", "features"])
-
- scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")  # divides each feature by its maximum absolute value
-
- # compute the per-feature maximum absolute values used for scaling
- scalerModel = scaler.fit(dataFrame)  # fit and transform are kept separate so the fitted model can be reused to transform a test set
-
- # rescale every feature into the [-1, 1] range
- scaledData = scalerModel.transform(dataFrame)
- scaledData.select("features", "scaledFeatures").show()
-
- spark.stop()
Result:
+--------------+----------------+
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]|  [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]|   [1.0,1.0,1.0]|
+--------------+----------------+
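Since fit and transform are separate steps, the fitted scalerModel can be reused on new data. A minimal sketch with a hypothetical test DataFrame (testDataFrame is a made-up name) that has the same schema:

- # Hypothetical test rows, scaled with the max-abs values learned from the training data above;
- # values larger than the training maximum can therefore fall outside [-1, 1]
- testDataFrame = spark.createDataFrame([
- (3, Vectors.dense([2.0, 20.0, 4.0]),),
- (4, Vectors.dense([-1.0, 5.0, -8.0]),)
- ], ["id", "features"])
- scalerModel.transform(testDataFrame).show()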
4. StandardScaler / standardization
- from __future__ import print_function
- from pyspark.ml.feature import StandardScaler
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("StandardScalerExample")\
- .getOrCreate()
-
- dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")  # the libsvm format is well suited to sparse data: [label] [index1]:[value1] [index2]:[value2] …
- scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
- withStd=True, withMean=False)
-
- # compute summary statistics (mean and standard deviation) of each feature
- scalerModel = scaler.fit(dataFrame)
-
- # standardize: with withMean=False each feature is only divided by its standard deviation
- scaledData = scalerModel.transform(dataFrame)
- scaledData.show()
-
- spark.stop()
Result:
+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
+-----+--------------------+--------------------+
only showing top 20 rows
- from __future__ import print_function
- from pyspark.ml.feature import StandardScaler
- from pyspark.ml.linalg import Vectors
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("StandardScalerExample")\
- .getOrCreate()
-
- dataFrame = spark.createDataFrame([
- (0, Vectors.dense([1.0, 0.1, -8.0]),),
- (1, Vectors.dense([2.0, 1.0, -4.0]),),
- (2, Vectors.dense([4.0, 10.0, 8.0]),)
- ], ["id", "features"])
-
- scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
- withStd=True, withMean=False)
-
- # compute summary statistics of each feature
- scalerModel = scaler.fit(dataFrame)
-
- # standardize: each feature is divided by its sample standard deviation
- scaledData = scalerModel.transform(dataFrame)
- scaledData.show()
-
- spark.stop()
Result:
+---+--------------+--------------------+
| id|      features|      scaledFeatures|
+---+--------------+--------------------+
|  0|[1.0,0.1,-8.0]|[0.65465367070797...|
|  1|[2.0,1.0,-4.0]|[1.30930734141595...|
|  2|[4.0,10.0,8.0]|[2.61861468283190...|
+---+--------------+--------------------+
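To make the numbers concrete: with withMean=False and withStd=True, each feature is just divided by its sample standard deviation. A quick hand check for the first feature column [1.0, 2.0, 4.0]:

- import statistics
- # the first feature column is [1.0, 2.0, 4.0]; its sample standard deviation is about 1.5275,
- # and 1.0 / 1.5275 ≈ 0.6547, matching the 0.65465... shown above
- print(1.0 / statistics.stdev([1.0, 2.0, 4.0]))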
5. PolynomialExpansion / adding polynomial features
- from __future__ import print_function
- from pyspark.ml.feature import PolynomialExpansion
- from pyspark.ml.linalg import Vectors
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("PolynomialExpansionExample")\
- .getOrCreate()
-
- df = spark.createDataFrame([
- (Vectors.dense([2.0, 1.0]),),
- (Vectors.dense([0.0, 0.0]),),
- (Vectors.dense([3.0, -1.0]),)
- ], ["features"])
-
- polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
- polyDF = polyExpansion.transform(df)
-
- polyDF.show(truncate=False)
-
- spark.stop()
Result:
+----------+------------------------------------------+
|features  |polyFeatures                              |
+----------+------------------------------------------+
|[2.0,1.0] |[2.0,4.0,8.0,1.0,2.0,4.0,1.0,2.0,1.0]     |
|[0.0,0.0] |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]     |
|[3.0,-1.0]|[3.0,9.0,27.0,-1.0,-3.0,-9.0,1.0,3.0,-1.0]|
+----------+------------------------------------------+
0. OneHotEncoder / one-hot encoding
- from __future__ import print_function
- from pyspark.ml.feature import OneHotEncoder, StringIndexer
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("OneHotEncoderExample")\
- .getOrCreate()
-
- df = spark.createDataFrame([
- (0, "a"),
- (1, "b"),
- (2, "c"),
- (3, "a"),
- (4, "a"),
- (5, "c")
- ], ["id", "category"])
-
- stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")  # encodes categories by frequency: the less frequent a category, the larger its index
- model = stringIndexer.fit(df)
- indexed = model.transform(df)
-
- encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
- encoded = encoder.transform(indexed)
- encoded.show()
-
- spark.stop()
Result:
+---+--------+-------------+-------------+
| id|category|categoryIndex|  categoryVec|
+---+--------+-------------+-------------+
|  0|       a|          0.0|(2,[0],[1.0])|
|  1|       b|          2.0|    (2,[],[])|
|  2|       c|          1.0|(2,[1],[1.0])|
|  3|       a|          0.0|(2,[0],[1.0])|
|  4|       a|          0.0|(2,[0],[1.0])|
|  5|       c|          1.0|(2,[1],[1.0])|
+---+--------+-------------+-------------+
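Note that the transformer-style OneHotEncoder used above is the Spark 2.x API. In Spark 3.x the class is an estimator that must be fit first; a hedged sketch of the equivalent call (encoder3 is only an illustrative name):

- # Spark 3.x style: OneHotEncoder takes inputCols/outputCols and needs fit()
- encoder3 = OneHotEncoder(inputCols=["categoryIndex"], outputCols=["categoryVec"])
- encoder3.fit(indexed).transform(indexed).show()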
0. StopWordsRemover / removing stop words
- from __future__ import print_function
- from pyspark.ml.feature import StopWordsRemover
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("StopWordsRemoverExample")\
- .getOrCreate()
-
- sentenceData = spark.createDataFrame([
- (0, ["I", "saw", "the", "red", "balloon"]),
- (1, ["Mary", "had", "a", "little", "lamb"])
- ], ["id", "raw"])
-
- remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
- remover.transform(sentenceData).show(truncate=False)  # truncate=False prints full cell contents; for very long values you can let show() truncate them
-
- spark.stop()
Result:
+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+
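The default stop word list is English. A small sketch, reusing sentenceData, of loading a built-in list for another language or passing a custom (made-up) list; the variable names are illustrative:

- # built-in stop word lists exist for several languages
- frenchStopWords = StopWordsRemover.loadDefaultStopWords("french")
- # a custom (here made-up) stop word list can also be supplied directly
- customRemover = StopWordsRemover(inputCol="raw", outputCol="filtered", stopWords=["saw", "little"])
- customRemover.transform(sentenceData).show(truncate=False)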
1.Tokenizer
- from __future__ import print_function
- from pyspark.ml.feature import Tokenizer, RegexTokenizer
- from pyspark.sql.functions import col, udf
- from pyspark.sql.types import IntegerType
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("TokenizerExample")\
- .getOrCreate()
-
- sentenceDataFrame = spark.createDataFrame([
- (0, "Hi I heard about Spark"),
- (1, "I wish Java could use case classes"),
- (2, "Logistic,regression,models,are,neat")
- ], ["id", "sentence"])
-
- tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
-
- regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")  # the pattern is treated as a delimiter: split on non-word characters and keep the pieces in between
-
- countTokens = udf(lambda words: len(words), IntegerType())
-
- tokenized = tokenizer.transform(sentenceDataFrame)
- tokenized.select("sentence", "words")\
- .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
-
- regexTokenized = regexTokenizer.transform(sentenceDataFrame)
- regexTokenized.select("sentence", "words") \
- .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
-
- spark.stop()
Result:
+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+-----------------------------------+------------------------------------------+------+
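RegexTokenizer can also be used the other way around, matching the tokens themselves rather than splitting on delimiters. A hedged sketch with gaps=False (matchTokenizer is an illustrative name):

- # gaps=False makes the pattern describe the tokens ("\\w+" = runs of word characters)
- # instead of the delimiters, which yields the same result as pattern="\\W" here
- matchTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\w+", gaps=False)
- matchTokenizer.transform(sentenceDataFrame).select("sentence", "words").show(truncate=False)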
2. CountVectorizer
- from __future__ import print_function
- from pyspark.sql import SparkSession
- from pyspark.ml.feature import CountVectorizer
-
- spark = SparkSession\
- .builder\
- .appName("CountVectorizerExample")\
- .getOrCreate()
-
- df = spark.createDataFrame([
- (0, "a b c".split(" ")),
- (1, "a b b c a".split(" "))
- ], ["id", "words"])
-
- cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
-
- model = cv.fit(df)
-
- result = model.transform(df)
- result.show(truncate=False)
-
- spark.stop()
Result:
+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+
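The indices in the sparse vectors refer to the vocabulary learned during fit, which the model exposes directly; a quick sketch:

- # the fitted model exposes the learned vocabulary; vector index i counts occurrences of vocabulary[i]
- print(model.vocabulary)  # 'c' comes last here because it is the least frequent term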
3. TF-IDF weighting
- from __future__ import print_function
- from pyspark.ml.feature import HashingTF, IDF, Tokenizer
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("TfIdfExample")\
- .getOrCreate()
-
- sentenceData = spark.createDataFrame([
- (0.0, "Hi I heard about Spark"),
- (0.0, "I wish Java could use case classes"),
- (1.0, "Logistic regression models are neat")
- ], ["label", "sentence"])
-
- tokenizer = Tokenizer(inputCol="sentence", outputCol="words")  # Tokenizer suits whitespace-delimited languages such as English; Chinese text needs a dedicated segmenter (e.g. NLPIR, which tends to work better than jieba)
- wordsData = tokenizer.transform(sentenceData)
-
- hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
- featurizedData = hashingTF.transform(wordsData)
-
- idf = IDF(inputCol="rawFeatures", outputCol="features")
- idfModel = idf.fit(featurizedData)
- rescaledData = idfModel.transform(featurizedData)
-
- rescaledData.select("label", "features").show()
-
- spark.stop()
Result:
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[0,5,9,17],[0...|
|  0.0|(20,[2,7,9,13,15]...|
|  1.0|(20,[4,6,13,15,18...|
+-----+--------------------+
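HashingTF is fast, but with only 20 hash buckets different words can collide. A hedged alternative sketch that swaps in CountVectorizer (from the previous example) for the term-frequency step, reusing wordsData from above; the variable names are only illustrative:

- from pyspark.ml.feature import CountVectorizer
-
- # count term frequencies with an explicit, inspectable vocabulary instead of hashing
- cv = CountVectorizer(inputCol="words", outputCol="rawFeatures")
- cvModel = cv.fit(wordsData)
- featurized = cvModel.transform(wordsData)
-
- idfCv = IDF(inputCol="rawFeatures", outputCol="features")
- idfCv.fit(featurized).transform(featurized).select("label", "features").show()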
4. NGram / n-gram features
- from __future__ import print_function
- from pyspark.ml.feature import NGram
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("NGramExample")\
- .getOrCreate()
-
- # Why n-grams: a plain bag of words cannot tell "Hanmeimei loves LiLei"
- # from "LiLei loves Hanmeimei", but their 2-grams are different.
-
- wordDataFrame = spark.createDataFrame([
- (0, ["Hi", "I", "heard", "about", "Spark"]),
- (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
- (2, ["Logistic", "regression", "models", "are", "neat"])
- ], ["id", "words"])
-
- ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
-
- ngramDataFrame = ngram.transform(wordDataFrame)
- ngramDataFrame.select("ngrams").show(truncate=False)
-
- spark.stop()
Result:
+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+
0. SQLTransformer / SQL transformation
- from __future__ import print_function
- from pyspark.ml.feature import SQLTransformer
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("SQLTransformerExample")\
- .getOrCreate()
-
- df = spark.createDataFrame([
- (0, 1.0, 3.0),
- (2, 2.0, 5.0)
- ], ["id", "v1", "v2"])
-
- # __THIS__ is a placeholder for the DataFrame passed to transform()
- sqlTrans = SQLTransformer(
- statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
- sqlTrans.transform(df).show()
-
- spark.stop()
Result:
+---+---+---+---+----+
| id| v1| v2| v3|  v4|
+---+---+---+---+----+
|  0|1.0|3.0|4.0| 3.0|
|  2|2.0|5.0|7.0|10.0|
+---+---+---+---+----+
1. RFormula / R formula transformation
- from __future__ import print_function
- from pyspark.ml.feature import RFormula
- from pyspark.sql import SparkSession
-
- spark = SparkSession\
- .builder\
- .appName("RFormulaExample")\
- .getOrCreate()
-
- dataset = spark.createDataFrame(
- [(7, "US", 18, 1.0),
- (8, "CA", 12, 0.0),
- (9, "NZ", 15, 0.0)],
- ["id", "country", "hour", "clicked"])
-
- # "clicked ~ country + hour": clicked becomes the label; country (string-indexed
- # and one-hot encoded) and hour are assembled into the feature vector
- formula = RFormula(
- formula="clicked ~ country + hour",
- featuresCol="features",
- labelCol="label")
-
- output = formula.fit(dataset).transform(dataset)
- output.select("features", "label").show()
-
- spark.stop()
Result:
+--------------+-----+
|      features|label|
+--------------+-----+
|[0.0,0.0,18.0]|  1.0|
|[1.0,0.0,12.0]|  0.0|
|[0.0,1.0,15.0]|  0.0|
+--------------+-----+