赞
踩
Spark SQL是spark的一个模块,用于处理海量的结构化数据。
特点:
优点:
相似之处:
不同之处:
DataFrame是一种分布式的数据集合,它以表格形式(只能以表格的形式)表示,并且具有带有命名列的概念,类似于传统数据库或电子表格应用程序中的表格。DataFrame可以看作是一个关系型数据库中的一张表,或者是Python或R中的一个数据框架,但是不同的是,DataFrame是在分布式环境下运行的,可以处理大量的数据。
在RDD阶段,程序的执行入口为SparkContext。在spark2.0之后,推出了SparkSession对象,作为spark编码的统一入口对象。
前面说过DataFrame是一个二维表结构,那么它的结构一定有三部分组成:行、列和表结构描述。
在结构层面:
在数据层面:
也就是说,Column是包含单个StructFiled对象的,所有的Column组成全部的StructType对象。
下面用一个例子解释各个api的作用及相关参数的含义
- from pyspark import SparkConf, SparkContext
- from pyspark.sql import SparkSession
- import re
-
- if __name__ == '__main__':
- # 构建入口对象
- spark = SparkSession.builder. \
- appName("test"). \
- master("local[*]"). \
- getOrCreate()
- sc = spark.sparkContext
-
- # 基于RDD转换成DataFrame
- rdd = sc.textFile("一个数据文件,格式形如:'username, age'"). \
- map(lambda x: x.split(",")). \
- map(lambda x: (x[0], int(x[1]))) # 将str类型的年龄变为int类型
-
- # 构建DataFrame对象
- # 参数1 被转换的RDD
- # 参数2 指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可
- df = spark.createDataFrame(rdd, schema=['name', 'age'])
-
- # 打印表结构
- df.printSchema()
-
- # 打印df中的数据
- # 参数1表示展示出多少条数据,默认不传的话是20
- # 参数2表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以...代皙#如果False表示不阶产全部显示,默认是True
- df.show(20, False)
上面的代码是基于RDD转换成DataFrame的构建方法,下面我们用基于StructType的方法构建。
- schema = StructType().add("name", StringType(), nullable=True). \
- add("age", IntegerType(), nullable=False)
通过Spark SQL的统一api进行数据读取,并构建DataFrame
format是指支持读取的数据格式,schema就是配置StructType的信息,指定数据的类型和名称
DataFrame的编程风格支持两种:DSL和SQL语法风格。
DSL风格指的是使用Spark SQL提供的DataFrame API进行编程,可以支持更加复杂的数据处理操作。DSL风格的代码通常比SQL语法风格的代码更加直观和易于调试,因为DSL代码中可以使用编程语言的各种特性和工具来处理数据,比如函数、变量、循环等。
代码:
- from pyspark.sql import SparkSession
-
- spark = SparkSession.builder.appName("example").getOrCreate()
-
- # 读取csv数据文件
- df = spark.read.format("csv")\
- .option("header", True)\
- .option("inferSchema", True)\
- .load("data.csv")
-
- # 进行数据处理
- result = df.filter(df["age"] > 25)\
- .groupBy("gender")\
- .agg({"salary": "avg"})\
- .orderBy("gender")
-
- # 输出结果
- result.show()
使用SQL语法风格的编程方式,可以直接使用SQL语句对DataFrame进行查询和数据处理。SQL语法风格的代码通常比DSL风格的代码更加简洁和易于理解,因为SQL语句可以直接表达数据处理的逻辑。
代码:
- from pyspark.sql import SparkSession
-
- spark = SparkSession.builder.appName("example").getOrCreate()
-
- # 读取csv数据文件
- df = spark.read.format("csv")\
- .option("header", True)\
- .option("inferSchema", True)\
- .load("data.csv")
-
- # 创建临时视图
- df.createOrReplaceTempView("people")
-
- # 使用SQL语句进行查询
- result = spark.sql("""
- SELECT gender, AVG(salary) as avg_salary
- FROM people
- WHERE age > 25
- GROUP BY gender
- ORDER BY gender
- """)
-
- # 输出结果
- result.show()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。