当前位置:   article > 正文

如何基于RDD方式完成DataFrame的代码构建?

如何基于RDD方式完成DataFrame的代码构建?

DataFrame对象可以从RDD转换而来,都是分布式数据集 其实就是转换一下内部存储的结构,转换为二维表结构。

将RDD转换为DataFrame方式1:

调用spark

# 首先构建一个RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
  map(lambda x: x.split(',')).\
  map(lambda x: [x[0], int(x[1])])               # 需要做类型转换, 因为类型从RDD中探测
# 构建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame,这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。

# coding:utf8
# 演示DataFrame创建的三种方式
from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark = SparkSession.builder.\
       appName("create df").\
master("local[*]").\
getOrCreate()

sc = spark.sparkContext
# 首先构建一个RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
map(lambda x: x.split(',')).\
map(lambda x: [x[0], int(x[1])]) # 需要做类型转换, 因为类型从RDD中探测
# 构建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])
# 打印表结构
df.printSchema()
# 打印20行数据
df.show()
df.createTempView("ttt")
spark.sql("select * from ttt where age< 30").show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

将RDD转换为DataFrame方式2:
通过StructType对象来定义DataFrame的“表结构”转换RDD

# 创建DF , 首先创建RDD 将RDD转DF
rdd = sc.textFile("../data/sql/stu_score.txt").\
  map(lambda x:x.split(',')).\
  map(lambda x:(int(x[0]), x[1], int(x[2])))

# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
  add("id", IntegerType(), nullable=False).\
  add("name", StringType(), nullable=True).\
  add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add, 每一个add代表一个StructField
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
df = spark.createDataFrame(rdd, schema)
# coding:utf8
# 需求: 基于StructType的方式构建DataFrame 同样是RDD转DF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
  spark = SparkSession.builder.\
    appName("create_df"). \
    config("spark.sql.shuffle.partitions", "4"). \
    getOrCreate()
  # SparkSession对象也可以获取 SparkContext
  sc = spark.sparkContext
  # 创建DF , 首先创建RDD 将RDD转DF
  rdd = sc.textFile("../data/sql/stu_score.txt").\
    map(lambda x:x.split(',')).\
    map(lambda x:(int(x[0]), x[1], int(x[2])))
  # StructType 类
  # 这个类 可以定义整个DataFrame中的Schema
  schema = StructType().\
    add("id", IntegerType(), nullable=False).\
    add("name", StringType(), nullable=True).\
    add("score", IntegerType(), nullable=False)
  # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
  # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
  df = spark.createDataFrame(rdd, schema)
  df.printSchema()
  df.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

将RDD转换为DataFrame方式3:

使用RDD的toDF方法转换RDD

# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
  add("id", IntegerType(), nullable=False).\
  add("name", StringType(), nullable=True).\
  add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空

# 方式1: 只传列名, 类型靠推断, 是否允许为空是true
df = rdd.toDF(['id', 'subject', 'score'])
df.printSchema()
df.show()

# 方式2: 传入完整的Schema描述对象StructType
df = rdd.toDF(schema)
df.printSchema()
df.show()
# coding:utf8
# 需求: 使用toDF方法将RDD转换为DF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
    spark = SparkSession.builder.\
      appName("create_df"). \
      config("spark.sql.shuffle.partitions", "4"). \
      getOrCreate()
    # SparkSession对象也可以获取 SparkContext
    sc = spark.sparkContext
    # 创建DF , 首先创建RDD 将RDD转DF
    rdd = sc.textFile("../data/sql/stu_score.txt").\
      map(lambda x:x.split(',')).\
      map(lambda x:(int(x[0]), x[1], int(x[2])))
    # StructType 类
    # 这个类 可以定义整个DataFrame中的Schema
    schema = StructType().\
       add("id", IntegerType(), nullable=False).\
       add("name", StringType(), nullable=True).\
       add("score", IntegerType(), nullable=False)
    # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
    # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
    # 方式1: 只传列名, 类型靠推断, 是否允许为空是true
    df = rdd.toDF(['id', 'subject', 'score'])
    df.printSchema()
    df.show()
    # 方式2: 传入完整的Schema描述对象StructType
    df = rdd.toDF(schema)
    df.printSchema()
    df.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/739102
推荐阅读
相关标签
  

闽ICP备14008679号