赞
踩
datafram就是一个内存中的二维表结构,具备表结构的三个基本属性:
dataframe是可以从RDD中直接转化而来的,通过sparksession对象的createDataFrame方法可以将RDD转化为DataFrame,下例:
假设我们有一个people.txt,内容如下:
zhangsan,28
lisi,29
wangwu,30
# coding:utf8
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 构建spark
spark = SparkSession.builder.\
appName("Create df").\
master("local[*]").\
getOrCreate()
# 获取context
sc = spark.sparkContext
# 构建一个RDD rdd[(name, age), ()]
rdd = sc.textFile("people.txt").\
map(lambda x : x.split(',')).\
map(lambda x : [x[0], int(x[1])]) # RDD会做类型自动探测,这里需要做类型转换
# 构建df,RDD按照提供的字段顺序一次获取信息
df = spark.createDataFrame(rdd, schema = ["name", "age"])
# 打印df结构
df.printSchema()
# 打印20行数据
# show有两个参数,参数1指定展示多少条数据,默认20
# 参数2表示是否对列进行截断,如果列的长度超过20个字符传长度,后续的内容以...代替,不全打印
df.show()
# df.show(15, False)
# 构建临时视图表,让我们可以用sparksql的方式查询表内容
df.createTempView("peopleTable")
spark.sql("select * from peopleTable where age < 29").show()
structtype类可以定义整个DataFrame中的schema,也即 df = spark.createDataFrame(rdd, schema = [“name”, “age”])这里的schema可以通过structType来指定
# 需导入以下类
from pyspark.sql import SturctType StringType IntegerType
# 定义表结构,第一个为列名称,第二个参数列数据类型,第三个是否允许为空
schema = StructTpe.\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=False).\
add("score", IntegerType(), nullable=False)
# 将rdd转为scheme结构
df = spark.createDataFrame(rdd, schema)
rdd对象本身有toDF()方法,可以通过参数指定表结构来将rdd转为DataFrame,参数也细分为两种,一种是直接指定表头,如下:
df1 = rdd.toDF(["name", "age"])
这种方法只能指定列名,无法指定列数据类型,只能靠编译器自动推断类型,在类型不敏感时可用,另外一种则是利用上面提到的StructType类先定义号表结构,传入指定表结构变量
df2 = rdd.toDF(schema=schema)
createDataFrame直接接收pandas的df即可转换
from pyspark.sql import SparkSession
import pandas as pd
if __name__ == '__main__':
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext
# 定义pandas的DataFrame
pdf = pd.DataFrame(
{
"id" : [1, 2, 5],
"name" : ["zhang", "wang", "li"],
"age" : [11, 23, 55]
}
)
# 转换为spark的DF
df = spark.createDataFrame(pdf)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。