
PySpark in Action: Preparing Data for Modeling

Preparing Data for Modeling

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

df = spark.createDataFrame([
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (3, 124.1, 5.2, 23, 'F'),
    (5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])
df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  4| 144.5|   5.9| 33|     M|
|  5| 133.2|   5.7| 54|     F|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+
print(df.count())             # number of rows -> 7
print(df.distinct().count())  # number of distinct rows -> 6
# Drop duplicate rows
df = df.dropDuplicates()
df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
|  1| 144.5|   5.9| 33|     M|
|  4| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
+---+------+------+---+------+
# Count the total number of ids and the number of distinct ids
import pyspark.sql.functions as F

df.agg(
    F.count('id').alias('all'),
    F.countDistinct('id').alias('distinct_id')
).show()
+---+-----------+
|all|distinct_id|
+---+-----------+
|  6|          5|
+---+-----------+
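The six remaining rows contain only five distinct ids (id 5 appears twice with different measurements), and rows 1 and 4 are identical in every feature despite having different ids. One optional follow-up (a sketch only; it is not applied to the df used in the next step) is to deduplicate on every column except id:

# Treat rows as duplicates when all columns except 'id' match
df.dropDuplicates(subset=[c for c in df.columns if c != 'id']).show()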
# Assign a new unique id to each row
df.withColumn('new_id', F.monotonically_increasing_id()).show()
+---+------+------+---+------+-------------+
| id|weight|height|age|gender|       new_id|
+---+------+------+---+------+-------------+
|  5| 133.2|   5.7| 54|     F| 171798691840|
|  5| 129.2|   5.3| 42|     M| 326417514496|
|  1| 144.5|   5.9| 33|     M| 481036337152|
|  4| 144.5|   5.9| 33|     M| 644245094400|
|  2| 167.2|   5.4| 45|     M| 721554505728|
|  3| 124.1|   5.2| 23|     F|1623497637888|
+---+------+------+---+------+-------------+
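The generated ids are unique and increasing but not consecutive, because monotonically_increasing_id() encodes the partition number in the upper bits of each value. If compact sequential ids are needed, one alternative (a hedged sketch; the global window pulls all rows into a single partition, so it only suits small DataFrames) is row_number over a window:

from pyspark.sql import Window

# Order by the existing id; row_number() yields 1, 2, 3, ...
w = Window.orderBy('id')
df.withColumn('new_id', F.row_number().over(w)).show()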

Handling Missing Values

df_miss = spark.createDataFrame([
    (1, 143.5, 5.6, 28, 'M', 100000),
    (2, 167.2, 5.4, 45, 'M', None),
    (3, None, 5.2, None, None, None),
    (4, 144.5, 5.9, 33, 'M', None),
    (5, 133.2, 5.7, 54, 'F', None),
    (6, 124.1, 5.2, None, 'F', None),
    (7, 129.2, 5.3, 42, 'M', 76000),
], ['id', 'weight', 'height', 'age', 'gender', 'income'])
df_miss.show()
+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  1| 143.5|   5.6|  28|     M|100000|
|  2| 167.2|   5.4|  45|     M|  null|
|  3|  null|   5.2|null|  null|  null|
|  4| 144.5|   5.9|  33|     M|  null|
|  5| 133.2|   5.7|  54|     F|  null|
|  6| 124.1|   5.2|null|     F|  null|
|  7| 129.2|   5.3|  42|     M| 76000|
+---+------+------+----+------+------+
# Count the number of missing values in each row
df_miss.rdd.map(lambda row: (row['id'], sum([c == None for c in row]))).collect()

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]
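The same per-row counts can be computed without dropping to the RDD API by summing isNull() flags over the columns; a minimal sketch using the df_miss defined above:

# Cast each isNull() flag to 0/1 and add them up column-wise
df_miss.select(
    'id',
    sum(F.col(c).isNull().cast('int') for c in df_miss.columns).alias('n_missing')
).show()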
# Compute the fraction of missing values in each column:
# F.count(c) skips nulls, while F.count('*') counts every row
df_miss.agg(
    *[(1 - (F.count(c) / F.count('*'))).alias(c + '_missing') for c in df_miss.columns]
).show()
+----------+------------------+--------------+------------------+------------------+------------------+
|id_missing|    weight_missing|height_missing|       age_missing|    gender_missing|    income_missing|
+----------+------------------+--------------+------------------+------------------+------------------+
|       0.0|0.1428571428571429|           0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+----------+------------------+--------------+------------------+------------------+------------------+
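income is missing in roughly 71% of the rows, so there is too little signal to impute it sensibly. A hedged sketch that generalizes this check, dropping any column whose missing fraction exceeds a cutoff (0.5 here, an arbitrary value chosen only for illustration):

# Collect the per-column missing fractions into a plain Python dict
miss_rates = df_miss.agg(
    *[(1 - (F.count(c) / F.count('*'))).alias(c) for c in df_miss.columns]
).collect()[0].asDict()

# Keep only the columns below the cutoff
keep_cols = [c for c, rate in miss_rates.items() if rate <= 0.5]
df_miss.select(keep_cols).show()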
# Drop the income column
data_drop_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
data_drop_income.show()
+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  3|  null|   5.2|null|  null|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+
# Keep only rows with at least 3 non-null values
# (i.e. drop rows missing 3 or more of the 5 columns)
data_drop_income.dropna(thresh=3).show()
+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+
# Fill continuous features with their column mean, and the
# categorical gender column with the placeholder 'missing'
means = data_drop_income.agg(
    *[F.mean(c).alias(c) for c in data_drop_income.columns if c != 'gender']
).toPandas().to_dict('records')[0]
means['gender'] = 'missing'
data_drop_income.fillna(means).show()
+---+------------------+------+---+-------+
| id|            weight|height|age| gender|
+---+------------------+------+---+-------+
|  1|             143.5|   5.6| 28|      M|
|  2|             167.2|   5.4| 45|      M|
|  3|140.28333333333333|   5.2| 40|missing|
|  4|             144.5|   5.9| 33|      M|
|  5|             133.2|   5.7| 54|      F|
|  6|             124.1|   5.2| 40|      F|
|  7|             129.2|   5.3| 42|      M|
+---+------------------+------+---+-------+
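As an alternative to building the fill dictionary by hand, pyspark.ml.feature.Imputer (available since Spark 2.2) performs mean or median imputation for numeric columns as a reusable estimator. A sketch assuming the data_drop_income DataFrame above; older Spark versions require float/double input columns, so age is cast first:

from pyspark.ml.feature import Imputer

# Imputer wants float/double inputs, so cast the integer age column
df_num = data_drop_income.withColumn('age', F.col('age').cast('double'))

imputer = Imputer(
    inputCols=['weight', 'age'],
    outputCols=['weight_imputed', 'age_imputed'],
    strategy='mean'   # 'median' is the other supported strategy
)
imputer.fit(df_num).transform(df_num).show()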

Outliers

# Outliers
df_outliers = spark.createDataFrame([
    (1, 143.5, 5.3, 28),
    (2, 154.2, 5.5, 45),
    (3, 342.3, 5.1, 99),
    (4, 144.5, 5.5, 33),
    (5, 133.2, 5.4, 54),
    (6, 124.1, 5.1, 21),
    (7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])
df_outliers.show()
+---+------+------+---+
| id|weight|height|age|
+---+------+------+---+
|  1| 143.5|   5.3| 28|
|  2| 154.2|   5.5| 45|
|  3| 342.3|   5.1| 99|
|  4| 144.5|   5.5| 33|
|  5| 133.2|   5.4| 54|
|  6| 124.1|   5.1| 21|
|  7| 129.2|   5.3| 42|
+---+------+------+---+
# Compute IQR-based cutoffs for each numeric column
cols = ['weight', 'height', 'age']
bounds = {}
for col in cols:
    quan = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
    IQR = quan[1] - quan[0]
    bounds[col] = [quan[0] - 1.5 * IQR, quan[1] + 1.5 * IQR]
bounds
{'weight': [91.69999999999999, 191.7],
 'height': [4.499999999999999, 6.1000000000000005],
 'age': [-11.0, 93.0]}
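On Spark 2.2 and later, approxQuantile also accepts a list of column names and returns one [Q1, Q3] pair per column, so the loop above can be collapsed into a single pass; a sketch that produces the same bounds dictionary:

# One approxQuantile call for all three columns at once
quantiles = df_outliers.approxQuantile(cols, [0.25, 0.75], 0.05)
bounds = {
    c: [q1 - 1.5 * (q3 - q1), q3 + 1.5 * (q3 - q1)]
    for c, (q1, q3) in zip(cols, quantiles)
}
print(bounds)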
# Flag values that fall outside the bounds
outliers = df_outliers.select(*['id'] + [
    (
        (df_outliers[c] < bounds[c][0]) |
        (df_outliers[c] > bounds[c][1])
    ).alias(c + '_o') for c in cols
])
outliers.show()
+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+
# Show the values that were flagged as outliers
df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter('weight_o').select('id', 'weight').show()
df_outliers.filter('age_o').select('id', 'age').show()
+---+------+
| id|weight|
+---+------+
|  3| 342.3|
+---+------+

+---+---+
| id|age|
+---+---+
|  3| 99|
+---+---+
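With the boolean flags joined back onto the data, removing the outlier rows is a single filter; a short sketch continuing from the joined df_outliers above:

# Keep only rows where none of the outlier flags is true
df_clean = df_outliers.filter(
    ~(F.col('weight_o') | F.col('height_o') | F.col('age_o'))
).select('id', 'weight', 'height', 'age')
df_clean.show()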
