
Converting Between RDD and DataFrame in PySpark

How to convert an RDD (for example, one produced by PySpark Streaming) to a DataFrame with toDF or createDataFrame.

Method 1 (converting an RDD with a single column)

    # python3
    # -*- coding:utf-8 -*-
    # @Time: 3/26/20 7:14 PM
    # @Author: Damon
    # @Software: PyCharm
    import logging
    from pyspark.sql import Row, SparkSession, Window
    from pyspark.sql import functions as F

    def score_df_processing(rdd, spark):
        """
        :param rdd: input RDD of scores
        :return: RDD of Rows, or None if the input is empty
        """
        print(type(rdd))
        if rdd.isEmpty():
            logging.error('error: input score data empty...')
            return None
        else:
            row = Row("score")        # column name
            df = rdd.map(row).toDF()  # convert the RDD to a DataFrame
            df.show()                 # first output
            # assign a row_number, ordered ascending by "score"
            df = df.withColumn('row_number', F.row_number().over(Window.orderBy("score")))
            df.show()                 # second output
            row_nums = df.count()
            df = df.withColumn('slots', df.row_number * 100 / row_nums)
            df.show(10)               # third output
            return df.rdd

    spark = SparkSession.builder.appName('rdd_df').master("local[*]").getOrCreate()
    sc = spark.sparkContext
    rdd = sc.parallelize([4, 2, 3, 4])
    # rdd = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
    rdd = score_df_processing(rdd, spark)
    print(rdd)
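Since the title mentions PySpark Streaming: in a streaming job each micro-batch arrives as a plain RDD, so the same function can be reused inside foreachRDD. A minimal sketch, assuming a socket text stream on localhost:9999 that emits one integer per line (the host, port, and batch interval are placeholders, not from the original post):

    from pyspark.streaming import StreamingContext

    ssc = StreamingContext(sc, 10)  # 10-second micro-batches (placeholder interval)
    lines = ssc.socketTextStream("localhost", 9999)  # placeholder source
    scores = lines.map(lambda line: int(line))

    # foreachRDD hands each micro-batch to the driver as a regular RDD,
    # which can then be converted to a DataFrame exactly as above.
    scores.foreachRDD(lambda batch_rdd: score_df_processing(batch_rdd, spark))

    ssc.start()
    ssc.awaitTermination()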

Result
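Run locally with the input [4, 2, 3, 4], the third show() should print something close to:

    +-----+----------+-----+
    |score|row_number|slots|
    +-----+----------+-----+
    |    2|         1| 25.0|
    |    3|         2| 50.0|
    |    4|         3| 75.0|
    |    4|         4|100.0|
    +-----+----------+-----+

row_number ranks the scores in ascending order, and slots rescales that rank to a 0-100 percentile (row_number * 100 / 4).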


Method 2 (converting an RDD with two columns)

    # python3
    # -*- coding:utf-8 -*-
    # @Time: 3/26/20 7:14 PM
    # @Author: Damon
    # @Software: PyCharm
    import logging
    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    def score_df_processing(rdd, spark):
        """
        :param rdd: input RDD of (name, score) tuples
        :return: RDD of Rows, or None if the input is empty
        """
        print(type(rdd))
        if rdd.isEmpty():
            logging.error('error: input score data empty...')
            return None
        else:
            # name the two columns explicitly
            df = spark.createDataFrame(rdd, ["en", "score"])
            # row = Row("score")
            # df = rdd.map(row).toDF()
            df.show()
            df = df.withColumn('row_number', F.row_number().over(Window.orderBy("score")))
            df.show()
            row_nums = df.count()
            df = df.withColumn('slots', df.row_number * 100 / row_nums)
            df.show(10)
            return df.rdd

    spark = SparkSession.builder.appName('imsi_history').master("local[*]").getOrCreate()
    sc = spark.sparkContext
    # rdd = sc.parallelize([4, 2, 3, 4])
    rdd = sc.parallelize([("a", 3), ("b", 1), ("c", 2)])
    # rdd = [(1), (2), (3)]
    rdd = score_df_processing(rdd, spark)
    print(rdd)
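Passing a list of column names to createDataFrame makes Spark infer the column types by sampling the data. Two equivalent alternatives, as a sketch: toDF with column names, or an explicit StructType schema, which skips inference and fails fast on malformed rows:

    from pyspark.sql.types import StructType, StructField, StringType, LongType

    # 1) toDF with column names (types still inferred from the data)
    df = rdd.toDF(["en", "score"])

    # 2) explicit schema: no sampling or inference
    schema = StructType([
        StructField("en", StringType(), True),
        StructField("score", LongType(), True),
    ])
    df = spark.createDataFrame(rdd, schema)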

Result
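With the input [("a", 3), ("b", 1), ("c", 2)] there are three rows, so slots takes values in thirds of 100; the final show() should print approximately:

    +---+-----+----------+------------------+
    | en|score|row_number|             slots|
    +---+-----+----------+------------------+
    |  b|    1|         1|33.333333333333336|
    |  c|    2|         2| 66.66666666666667|
    |  a|    3|         3|             100.0|
    +---+-----+----------+------------------+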

Stack Overflow answer
