当前位置:   article > 正文

Spark数据处理与特征工程_spark bucketizer

spark bucketizer

sklearn在中小型数据集上,在工业界是在使用的
xgboost、lightgbm在工业界的实用度非常的高
工业界数据体量真的达到上亿或者十亿这样的规模用sklern处理起来是比较吃力的,
可借助于大数据的工具,比如spark来解决
现在可以用spark来做大数据上的数据处理,比如数据工程、监督学习、无监督学习模型的构建,只要计算资源够就OK。【大数据底层做分布式处理】
注意:spark基于RDD形态、DataFrame形态两种形态的工具库,其中基于RDD形态的工具库目前已经暂停维护,所以建议使用DataFrame形态

  1. 对连续值处理
    binaizer/二值化、按照给定边界离散化、 quantile_discretizer/按分位数、最大最小值幅度缩放、标准化、添加多项式特征
  2. 对离散型处理
    独热向量编码
  3. 对文本型处理
    去停用词、Tokenizer、count_vectorizer、TF-IDF权重、n-gram语言模型
  4. 高级变化
    sql变换、R公式变换

对连续值处理

有的变换器需要fit在transfrom,有的不需要
直接transfrom通常不需要去扫描数据的,比如二值化,只需要设置阈值即可

1.1、binarizer/二值化

#连续值处理
##二值化
from __future__ import  print_function
from pyspark.sql import SparkSession
from  pyspark.ml.feature import Binarizer

spark = SparkSession\
        .builder\
        .appName("BinarizerExample")\
        .getOrCreate()
#用spark创建DataFrame
continuousDataFrame = spark.createDataFrame([
    (0,1.1),
    (1,8.5),
    (2,5.2)
],['id','feature'])
#切分器threshold以5.1为划分点
binarizer = Binarizer(threshold=5.1,inputCol="feature",outputCol="binarized_feature")
#transform进行二值化
binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()

spark.stop()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

1.2、 按照给定边界离散化

#按照给点给的边界离散化
#比如用户的年龄可划分为几段,一些年龄便是边界
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer

spark = SparkSession\
        .builder\
        .appName("BucketizerExample")\
        .getOrCreate()
#分桶的边界
splits = [-float('inf'),-0.5,0.0,0.5,float('inf')]

data = [(-999.9,),(-0.5,),(-0.3,),(0.0,),(0.2,),(999.9,)]#给定分桶的边界
dataFrame = spark.createDataFrame(data,['feature'])
#初始化分桶器
bucketizer = Bucketizer(splits=splits,inputCol="feature",outputCol="bucketedFeature")

#按照规定对数据进行分桶
bucketedData = bucketizer.transform(dataFrame)

print("Binarizer output with Threshold = %f" % (len(bucketizer.getSplits())-1))
bucketedData.show()

spark.stop()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

1.3 quantile_discretizer/按分位数离散化

from __future__ import print_function
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName('QuantileDiscretizerExample')\
        .getOrCreate()

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
df = spark.createDataFrame(data,['id','hour'])
df = df.repartition(1)

#分三个桶进行离散化,根据给定桶的数量来确定边界
discretizer = QuantileDiscretizer(numBuckets=3,inputCol='hour',outputCol='result')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/984553
推荐阅读
相关标签
  

闽ICP备14008679号