赞
踩
- # python3
- # -*- coding:utf-8 -*-
- # @Time: 3/26/20 7:14 PM
- # @Author: Damon
- # @Software: PyCharm
-
- import logging
- from pyspark.sql import *
- from pyspark.sql import functions as F
-
def score_df_processing(rdd, spark):
    """Convert an RDD of scores into a one-column DataFrame, rank the rows
    by score, and add a percentile-style 'slots' column.

    :param rdd: RDD of numeric scores, one score per element.
    :param spark: SparkSession; unused in this variant but kept so existing
        callers that pass it keep working.
    :return: RDD of Row(score, row_number, slots) ranked by ascending
        score, or None when the input RDD is empty.
    """
    # Import Window explicitly instead of relying on the module-level
    # star import of pyspark.sql.
    from pyspark.sql import Window

    if rdd.isEmpty():
        # Empty input is an error condition, so log at error level
        # (previously this was logged at info level).
        logging.error(' error: input score data empty...')
        return None

    row = Row("score")            # single-column schema
    df = rdd.map(row).toDF()      # RDD -> DataFrame

    # show() prints the frame itself and returns None, so it must not be
    # wrapped in print() (the old code printed a spurious "None").
    df.show()

    # Rank rows by ascending score.
    df = df.withColumn('row_number', F.row_number().over(Window.orderBy("score")))
    df.show()

    # Percentile-like slot in (0, 100]; count() triggers a Spark job.
    row_nums = df.count()
    df = df.withColumn('slots', df.row_number * 100 / row_nums)
    df.show(10)
    return df.rdd
-
# Script entry: build a local SparkSession and run the ranking pipeline on
# a small sample RDD.
# BUG FIX: SparkSession.builder.config() expects a key/value pair, so
# config("local*") was silently ignored; the master URL belongs in
# .master("local[*]").
spark = SparkSession.builder.appName('rdd_df').master("local[*]").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize([4, 2, 3, 4])
# rdd = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])

rdd = score_df_processing(rdd, spark)
print(rdd)
- # python3
- # -*- coding:utf-8 -*-
- # @Time: 3/26/20 7:14 PM
- # @Author: Damon
- # @Software: PyCharm
-
- import logging
- from pyspark.sql import *
- from pyspark.sql import functions as F
-
def score_df_processing(rdd, spark):
    """Build a two-column (en, score) DataFrame from an RDD of pairs, rank
    the rows by score, and add a percentile-style 'slots' column.

    :param rdd: RDD of (en, score) tuples.
    :param spark: SparkSession used to create the DataFrame.
    :return: RDD of Row(en, score, row_number, slots) ranked by ascending
        score, or None when the input RDD is empty.
    """
    # Import Window explicitly instead of relying on the module-level
    # star import of pyspark.sql.
    from pyspark.sql import Window

    if rdd.isEmpty():
        # Empty input is an error condition, so log at error level
        # (previously this was logged at info level).
        logging.error(' error: input score data empty...')
        return None

    # Name the two columns explicitly.
    df = spark.createDataFrame(rdd, ["en", "score"])

    # show() prints the frame itself and returns None, so it must not be
    # wrapped in print() (the old code printed a spurious "None").
    df.show()

    # Rank rows by ascending score.
    df = df.withColumn('row_number', F.row_number().over(Window.orderBy("score")))
    df.show()

    # Percentile-like slot in (0, 100]; count() triggers a Spark job.
    row_nums = df.count()
    df = df.withColumn('slots', df.row_number * 100 / row_nums)
    df.show(10)
    return df.rdd
-
# Script entry: build a local SparkSession and run the ranking pipeline on
# a small (en, score) sample RDD.
# BUG FIX: SparkSession.builder.config() expects a key/value pair, so
# config("local*") was silently ignored; the master URL belongs in
# .master("local[*]").
spark = SparkSession.builder.appName('imsi_history').master("local[*]").getOrCreate()
sc = spark.sparkContext

# rdd = sc.parallelize([4, 2, 3, 4])

rdd = sc.parallelize([("a", 3), ("b", 1), ("c", 2)])

rdd = score_df_processing(rdd, spark)
print(rdd)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。