赞
踩
您不能在Spark中为DataFrame添加任意列。只能通过使用文字创建新列(其他文字类型在
How to add a constant column in a Spark DataFrame?中描述)
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
转换现有列:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
包括使用join:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
或使用函数/ udf生成:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
性能方面,映射到Catalyst表达式的内置函数(pyspark.sql.functions)通常优于Python用户定义的函数。
如果你想添加任意RDD的内容作为列,你可以
>添加row numbers to existing data frame>在RDD上调用zipWithIndex并将其转换为数据帧>使用index作为连接键连接
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。