赞
踩
SparkSQL 是Spark的一个模块, 用于处理海量结构化数据
第一、针对结构化数据处理,属于Spark框架一个部分
第二、抽象数据结构:DataFrame=RDD+SCHAME(表结构:列名 列类型)
第三、分布式SQL引擎,类似Hive框架
SparkSQL优点:
1、执行速度快,spark可以和hive集成,可以替换掉底层MR,直接使用spark引擎执行SQL
2、spark中可以写sql也可以写代码,也可以混合写
3、目前主流开发方式和技术
Spark Hive 区别:
不同:
1、计算引擎不同,sparkSQL Hive是MR
2、sparkSQL除了写sql还可以写代码,HIve只能写sql
3、sparkSQL基于内存的计算引擎,Hive是基于MR,所以磁盘计算引擎
4、sparkSQL没有元数据服务,Hive由元数据服务
相同点:
1、都可以写SQL
2、都可以精选分布式计算
3、都可以运行在YARN集群上
SparkSQL常见数据对比
SparkSQL:DataFrame,二维表,分布式数据集
pandas:DataFrame,二维表,是单机数据集合
SparkCore:RDD,没有标准的数据结构,可以存储任何数据,分布式数据集(可以分区)
SparkSQL中Dataframe的特点
1、融合性:SQL可以无缝集成在代码中,随时用SQL处理数据
2、统一数据访问:一套标准API可读写不同数据源
3、Hive兼容:可以使用SparkSQL直接计算并生成Hive数据表
4、标准化连接:支持标准化JDBC\ODBC连接,方便和各种数据库进行数据交互.
创建sparkSession
import os from pyspark.sql import SparkSession # 配置环境变量 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark_env/bin/python' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/envs/pyspark_env/bin/python' os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241' if __name__ == '__main__': # 1 创建sparksql spark=SparkSession\ .builder.master('local[*]')\ .appName('SoarkSession')\ .getOrCreate() # sc=spark.sparkContext # sc.stop() spark.stop()
dataframe是二维表结构,有行列
表结构的元数据 StructType
字段:StructField(列名 列类型 约束)
列:Column对象
行:Row对象,包含于rdd和字段信息
dataframe有分区,不可变
if __name__ == '__main__': # 1 创建sparksql spark = SparkSession.builder.master('local[*]') \ .appName('SoarkSession').getOrCreate() sc:SparkContext=spark.sparkContext # 2.创建RDD init_rdd=sc.parallelize(['张三 18','李四 20']) map_rdd=init_rdd.map(lambda line:(line.split()[0],int(line.split()[1]))) # print(map_rdd.collect()) #[('张三', '18'), ('李四', '20')] # 3.定义表结构 # 方法一 schema1=StructType()\ .add('name',StringType(),True)\ .add('age',IntegerType(),True) # 方法二 schema2=StructType([StructField('name',StringType(),True), StringType('age',IntegerType(),True)]) # 方法三 schema3="name:string,age:int" # 方法四 schema4=['name','age'] # 方法五 df=spark.createDataFrame(data=map_rdd,schema=schema1) df.printSchema() df.show() # 创建dataframe方法二 # map_rdd.toDF(schema=['name', 'age']) # spark.stop()
场景:【一般用于测试】
createDataFrame():
data:给定初始化的数据
schema:设置元数据信息,结构 字段名称:字段类型,字段名称:字段类型
if __name__ == '__main__':
# 1 创建sparksql
spark = SparkSession.builder.master('local[*]').appName('创建df').getOrCreate()
# 2.创建df: 内部初始化数据得到DataFrame;data:表数据;schema:列名
df=spark.createDataFrame(data=[('张三',10,90),('李四',25,92),('王五',18,100),('赵六',18,95)],schema='name:string,age:int,score:int')
# 查看数据
df.show()
# 打印Schema元数据信息
df.printSchema()
print(df)
运行结果:
统一API格式:
sparksession.read
.format('text|csv|json|parquet|orc|avro|jdbc|.....') # 读取外部文件的方式
.option('k','v') # 选项 可以设置相关的参数 (可选)
.schema(StructType | String) # 设置表的结构信息/Schema元数据信息
.load('加载数据路径') # 读取外部文件的路径, 支持 HDFS 也支持本地
1 Text方式读取
① 会将数据内容读取成一列,并且这个列名叫value,数据类型是string
② 我们只能修改默认字段的名称,也就是修改value的名称,但是不能修改字段的数据类型
assertion failed: Text data source only produces a single data column named "value"
由于Text方式只提供了一个列,列名叫value,schema只能改这一列的名字,不能定义多个字段
解决办法:去掉schema中多个字段的定义
if __name__ == '__main__':
# 1 创建sparksql
spark = SparkSession.builder.master('local[*]').appName('text_read_file_dataframe').getOrCreate()
# 数据输入。Text方式
df=spark.read.format('text').schema('line string').load('file:///root/data/stu.txt')
# 3.查看df: 打印元数据信息
df.printSchema()
# 打印数据内容
df.show()
# 处理DF
df.rdd.map(lambda r:r.line.split()).toDF(schema='id string,name string,address string,sex string,age string').show()
2 CSV方式读取
CSV相关的option参数说明:
header
:指定第一行作为字段名
sep
:指定数据间的分隔符
inferSchema
:自动推断数据的类型,但可能不太精确
encoding
:指定文件的编码方式,默认值UTF-8
if __name__ == '__main__': print("读取外部文件:Csv方式读取") # 1 创建sparksql spark = SparkSession.builder\ .master('local[*]') \ .appName('csv_read_file_dataframe') \ .getOrCreate() # 读取外部文件:CSV方式读取: # 方法一: # df = spark.read.format('csv')\ # .option('header',True).option('sep',' ')\ # .option('inforSchema',True).option('encoding','utf-8') \ # .load('file:///root/data/stu.txt') # # 方法二: # df = spark.read.format('csv')\ # .schema('id int,name string,address string,sex string,age int') \ # .option('header', True).option('sep', ' ') \ # .option('encoding', 'utf-8') \ # .load('file:///root/data/stu.txt') # 方法三:简洁API df:DataFrame = spark.read.csv('file:///root/data/stu.txt', sep=' ',inferSchema=True, header=True,encoding='utf-8', schema='id int,name string,address string,sex string,age int') # 3.查看df 打印元数据信息 df.printSchema() # 打印数据内容 df.show()
3 JSON方式读取
1、如果没有指定schema,或者没有指定inferSchema参数,会进行数据类型的自动推断,但是可能数据类型不太精确
2、json默认会自动解析数据内容,识别出字段名称和数据内容
3、如果没有该字段,那么值就是使用null进行填充
4、schema中指定的字段名称,必须要和json中key保持一致,否则会识别失败,使用null数据进行替换
if __name__ == '__main__':
print("读取外部文件:Json方式")
# 1 创建sparksql
spark = SparkSession.builder.master('local[*]').appName('SoarkSession').getOrCreate()
# 读取外部文件:JSON方式 # 方法一
df = spark.read.format('json').schema('id int,name string,age int,address string')\
.option('inforSchema',True).option('encoding','utf-8').load('file:///root/data/json.txt')
# 方法二 简洁API
df:DataFrame = spark.read.json('file:///root/data/json.txt',encoding='utf-8',
schema='id int,name string,age int,address string')
# 3.查看df # 打印元数据信息
df.printSchema()
# 打印数据内容
df.show()
4 读取文件的简洁API
以上所有的外部读取方式,都有简单的写法。spark内置了一些常用的读取方案的简写
格式:spark.read.读取方式()
,eg:
df = spark.read.csv(
path='file:///root/data/stu.txt',
header=True,
sep=' ',
inferSchema=True,
encoding='utf-8',
)
提供了两种方案:SQL方式和DSL方式:
SQL方式:SQL语句操作DataFrame
DSL方式(Domain Specified Language):特定领域语言,其实就是使用特定的API来操作DataFrame
从代码开发角度:可能大部分喜欢使用SQL方式
从程序优化的角度:推荐使用DSL方式,方便Spark对代码进行优化
视图介绍
创建视图:create view hz_view as select * from people where address='杭州'
查询视图:select * from hz_view
视图的主要作用:简化代码
创建一个视图:
df.createTempView('视图名称')
:创建一个临时的视图/表。调用的时候,直接使用视图名称即可
df.createOrReplaceTempView('视图名称')
:创建一个临时的视图/表。如果视图/表已经存在,就替换掉。调用的时候,直接使用视图名称即可
df.createGlobalTempView('视图名称')
:创建一个全局的临时视图/表。调用的时候,需要使用global_temp.视图名称
执行SQL语句:
spark.sql(‘书写SQL’)
eg:
df.createTempView('stu')
spark.sql('select * from stu where id=4').show()
df.where('id=5').show()
① show()
:用于展示DF中数据, 默认仅展示前20行
参数1:设置默认展示多少行 默认为20
参数2:是否为截断列, 默认仅展示前20个字符数据, 如果过长, 不展示(一般不设置)
eg:
df.show()
df.show(truncate=True)
结果:
+------+---+-----+---+
| name|age|score|sex|
+------+---+-----+---+
|张三三| 10| 90| 男|
| 李四| 25| 92| 女|
|王五五| 18| 100| 男|
| 赵六| 20| 95| 女|
+------+---+-----+---+
② printSchema()
:用于打印当前这个DF的表结构信息
eg:
df.printSchema()
结果:
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- score: integer (nullable = true)
|-- sex: string (nullable = true)
③ select()
:类似于SQL中select, SQL中select后面可以写什么, 这样同样也一样
# 只输出name与age两列
df.select(['name','age']).show()
结果如下:
+------+---+
| name|age|
+------+---+
|张三三| 10|
| 李四| 25|
|王五五| 18|
| 赵六| 20|
+------+---+
④ filter()
和 where()
:用于对数据进行过滤操作, 一般在spark SQL中主要使用where
# # 以下方法都可以输出
df.filter(df.age>18).show()
df.filter('age>18').show()
df.where(df.age>18).show()
df.where('age>18').show()
运行结果如下:
+----+---+-----+---+
|name|age|score|sex|
+----+---+-----+---+
|李四| 25| 92| 女|
|赵六| 20| 95| 女|
+----+---+-----+---+
⑤ groupBy()
:用于执行分组操作
agg
df.groupby('gender').agg({'age':'min', 'score':'max'})
df1.select(df1['min(age)'].alias('min_age'))
# 分组聚合:直接进行聚合
print(df.groupBy('sex').avg().collect())
df1:DataFrame=df.groupBy('sex').agg({'age': "max", 'score': "min"})
df2:DataFrame=df.groupby('sex').agg(avg('age').alias('avg_age'), F.max('score').alias('max_score'))
print("------df1------")
df1.select(df1["min(score)"].alias('min_score'),df1["max(age)"].alias('max_age'),df1.sex).show()
print("------df2------")
df2.show()
⑥ orderBy()
:用于执行排序操作
df.orderBy(df.age.desc()).show()
为了能够支持在编写Spark SQL的DSL时候,在DSL中使用SQL函数,专门提供一个SQL的函数库。直接加载使用即可
导入这个函数库: import pyspark.sql.functions as F
通过F调用对应的函数即可。SparkSQL中所支持的函数,都可以通过以下地址查询到:
https://spark.apache.org/docs/3.1.2/api/sql/index.html
if __name__ == '__main__': # 1.创建sparksql spark = SparkSession \ .builder.master('local[*]') \ .appName('SoarkSession') \ .getOrCreate() # 2.加载数据 df:DataFrame=spark.read.format('csv')\ .schema('id int,name string,age int,address string')\ .option('sep',',')\ .option('header',True)\ .load('file:///root/data/clear_data.csv') # 清洗API # 3.去重 # df.dropDuplicates().show() # df.dropDuplicates(subset=['name']).show() # # 4删除缺失值所在的行 # df.dropna().show() # 删除所有缺失值所在的行 # df.dropna(subset=['id','name']).show() # 删除id,name两列有缺失值的行 # df.dropna(subset=['id','name'],how='all').show() # 删除id,name两列同时有缺失值的行 # df.dropna(subset=['id','name','age'],thresh=2).show() # 删除id,name,age有两个以上缺失值的行 # # 5.填充缺失值,对所有的缺失值都填充统一的默认值 # df.fillna({'id': 99, 'name': '匿名', 'age': 100, 'address': '杭州'}).show() # 查看元数据信息 df.printSchema() # 查看数据 df.show() # 释放资源 spark.stop()
Spark SQL底层实际也会被翻译成Spark RDD的代码,所以Spark SQL也会Shuffle过程。Shuffle之后,分区数量默认是200个。有的时候,数据太少,200个分区太大;有的时候,数据太多,导致200个分区不够用。针对这个默认分区数不合适的问题,提供spark.sql.shuffle.partitions的参数,用来设置Shuffle分区数
方案一(不推荐): 直接修改spark的配置文件spark-defaults.conf。全局设置,默认值为200。设置为: spark.sql.shuffle.partitions 20
方案二(比较常用,推荐使用): 在客户端通过submit命令提交的时候, 动态设置shuffle的分区数量。部署、上线的时候、基于spark-submit提交运行的时候
./spark-submit --conf "spark.sql.shuffle.partitions=20"
方案三(比较常用): 在代码中设置。主要在测试环境中使用, 但是一般在部署上线的时候, 会删除。优先级也是最高的
sparkSession.conf.set('spark.sql.shuffle.partitions',20)
if __name__ == '__main__': # 1 创建sparksql spark = SparkSession \ .builder.master('local[*]') \ .appName('write_mysql_dataframe') \ .getOrCreate() sc: SparkSession = spark.sparkContext # 2.读取文件中的数据生成dataframe df: DataFrame = spark.read.format('csv') \ .schema('id int,name string,address string,sex string,age int') \ .option('sep', ' ').load('file:///root/data/stu.txt') df.show() df.printSchema() # 把数据写出到csv文件中 df.write \ .format('csv') \ .option('header', 'True') \ .option('sep', ' ') \ .save(path='hdfs://node1:8020/save/file_output_stu.txt', mode='overwrite') # 把数据写出到mysql中 df.write.jdbc( url='jdbc:mysql://node1:3306/dbspark?useUnicode=true&characterEncoding=utf8', table='stu', mode='append', properties={"user": "root", "password": "123456"}) # 释放资源 spark.stop()
准备工作:向HDFS上传一个words.txt的文件,上传到hdfs的 /input
# 快捷键: main + 回车 if __name__ == '__main__': # 1 创建sparksql spark = SparkSession \ .builder.master('local[*]') \ .appName('SoarkSession') \ .getOrCreate() # 读取文件 df: DataFrame = spark.read.format('text').option('sep', ' ').load('file:///root/data/word.txt') df.show() # DSL方式: 方式一 df.select(F.explode(F.split('value',' ')).alias('word') ).groupby('word').count().withColumnRenamed('count','cnt').show() # DSL方式二 # agg中可以写各种聚合操作。多个聚合操作之间,使用逗号分隔 df.select( F.explode(F.split('value',' ')).alias('word') ).groupby('word').agg(F.count('word').alias('cnt')).show() # DSL方式三 df.withColumn('word',F.explode(F.split(df['value'],' ')))\ .groupby('word').agg(F.count('word').alias('cnt')).show() # # sql方式1 df.createTempView('tb_words') # 创建临时视图 spark.sql(""" select word,count(*) from ( select explode(split(value,' ')) as word from tb_words ) words group by word """).show() # sql方式2 spark.sql(""" select word,count(*) from tb_words lateral view explode(split(value,'')) t as word group by word """).show() # 5、释放资源 spark.stop()
SQL函数,主要分为以下三大类:
UDF函数:用户自定义函数
特点:一对一,也就是输入一个值返回一个值
例如:split() substr()
UDAF函数:用户自定义聚合函数
特点:多对一,也就是输入多个值返回一个值
例如:sum() avg() count()
UDTF函数:用户自定义表数据生成函数
特点:一对多,也就是输入一个值返回多个值
例如:explode()
Spark SQL中的函数,基本就是属于如上三类中的某一类
数据准备:
if __name__ == '__main__': # 1 创建sparksql spark = SparkSession \ .builder.master('local[*]') \ .appName('SoarkSession') \ .getOrCreate() # 定义数据 df=spark.createDataFrame( data=[('张三',10,'北京'),('李四',25,'北京'),('王五',18,'北京'), ('赵六',18,'北京')], schema='name:string,age:int,address:string') # 创建临时视图 df.createTempView('stu') # 定义方法 def add_edu(s): return s+'_edu'
自定义函数流程:
第一步: 定义自己的Python函数,实现代码逻辑
第二步: 是将自己的Python函数注册进SparkSQL中,后续就可以在SQL或DSL
方式一(推荐使用):udf对象 = sparksession.udf.register(参数1,参数2,参数3)
参数1:在sql中使用的名称,需要遵循Python的命名风格
参数2:也就是第一步定义的Python函数(也可以是F.udf F.pandas_udf装饰的函数/对象)
可以是python的原生的函数
也可以是通过F.udf进行封装的udf函数
参数3:自定义函数的返回值数据类型,是SparkSQL中的数据类型
udf对象:返回值对象,是一个UDF对象,可以用于DSL中
方式一定义的UDF函数,可以用在DSL和SQL中
# 注册为spark sql函数
dsl_add_edu=spark.udf.register('sql_add_edu',add_edu,StringType())
# ① SQL中调用spark sql函数
spark.sql("""
select name,age,address,
sql_add_edu(name),
sql_add_edu(address)
from stu
""").show()
# ② DSL中使用
df.select('name','age','address',dsl_add_edu('name'),dsl_add_edu('address')).show()
方式二:udf对象 = F.udf(参数1,参数2)
参数1:也就是第一步定义的Python函数
参数2:UDF的返回值数据类型,是SparkSQL中的数据类型
udf对象:返回值对象,是一个UDF对象,可以用于DSL中
(方式二定义的UDF函数,只能用在DSL中)
udf_add_edu2=F.udf(add_edu,StringType())
# 调用方法二,DSL中使用
df.select('name','age','address',udf_add_edu2('name'),udf_add_edu2('address')).show()
方式三:@F.udf(returnType=返回值数据类型)
,放在第一步自定义的Python函数上面。叫做语法糖方式(方式三定义的UDF函数,只能用在DSL中)
# 定义语法糖
@F.udf()
def add3(s):
return s + '_edu3'
# 调用方法三,DSL中使用
df.select('name','age','address',
add3('name'),
add3('address')
).show()
第三步: 在DSL或SQL中使用自定义的UDF函数
eg:自定义UDF函数,让其返回值类型为复杂类型: 字典 列表 元组
if __name__ == '__main__': # 1 创建sparksql spark = SparkSession \ .builder.master('local[*]') \ .appName('SoarkSession') \ .getOrCreate() df=spark.createDataFrame( data=[(1, '张三 杭州'), (2, '李四 北京'), (3, '王二麻子 上海')], schema='id:int,name_address:string' ) # 创建临时试图 df.createTempView('stu') # 定义函数 def my_split(name_address): l=name_address.split(' ') return {'name':l[0],'address':l[1]} # 第二步:将Python函数注册成Spark SQL函数 # 方式一 schema=StructType().add('name',StringType()).add('address',StringType()) # 注册为spark sql函数 dsl_my_split = spark.udf.register('sql_my_split', my_split, schema) # 第三步:调用 # SQL方式 sql_df=spark.sql(""" select id,name_address, sql_my_split(name_address)['name'] as name, sql_my_split(name_address)['address'] as address, sql_my_split(name_address) as na from stu """).show() # DSL方式 df.select('id','name_address', dsl_my_split('name_address').alias('na'), dsl_my_split('name_address')['name'].alias('name'), dsl_my_split('name_address')['address'].alias('address') ).show() # 释放资源 spark.stop()
pandas中UDF的定义:
第一步:自定义Python函数
第二步:将定义的Python函数注册成Spark SQL中的函数
方式一:
udf对象 = F.pandas_udf(参数1,参数2)
参数1:自己定义的Python函数
参数2:函数返回的数据类型,是SparkSQL中的数据类型
udf对象:返回值类型,是一个UDF对象
可以在DSL中使用
方式二:语法糖形式,使用@F.pandas_udf(returnType=返回值数据类型)
可以在DSL中使用
方式三:sparksession.udf.register(参数1,参数2)
参数1:后面在SQL要调用的UDF函数名称,字符串数据类型
参数2:通过F.pandas_udf装饰之后的对象
可以在SQL中使用
第三步:在SQL/DSL中调用
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。