赞
踩
TF(Term frequence):某个词在一个文档中出现的频率。
T
F
=
某
个
词
在
一
个
文
档
中
出
现
的
次
数
这
个
文
档
中
词
的
总
数
TF=\frac{某个词在一个文档中出现的次数}{这个文档中词的总数}
TF=这个文档中词的总数某个词在一个文档中出现的次数
有些停用词几乎在所有的文章中都有出现,故只使用词频来进行计算是不合适的,这个时候就引入了IDF。
IDF(Inverse document frequency):就是说在一个语料库中,有很多个文档,那么文档库中出现某个词的文档数越大,说明这个词汇在更多的文档中出现,所以其重要性相对较低。底为自然对数e
I
D
F
=
l
o
g
语
料
库
中
文
档
的
总
数
(
语
料
库
中
出
现
某
个
词
的
文
档
的
总
数
+
1
)
IDF=log\frac{语料库中文档的总数}{(语料库中出现某个词的文档的总数+1)}
IDF=log(语料库中出现某个词的文档的总数+1)语料库中文档的总数
IDF跟TF相乘,就得到了TF-IDF算法的核心公式。
T
F
−
I
D
F
=
(
T
F
)
词
频
∗
(
I
D
F
)
逆
文
档
频
率
TF-IDF=(TF)词频*(IDF)逆文档频率
TF−IDF=(TF)词频∗(IDF)逆文档频率
该算法的优点:
简单快速,容易理解。
缺点
import org.apache.spark.ml.feature.{HashingTF, IDF, IDFModel, Tokenizer} import org.apache.spark.sql.{DataFrame, SparkSession} /** * tf-idf=tf*idf * tf词频-----一个词在文章中出现的次数/文章中的总词数 * idf逆文档频率------log(文章总数/(包含该词的文档数+1)) * */ object tf_idf_count2 { def main(args: Array[String]): Unit = { // 1. 构建SparkSession的环境 val session: SparkSession = SparkSession.builder().appName("tf_idf_count2").master("local[*]").getOrCreate() // 2. 构建语料库,文章ID,和文章中的语句 val seqdata = Seq( (0, "Hi I hear about spark"), (1, "I wish Java could be case class"), (2, "Logistic Regression model are near") ) // 3. 转换成DataFrame格式的数据 val input: DataFrame = session.createDataFrame(seqdata).toDF("docId","sentence") // input.show(false) // +-----+----------------------------------+ // |docId|sentence | // +-----+----------------------------------+ // |0 |Hi I hear about spark | // |1 |I wish Java could be case class | // |2 |Logistic Regression model are near| // +-----+----------------------------------+ // 4-1. 构建分词器,一般继承transform方法的类实例化的对象都会有setInputCol,setOutputCol方法 val tokenizer: Tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words") // 4-2.通过分词器对句子进行分词 val wordDf: DataFrame = tokenizer.transform(input) wordDf.show(false) // +-----+----------------------------------+----------------------------------------+ // |docId|sentence |words | // +-----+----------------------------------+----------------------------------------+ // |0 |Hi I hear about spark |[hi, i, hear, about, spark] | // |1 |I wish Java could be case class |[i, wish, java, could, be, case, class] | // |2 |Logistic Regression model are near|[logistic, regression, model, are, near]| // +-----+----------------------------------+----------------------------------------+ // 4-3 构建HashingTF,词频统计器 val hashingTf: HashingTF = new HashingTF().setInputCol("words").setOutputCol("tf") // 4-4 通过词频统计器来统计词频 val hashingTfResult: DataFrame = hashingTf.transform(wordDf) hashingTfResult.show(false) // +-----+----------------------------------+----------------------------------------+-------------------------------------------------------------------------------------+ // |docId|sentence |words |tf | // +-----+----------------------------------+----------------------------------------+-------------------------------------------------------------------------------------+ // |0 |Hi I hear about spark |[hi, i, hear, about, spark] |(262144,[24417,49304,91137,111370,234657],[1.0,1.0,1.0,1.0,1.0]) | // |1 |I wish Java could be case class |[i, wish, java, could, be, case, class] |(262144,[19862,20719,24417,55551,147765,167152,192310],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])| // |2 |Logistic Regression model are near|[logistic, regression, model, are, near]|(262144,[13671,92225,150278,167122,190884],[1.0,1.0,1.0,1.0,1.0]) | // +-----+----------------------------------+----------------------------------------+-------------------------------------------------------------------------------------+ // 4-5 配置IDF模型的参数 val idf: IDF = new IDF().setInputCol("tf").setOutputCol("features") // 4-6 训练IDF模型 val idfModel: IDFModel = idf.fit(hashingTfResult) // 4-7 调用transform方法,得到每一个单词对应的TFIDF val tfidf: DataFrame = idfModel.transform(hashingTfResult) tfidf.show(false) // 可以看到第一句和第二句都出现了i,对应的hash值为24417,这个i对应的tfidf的值就比较小,是0.28768207245178085 // ----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ // words |features | // ----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ // [hi, i, hear, about, spark] |(262144,[24417,49304,91137,111370,234657],[0.28768207245178085,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453]) | // [i, wish, java, could, be, case, class] |(262144,[19862,20719,24417,55551,147765,167152,192310],[0.6931471805599453,0.6931471805599453,0.28768207245178085,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453])| // [logistic, regression, model, are, near]|(262144,[13671,92225,150278,167122,190884],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453]) | // ----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ } }
CountVectorizer并CountVectorizerModel旨在帮助将一组文本文档转换为标签计数的向量。当apriori字典不可用时,CountVectorizer可以用作Estimator提取词汇表,并生成一个CountVectorizerModel。该模型为词汇表上的文档生成稀疏表示,然后可以将其传递给其他算法,如LDA。
在拟合过程中,CountVectorizer将选择vocabSize按语料库中的术语频率排序的顶部单词。可选参数minDF还通过指定术语必须出现在文档中的最小数量(或<1.0)来影响拟合过程。另一个可选的二进制切换参数控制输出向量。如果设置为true,则所有非零计数都设置为1.这对于模拟二进制而非整数计数的离散概率模型特别有用。
二值化,数值特征的值通过跟阈值比较转换成二进制(0/1)的过程,需要设定一个阈值,>阈值得到的特征值被二进制化为1.0,<=阈值得到的特征值被二进制化为0.0,支持Vector,Double类型的inputCol
object BinarizerTest {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
val context: SparkContext = spark.sparkContext
//1.data
val data = spark.createDataFrame(Array((0, 0.1), (1, 0.8), (2, 0.5))).toDF("label", "features")
//2.二值化的操作---estimator or transform
// 继承Transformer类型,不需要fit,直接transform就好,estimator,需要fit得到模型对象,然后通过模型对象进行transform
val binarizer: Binarizer = new Binarizer().setInputCol("features").setOutputCol("binarizer").setThreshold(0.5)
val result: DataFrame = binarizer.transform(data)
result.show(false)
}
}
PCA是Principal Component Analysis的缩写,汉语称为主成分分析,主要使用的场景是多维度特征降低维度到低纬度:寻找方差最大化的操作。
import org.apache.spark.SparkContext import org.apache.spark.ml.feature.{PCA, PCAModel} import org.apache.spark.ml.linalg import org.apache.spark.ml.linalg.Vectors import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} /** *主成分分析法,方差越大说明特征越重要。 */ object PCATest1 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() val sc: SparkContext = spark.sparkContext val vectors: Array[linalg.Vector] = Array( Vectors.sparse(5, Seq((1, 30.0), (2, 40.0), (4, 100.0))), Vectors.dense(10, 234, 44, 33, 55), Vectors.dense(1, 34, 4, 3, 5) ) val rdd: RDD[linalg.Vector] = sc.parallelize(vectors) // 将Array中的成员转化成一个个的一元元组 val df: DataFrame = spark.createDataFrame(rdd.map(Tuple1.apply)).toDF("features") val pca: PCA = new PCA().setInputCol("features").setOutputCol("pca_features").setK(3) val pcaModel: PCAModel = pca.fit(df) val pcaDf: DataFrame = pcaModel.transform(df) pcaDf.show(false) // +-----------------------------+-----------------------------------------------------------+ // |features |pca_features | // +-----------------------------+-----------------------------------------------------------+ // |(5,[1,2,4],[30.0,40.0,100.0])|[-35.060835042756494,-106.13187833133475,0.455442077573025]| // |[10.0,234.0,44.0,33.0,55.0] |[-240.5674422053953,-55.014030054036596,0.4554420775730357]| // |[1.0,34.0,4.0,3.0,5.0] |[-34.36209692083503,-4.414073768726447,0.4554420775730281] | // +-----------------------------+-----------------------------------------------------------+ } }
StringIndexer:将标签列的字符串类型编码为标签索引列,索引从0开始,到标签列中标签种类的个数减去1,标签列中标签出现的越频繁,编码后的索引越小,因此最频繁出现的标签的索引为0。
当我们使用StringIndexer将标签列转换成标签索引列,使用监督算法,训练好了一个模型,进行了预测,想要把预测后的标签列转换成原来标签对应的字符串,这个时候就引入IndexToString了。
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, StringIndexerModel} import org.apache.spark.sql.{DataFrame, SparkSession} object StringToIndexerToString { def main(args: Array[String]): Unit = { // 1. 构建环境 val sparkSession: SparkSession = SparkSession.builder().master("local[*]").appName("StringToIndexerToString").getOrCreate() // 2. 构建数据集 val data: Seq[(Int, String)] = Seq((1,"x"),(2,"y"),(3,"z"),(4,"z"),(5,"x"),(6,"z")) // 3. 转化成DataFrame结构的数据 val df: DataFrame = sparkSession.createDataFrame(data).toDF("id","StringLabel") // 4. 创建StringIndexer对象 val stringIndexer: StringIndexer = new StringIndexer().setInputCol("StringLabel").setOutputCol("IndexLabel") // 5. 训练StringIndexerModel模型 val stringIndexerModel: StringIndexerModel = stringIndexer.fit(df) // 6. 通过模型转换原始数据中的标签列的字符串值编码成数字类型的值。 val indexerDf: DataFrame = stringIndexerModel.transform(df) indexerDf.show(false) // +---+-----------+----------+ // |id |StringLabel|IndexLabel| // +---+-----------+----------+ // |1 |x |1.0 | // |2 |y |2.0 | // |3 |z |0.0 | // |4 |z |0.0 | // |5 |x |1.0 | // |6 |z |0.0 | // +---+-----------+----------+ // // 当我们通过上面的转换好的数据,使用监督算法,训练好了一个模型,进行了预测,想要把预测后的标签列转换成原来标签的字符串 // 这个时候就引入了IndexToString了 // 7. 构建IndexToString对象,指定好输入列和输出列 val indexToString: IndexToString = new IndexToString().setInputCol("IndexLabel").setOutputCol("IndexLabelToString") // 8. 进行transform转换上面已经存在"IndexLabel"列的DataFrame类型的数据 val indexToStringDf: DataFrame = indexToString.transform(indexerDf) indexToStringDf.show(false) } }
独热编码, 在英文文献中称做 one-hot code, 直观来说就是有多少个状态就有多少比特,而且只有一个比特为1,其他全为0的一种码制
优点:独热编码解决了分类器不好处理属性数据的问题,在一定程度上也起到了扩充特征的作用。它的值只有0和1,不同的类型存储在垂直的空间。
缺点:当类别的数量很多时,特征空间会变得非常大。在这种情况下,一般可以用PCA来减少维度。而且one hot encoding+PCA这种组合在实际中也非常有用。
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, StringIndexerModel} import org.apache.spark.sql.{DataFrame, SparkSession} /** * 类别1 类别2 类别3 * 0 0 1 * 0 1 0 * 1 0 0 */ object OneHotEncoderTest1 { def main(args: Array[String]): Unit = { // 1. 构建SparkSession环境 val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() // 2. 构建data测试数据集 val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")) val df: DataFrame = spark.createDataFrame(data).toDF("id", "categories") // A one-hot encoder that maps a column of category indices to a column of binary vectors // 注意在使用OneHotEncoder()时,输入的列需要是标签索引的类型,所以标签需要进行索引编码,使用StringIndexer。 // 3-1. 构建StringIndexer对象 val stringIndexer: StringIndexer = new StringIndexer().setInputCol("categories").setOutputCol("indies") // 3-2. 训练StringIndexerModel val model: StringIndexerModel = stringIndexer.fit(df) // 3-3. 对标签列编码成索引标签列 val stringLabel: DataFrame = model.transform(df) // 4-1. 创建OneHotEncoder对象 // Spark源码: The last category is not included by default 最后一个种类默认不包含 // 和python scikit-learn's OneHotEncoder不同,scikit-learn's OneHotEncoder包含所有 val ohe: OneHotEncoder = new OneHotEncoder().setInputCol("indies").setOutputCol("ohe") // Whether to drop the last category in the encoded vector (default: true) // todo 注意这个地方默认是true,会将最后一个类别丢弃掉,如果不想丢弃掉,需要设置成false. .setDropLast(false) // 4-2. 对索引标签进行独热编码的转换 val oheDf: DataFrame = ohe.transform(stringLabel) oheDf.show(false) // |id |categories|indies|ohe | // +---+----------+------+-------------+ // |0 |a |0.0 |(3,[0],[1.0])| // |1 |b |2.0 |(3,[2],[1.0])| // |2 |c |1.0 |(3,[1],[1.0])| // |3 |a |0.0 |(3,[0],[1.0])| // |4 |a |0.0 |(3,[0],[1.0])| // |5 |c |1.0 |(3,[1],[1.0])| // +---+----------+------+-------------+ } }
VectorIndexer将向量转化为indexer索引,在决策树方面能够帮助构建模型
多个向量组合成一个表结构,VectorIndexer转化的是以列为转化的角度。
import org.apache.spark.ml.feature.{VectorIndexer, VectorIndexerModel} import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.{DataFrame, SparkSession} /** * VectorIndexer将向量转化为indexer索引,在决策树方面能够帮助构建模型 * 多个向量组合成一个表结构,VectorIndexer转化的是以列为转化的角度。 * 比如下面的最后一列的值为10,11,12,100,6,那么对应的索引为:1.0,2.0,3.0,4.0,0.0 * 真实值跟索引是升序的关系,即真实值最小的那个值的索引为0 * setMaxCategories,设置最多转化多少列为indexer索引。 */ object VectorIndexerTest1 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() val data=Seq( Vectors.dense(-1,1,1,8,56,10), Vectors.dense(-1,3,-1,-9,88,11), Vectors.dense(0,5,1,10,96,12), Vectors.dense(0,5,1,11,589,100), Vectors.dense(0,5,1,11,688,6) ) val df: DataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features") //1-定义操作 val indexr: VectorIndexer = new VectorIndexer().setInputCol("features").setOutputCol("features_indexer").setMaxCategories(5) //2-训练estemator val model: VectorIndexerModel = indexr.fit(df) val set1: Set[Int] = model.categoryMaps.keys.toSet println(set1.mkString(","))//0,5,1,2,3,4 //3-展示和转化 model.transform(df).show(false) // +------------------------------+-------------------------+ // |features |features_indexer | // +------------------------------+-------------------------+ // |[-1.0,1.0,1.0,8.0,56.0,10.0] |[1.0,0.0,1.0,1.0,0.0,1.0]| // |[-1.0,3.0,-1.0,-9.0,88.0,11.0]|[1.0,1.0,0.0,0.0,1.0,2.0]| // |[0.0,5.0,1.0,10.0,96.0,12.0] |[0.0,2.0,1.0,2.0,2.0,3.0]| // |[0.0,5.0,1.0,11.0,589.0,100.0]|[0.0,2.0,1.0,3.0,3.0,4.0]| // |[0.0,5.0,1.0,11.0,688.0,6.0] |[0.0,2.0,1.0,3.0,4.0,0.0]| // +------------------------------+-------------------------+ } }
import org.apache.spark.ml.feature.Normalizer import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.{DataFrame, SparkSession} /** * * 正则罚项是L1和L2正则罚项 * 范数的含义是距离的函数 --- 闵可夫斯基距离 * L1范数 L1正则罚项------稀疏化特征 * L2范数 L2正则罚项------凸函数 */ object NormL1L2Test1 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() val dataframe = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 1.5, -1.0)), (1, Vectors.dense(2.0, 1.0, 1.0)), (2, Vectors.dense(4.0, 10.0, 2.0)) )).toDF("id", "features") //使用L1正则罚项 val norm: Normalizer = new Normalizer().setInputCol("features").setOutputCol("normfeatures").setP(1.0) val l1Norm: DataFrame = norm.transform(dataframe) l1Norm.show(truncate = false) // todo L1正则罚项如何计算的呢?比如[1.0,1.5,-1.0]怎么得到的=>[0.2857142857142857,0.42857142857142855,-0.2857142857142857] // todo 累加向量中每一项的解绝对值,每一项在除以上一步的累加和,|1.0|+|1.5|+|-1.0|=3.5=>[1/3.5,1.5/3.5,-1.0/3.5]=>[0.2857142857142857,0.42857142857142855,-0.2857142857142857] // |id |features |normfeatures | // +---+--------------+------------------------------------------------------------+ // |0 |[1.0,1.5,-1.0]|[0.2857142857142857,0.42857142857142855,-0.2857142857142857]| // |1 |[2.0,1.0,1.0] |[0.5,0.25,0.25] | // |2 |[4.0,10.0,2.0]|[0.25,0.625,0.125] | // +---+--------------+------------------------------------------------------------+ //使用L2正则罚项 val l2Norm: DataFrame = norm.transform(dataframe, norm.p -> 2) l2Norm.show(truncate = false) // todo L2正则罚项如何求解的?[1.0,1.5,-1.0]|[0.48507125007266594,0.7276068751089989,-0.48507125007266594] // todo 向量中每一项平方后求和,然后开方 sqrt{1.0*1.0+1.5*1.5+(-1.0)*(-1.0)}=2.061552812808830274910704927987 // todo 向量中每一项除以上一项的开方后的结果[1.0/2.0615528128,1.5/2.0615528128,(-1.0)/2.0615528128] // +---+--------------+-------------------------------------------------------------+ // |id |features |normfeatures | // +---+--------------+-------------------------------------------------------------+ // |0 |[1.0,1.5,-1.0]|[0.48507125007266594,0.7276068751089989,-0.48507125007266594]| // |1 |[2.0,1.0,1.0] |[0.8164965809277261,0.4082482904638631,0.4082482904638631] | // |2 |[4.0,10.0,2.0]|[0.3651483716701107,0.9128709291752769,0.18257418583505536] | } }
import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession /** * Standscaler:标准化操作--通过减去均值除以方差来得到转化后的数据--转化为0均值1方差的数据,按列进行计算的。 * minmax:数据-最小值/最大值-最小值---将数据缩放到【0,1】,按列进行计算的。 * maxabs:将数据缩放到[-1,1]之间,按列进行计算的。 */ object StandScaler_MinMax_MaxAbsTest1 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() spark.sparkContext.setLogLevel("WARN") val rawdata = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 0.5, -1.0)), (1, Vectors.dense(2.0, 1.0, 1.0)), (2, Vectors.dense(4.0, 10.0, 2.0)) )).toDF("id", "features") // val path1 = "E:\\ml\\workspace\\SparkMllibBase\\sparkmllib_part1\\sparkmllib_base\\data\\sample_libsvm_data.txt" // val rawdata: DataFrame = spark.read.format("libsvm").load(path1) //标准化的处理数据---数值型数据 // StandardScaler,有两个参数比较重要,一个是setWithMean,每一列的每一个元素是否需要减去当前列的平均值,默认是false. // setWithStd,每一列中的每一个数据是否需要除以当前列的样本标准差,(注意是样本标准差而不是标准差,主要是分母除以n-1哦),默认是true. val stanscaler: StandardScaler = new StandardScaler().setInputCol("features").setOutputCol("StandardScaler").setWithMean(true).setWithStd(true) val model_sta: StandardScalerModel = stanscaler.fit(rawdata) model_sta.transform(rawdata).select("StandardScaler").show(false) // 最大值最小值处理的处理0-1,当前列的每一个数据 - 当前列的最小值 / (当前列的最大值 - 最小值) val minmax: MinMaxScaler =new MinMaxScaler().setInputCol("features").setOutputCol("MinMaxScaler") val model_sta1: MinMaxScalerModel = minmax.fit(rawdata) model_sta1.transform(rawdata).select("MinMaxScaler").show(false) //最大值绝对值的处理: -1到1之间 当前列中每一个数据 / 当前列中数据的绝对值的最大值 val maxabs: MaxAbsScaler = new MaxAbsScaler().setInputCol("features").setOutputCol("MaxAbsScaler") val model_sta2: MaxAbsScalerModel = maxabs.fit(rawdata) model_sta2.transform(rawdata).select("MaxAbsScaler").show(false) } }
Bucketizer其实是Binarizer的升级版本,Binarizer是将连续值离散化成两个值(1.0/0.0),升级成离散化成多个值。
import org.apache.spark.ml.feature.Bucketizer import org.apache.spark.sql.{DataFrame, SparkSession} /** * Bucketizer---分箱或分桶的操作---将连续值属性离散化 */ object BuckizerTest1 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() //准备分箱的区间 val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity) //准备数据 val dataframe = spark.createDataFrame(Array(-10, -0.5, -0.3, 0.0, 0.2).map(Tuple1.apply)).toDF("features") //分箱操作 val buckizer: Bucketizer = new Bucketizer().setInputCol("features").setOutputCol("bucketfeatures").setSplits(splits) val backetResultData: DataFrame = buckizer.transform(dataframe) backetResultData.show() // +--------+--------------+ // |features|bucketfeatures| // +--------+--------------+ // | -10.0| 0.0| // | -0.5| 1.0| // | -0.3| 1.0| // | 0.0| 2.0| // | 0.2| 2.0| } }
将多个分散数据整合成一个数据,分散的数据需要是数值类型的数据-整合成的一个数据可以用户训练模型。
import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.{DataFrame, SparkSession} /** * VectorAssemble将若干分散的数据整合为一个完成数据--可以用于训练模型 * id | hour | mobile | userFeatures | clicked | features * ----|------|--------|------------------|---------|-------------------------- * 0 | 110 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5] */ object VectorAssembleTest1 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() val data=Seq( (0,110,1.0,Vectors.dense(0.0,10.0,0.5),1.0) ) val df: DataFrame = spark.createDataFrame(data).toDF("id","hour","mobile","userfeatures","click") //1-定义操作 val resultdata=new VectorAssembler().setInputCols(Array("hour","mobile","userfeatures")).setOutputCol("features").transform(df) resultdata.show(false) // +---+----+------+--------------+-----+-----------------------+ // |id |hour|mobile|userfeatures |click|features | // +---+----+------+--------------+-----+-----------------------+ // |0 |110 |1.0 |[0.0,10.0,0.5]|1.0 |[110.0,1.0,0.0,10.0,0.5]| // +---+----+------+--------------+-----+-----------------------+ println(resultdata.select("features","click").first())//| [18.0, 1.0, 0.0, 10.0, 0.5] } }
QuantileDiscretizer采用具有连续特征的列,并输出具有分箱分类特征的列。箱数由numBuckets参数设定。使用近似算法选择bin范围(有关详细说明,请参阅aboutQuantile的文档 )。
import org.apache.spark.ml.feature.QuantileDiscretizer import org.apache.spark.sql.{DataFrame, SparkSession} /** * QuantileDiscretizer将连续值的属性离散化的操作 * 将整个连续值的通过最大值-最小值平均分成几份。最小的那一组对应的结果为0.0 * 划分的范围是左闭右开的,下面的例子中连续值区间是0-18 * 分位数的概念进行离散化 * +---+----+------+ * | id|hour|result| * +---+----+------+ * | 0|17.0| 2.0| * | 1|18.0| 2.0| * | 2| 8.0| 1.0| * | 3| 5.0| 1.0| * | 4| 2.2| 0.0| * +---+----+------+ */ object QuantileTest1 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() val data=Array((0,17.0),(1,18.0),(2,8.0),(3,5.0),(4,2.2)) val df: DataFrame = spark.createDataFrame(data).toDF("id","hour") //连续值属性离散化的操作 val discriter: QuantileDiscretizer = new QuantileDiscretizer().setInputCol("hour").setOutputCol("result").setNumBuckets(3) val result: DataFrame = discriter.fit(df).transform(df) result.show() } }
通过特征列的索引和特征列的名字来选择特征
import java.util import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute} import org.apache.spark.ml.feature.VectorSlicer import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Row, SparkSession} /** * VectorSlicer---根据下标或者特征列的名字选择特征 * userFeatures | features * ------------------|----------------------------- * [0.0, 10.0, 0.5] | [10.0, 0.5] * ["f1", "f2", "f3"] | ["f2", "f3"] */ object VectorSlicerTest2 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() spark.sparkContext.setLogLevel("WARN") // 1. 构造一个Vector val data = util.Arrays.asList(Row(Vectors.dense(-2.0, 2.3, 0.0))) // 2. 构建一个Array val attrs: Array[NumericAttribute] = Array("f1", "f2", "f3").map(NumericAttribute.defaultAttr.withName) for (elem <- attrs) {println(elem)} // {"type":"numeric","name":"f1"} // {"type":"numeric","name":"f2"} // {"type":"numeric","name":"f3"} // 3. 构建一个组,一个组中还分成3个属性值,属性的个数个向量的维数相同 val attGroup: AttributeGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]]) println(attGroup) // {"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"f1"},{"idx":1,"name":"f2"},{"idx":2,"name":"f3"}]},"num_attrs":3}} // 4-构造df,一个df中的一个列有一个列名,这个列名下还可以分成各个属性值 val df = spark.createDataFrame(data, StructType(Array(attGroup.toStructField()))) //5-1-构造分割器 val slicer: VectorSlicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features") //5-2设定如何选择特征列 slicer.setIndices(Array(0)).setNames(Array("f2")) //5-3直接得到结果 val result: DataFrame = slicer.transform(df) result.show(false) // +--------------+---------+ // | userFeatures| features| // +--------------+---------+ // |[-2.0,2.3,0.0]|[2.3,0.0]| // +--------------+---------+ } }
通过回归方程来选择特征
RFormula选择由R模型公式指定的列。目前,我们支持R运算符的有限子集,包括’〜’,’。’,’:’,’+‘和’ - '。基本的运营商是:
~
单独的目标和条款
+
concat术语,“+ 0”表示删除拦截
-
删除一个术语,“ - 1”表示删除拦截
:
交互(数值乘法或二进制分类值)
.
除目标之外的所有列
假设a并且b是双列,我们使用以下简单示例来说明以下效果RFormula:
RFormula生成一个特征向量列和一个标签的双列或字符串列。就像在R中使用公式进行线性回归一样,字符串输入列将是**独热编码**的,而数字列将被转换为双精度。如果label列的类型为string,则首先将其转换为double StringIndexer。如果DataFrame中不存在标签列,则将从公式中的指定响应变量创建输出标签列。
package sparkmllib_base.features_select_05 import org.apache.spark.ml.feature.RFormula import org.apache.spark.sql.{DataFrame, SparkSession} /** * R公式---y=kx+b结构选额特征 * id | country | hour | clicked | features | label * ---|---------|------|---------|------------------|------- * 7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0 * 8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0 * 9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0 */ object RFormulaTest1 { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .appName("SparkMlilb") .master("local[1]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") val dataset = spark.createDataFrame(Seq( (7, "US", 18, 1.0), (8, "CA", 12, 0.0), (9, "NZ", 15, 0.0) )).toDF("id", "country", "hour", "clicked") val formula: RFormula = new RFormula().setFormula("clicked ~ country + hour") .setFeaturesCol("features").setLabelCol("clicked") val result: DataFrame = formula.fit(dataset).transform(dataset) result.select("features", "clicked").show() // +--------------+-------+ // | features|clicked| // +--------------+-------+ // |[0.0,0.0,18.0]| 1.0| // |[1.0,0.0,12.0]| 0.0| // |[0.0,1.0,15.0]| 0.0| // +--------------+-------+ } }
根据假设检验的方法来选择特征-选择相关性较高的特征
import org.apache.spark.ml.feature.ChiSqSelector import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.{DataFrame, SparkSession} /** * 通过假设检验的方法--卡方验证的原理 * 假设其中一列数据和类别标签列的数据相关的,通过卡方验证得到的结果结果是比较大的,说明偏差较大,拒绝原假设 * 通过卡方验证选择和类别标签列相关的TopN的特征 * id | features | clicked | selectedFeatures * ---|----------------------|--------|------------------ * 7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0] * 8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0] * 9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1] */ object ChiAquareTest { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .appName("SparkMlilb") .master("local[2]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val data = Seq( (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0), (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0), (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0) ) val df = spark.createDataset(data).toDF("id", "features", "clicked") df.show() val selector: ChiSqSelector = new ChiSqSelector().setNumTopFeatures(2).setFeaturesCol("features").setLabelCol("clicked").setOutputCol("selectFeatures") val result: DataFrame = selector.fit(df).transform(df) result.show() // +---+------------------+-------+--------------+ // | id| features|clicked|selectFeatures| // +---+------------------+-------+--------------+ // | 7|[0.0,0.0,18.0,1.0]| 1.0| [18.0,1.0]| // | 8|[0.0,1.0,12.0,0.0]| 0.0| [12.0,0.0]| // | 9|[1.0,0.0,15.0,0.1]| 0.0| [15.0,0.1]| // +---+------------------+-------+--------------+ } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。