赞
踩
类名 | 概述 |
---|---|
SparkSession(sparkContext[, jsparkSession]) | 使用 Dataset 和 DataFrame API 对 Spark 进行编程的入口点 |
DataFrame(jdf, sql_ctx) | 分组到命名列中的分布式数据集合 |
Column(jc) | DataFrame 中的列 |
Row | DataFrame 中的行 |
GroupedData(jgd, df) | 由DataFrame. groupby()创建的一组用于在DataFrame上聚合的方法。 |
PandasCogroupedOps(gd1, gd2) | 两个GroupedData的逻辑分组,由GroupedData.cogroup()创建。 |
DataFrameNaFunctions(df) | 在DataFrame中处理丢失数据的功能。 |
DataFrameStatFunctions(df) | 具有DataFrame的统计函数的功能。 |
Window | 用于在 DataFrames 中定义窗口的实用函数。 |
使用Dataset和DataFrame API编程Spark的入口点。要创建Spark会话,应使用SparkSession.builder属性。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# 详细spark创建如下:
# spark = SparkSession.builder \
# .master("local") \
# .appName("Word Count") \
# .config("spark.some.config.option", "some-value") \
# .getOrCreate()
方法 | 概述 |
---|---|
SparkSession.builder.appName(name) | 设置应用程序的名称,该名称将显示在Spark web UI中。如果未设置应用程序名称,将使用随机生成的名称。 |
SparkSession.builder.config([key, value, conf]) | 设置配置选项。使用此方法设置的选项会自动传播到SparkConf和SparkSession自己的配置。 |
SparkSession.builder.enableHiveSupport() | 启用 Hive 支持,包括与持久 Hive 元存储的连接、对 Hive SerDes 的支持和 Hive 用户定义的函数。 |
SparkSession.builder.getOrCreate() | 获取一个现有的SparkSession,如果没有现有的SparkSession,则根据此构建器中设置的选项创建一个新的SparkSession。 |
SparkSession.builder.master(master) | 设置要连接的 Spark 主 URL,例如“local”以在本地运行,“local[4]”以 4 个内核在本地运行,或“spark://master:7077”以在 Spark 独立集群上运行。 |
SparkSession.catalog | 用户可以通过该接口创建、删除、更改或查询底层数据库、表、函数等。 |
SparkSession.conf | Spark的运行时配置接口。这是用户可以获取和设置与 Spark SQL 相关的所有 Spark 和 Hadoop 配置的界面。获取配置的值时,默认为在基础中设置的值SparkContext(如果有)。 |
SparkSession.createDataFrame(data[, schema, …]) | 从RDD、list或panda .DataFrame中创建一个DataFrame。 |
SparkSession.getActiveSession() | 返回当前线程的活动SparkSession,由构建器返回 |
SparkSession.newSession() | 返回一个新的SparkSession作为新会话,它有单独的SQLConf、注册的临时视图和udf,但是共享SparkContext和表缓存。 |
SparkSession.range(start[, end, step, …]) | 使用单个pyspark.sql.types.LongType列创建一个名为id的DataFrame,包含从开始到结束(独占)范围内的元素,步骤值为step。 |
SparkSession.read | 返回一个DataFrameReader,可用于将数据作为DataFrame读入。 |
SparkSession.readStream | 返回一个DataStreamReader,可用于将数据流作为流式DataFrame读取。 |
SparkSession.sparkContext | 返回底层的SparkContext。 |
SparkSession.sql(sqlQuery) | 返回表示给定查询结果的DataFrame。 |
SparkSession.stop() | 停止底层的SparkContext。 |
SparkSession.streams | 返回一个StreamingQueryManager,它允许管理该上下文中活动的所有StreamingQuery实例。 |
SparkSession.table(tableName) | 以DataFrame的形式返回指定的表。 |
SparkSession.udf | 返回UDF注册的UDFRegistration。 |
SparkSession.version | 运行此应用程序的Spark版本。 |
(1)概述
其作用:设置配置选项。使用此方法设置的选项会自动传播到SparkConf和SparkSession自己的配置。
builder.config( key = None , value = None , conf = None )
参数:
key:str型(可选),配置属性的键名字符串
value:str型(可选),配置属性的值
conf :SparkConf(可选),一个实例SparkConf
(2)实例
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
# 对现有的SparkConf,使用conf参数
SparkSession.builder.config(conf=SparkConf())
# 对于(key,value)对,可以省略参数名称
SparkSession.builder.config("spark.some.config.option", "some-value")
(1)概述
其作用:获取一个现有的SparkSession,如果没有现有的SparkSession,则根据此构建器中设置的选项创建一个新的SparkSession。
builder.getOrCreate( )
该方法首先检查是否存在有效的全局默认 SparkSession,如果存在,则返回那个。如果不存在有效的全局默认 SparkSession,该方法会创建一个新的 SparkSession 并将新创建的 SparkSession 分配为全局默认值。
(2)实例
from pyspark.sql import SparkSession
s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
s1.conf.get("k1") == "v1"
运行结果:
如果返回现有 SparkSession,则此构建器中指定的配置选项将应用于现有 SparkSession。
s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
s1.conf.get("k1") == s2.conf.get("k1")
s1.conf.get("k2") == s2.conf.get("k2")
运行结果:
(1)概述
其作用:从RDD、list或panda .DataFrame中创建一个DataFrame。
SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
说明:
(1)从RDD,list或一个pandas.DataFrame,创建一个DataFrame。(2)当schema是列名列表时,将从数据中推断出每个列的类型。
(3)当schema为None时,它将尝试从数据推断模式(列名和类型),数据应该是Row、namedtuple或dict的RDD。
(4)当schema为pyspark.sql.types.DataType或数据类型字符串时,它必须与实际数据匹配,否则将在运行时引发异常。如果给定的模式不是pyspark.sql.types。StructType,它将被包装成pyspark.sql.types.StructType作为它的唯一字段,并且字段名将是" value "。每个记录也将被包装到一个元组中,稍后可以将其转换为行。
(5)如果需要模式推断,则使用samplingRatio来确定用于模式推断的行比率。如果samplingRatio为None,将使用第一行。
参数:
data:RDD or iterable
任何类型的SQL数据表示(行,元组,int, boolean等),或列表,或pandas.DataFrame的RDD。
schema:pyspark.sql.types.DataType, str or list(可选)
Pyspark.sql.types.DataType 或数据类型字符串或列名列表,默认为 None。数据类型字符串格式等于pyspark.sql.types.DataType。简单字符串,除了顶层结构类型可以省略struct<>和原子类型使用typeName()作为他们的格式,例如使用byte代替tinyint pyspark.sql.types.ByteType。我们还可以使用int作为pyspark.sql.types.IntegerType的简短名称。
samplingRatio:float(可选)
用于推断的行的样本比率
verifySchema:bool(可选)
根据模式验证每一行的数据类型。默认启用。
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
lst = [('YFater', 18)]
spark.createDataFrame(lst).collect()
运行结果:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
lst = [('YFater', 18)]
spark.createDataFrame(lst,['name','age']).collect()
运行结果:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
lst1 = [{'name':'YFater','age':18}]
spark.createDataFrame(lst1).collect()
运行结果:
from pyspark.sql import SparkSession
from pyspark.shell import sc
spark = SparkSession.builder.getOrCreate()
rdd = sc.parallelize(lst)
spark.createDataFrame(rdd,['name','age']).collect()
运行结果:
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = SparkSession.builder.getOrCreate()
Person = Row('name','age')
lst = [('YFater', 18)]
rdd = sc.parallelize(lst)
person = rdd.map(lambda r:Person(*r))
df1 = spark.createDataFrame(person)
df1.collect()
运行结果:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField('name',StringType(),True),
StructField("age", IntegerType(), True)])
spark.createDataFrame(rdd,schema).collect()
运行结果:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.createDataFrame(df1.toPandas()).collect()
运行结果:
(1)概述
其作用:返回当前线程的活动SparkSession,由构建器返回。
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.getActiveSession()
lst = [('YFater', 18)]
rdd = spark.sparkContext.parallelize(lst)
df = spark.createDataFrame(rdd, ['name', 'age'])
df.select("name").collect()
运行结果:
(1)概述
其作用:使用一个名为id的pyspark.sql.types.LongType列创建一个DataFrame,该列包含从开始到结束(独占)范围内的元素,并带有步长值step。
SparkSession.range(start, end=None, step=1, numPartitions=None)
参数:
start: int,起始值
end:int(可选),最终值(独占)
step:int(可选),增量步骤(默认为1)
numPartitions:int(可选),DataFrame的分区数
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.range(1,6,2).collect()
运行结果:
如果只指定了一个参数,它将被用作结束值。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.range(4).collect()
运行结果:
(1)概述
其作用:将指定的表作为DataFrame
SparkSession.table(tableName)
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df.createOrReplaceTempView('t1')
df2 = spark.table('t1')
sorted(df.collect())==sorted(df2.collect())
运行结果:
RuntimeConfig(jconf):面向用户的配置 API,可通过SparkSession.conf访问。
class pyspark.sql.conf.RuntimeConfig(jconf)
方法 | 概述 |
---|---|
get(key[, default]) | 返回给定键的 Spark 运行时配置属性的值,假设它已设置。 |
isModifiable(key) | 指示具有给定密钥的配置属性在当前会话中是否可以修改。 |
set(key, value) | 设置给定的Spark运行时配置属性。 |
unset(key) | 重置给定密钥的配置属性。 |
方法 | 概述 |
---|---|
DataFrameReader.csv(path[, schema, sep, …]) | 加载一个CSV文件并以DataFrame的形式返回结果。 |
DataFrameReader.format(source) | 指定输入数据源格式。 |
DataFrameReader.jdbc(url, table[, column, …]) | 构造一个 DataFrame,它表示通过 JDBC URL 和连接属性可访问的名为 table 的数据库表。 |
DataFrameReader.json(path[, schema, …]) | 加载JSON文件并以DataFrame的形式返回结果。 |
DataFrameReader.load([path, format, schema]) | 从数据源加载数据并将其作为DataFrame返回。 |
DataFrameReader.option(key, value) | 为基础数据源添加输入选项。 |
DataFrameReader.options(**options) | 为基础数据源添加输入选项 |
DataFrameReader.orc(path[, mergeSchema, …]) | 加载ORC文件,以DataFrame返回结果。 |
DataFrameReader.parquet(*paths, **options) | 加载Parquet文件,以DataFrame返回结果。 |
DataFrameReader.schema(schema) | 指定输入模式。 |
DataFrameReader.table(tableName) | 以DataFrame的形式返回指定的表。 |
DataFrameWriter.bucketBy(numBuckets, col, *cols) | 通过给定的列将输出存储起来。如果指定了该参数,输出将被放置在文件系统上,类似于Hive的桶式方案。 |
DataFrameWriter.csv(path[, mode, …]) | 将DataFrame的内容以CSV格式保存在指定的路径下。 |
DataFrameWriter.format(source) | 指定底层输出数据源。 |
DataFrameWriter.insertInto(tableName[, …]) | 将DataFrame的内容插入到指定的表。 |
DataFrameWriter.jdbc(url, table[, mode, …]) | 通过JDBC将DataFrame的内容保存到外部数据库表中。 |
DataFrameWriter.json(path[, mode, …]) | 将DataFrame的内容以JSON格式(JSON Lines文本格式或换行分隔的JSON)保存在指定的路径上。 |
DataFrameWriter.mode(saveMode) | 指定数据或表已经存在时的行为。 |
DataFrameWriter.option(key, value) | 为基础数据源添加输出选项。 |
DataFrameWriter.options(**options) | 为基础数据源添加输出选项。 |
DataFrameWriter.orc(path[, mode, …]) | 将DataFrame的内容以ORC格式保存在指定的路径下。 |
DataFrameWriter.parquet(path[, mode, …]) | 将DataFrame的内容以Parquet格式保存在指定的路径上。 |
DataFrameWriter.partitionBy(*cols) | 根据文件系统上的给定列对输出进行分区。 |
DataFrameWriter.save([path, format, mode, …]) | 将DataFrame的内容保存到数据源。 |
DataFrameWriter.saveAsTable(name[, format, …]) | 将DataFrame的内容保存为指定的表。 |
DataFrameWriter.sortBy(col, *cols) | 按文件系统上的给定列对每个bucket中的输出进行排序。 |
DataFrameWriter.text(path[, compression, …]) | 将DataFrame的内容保存在指定路径的文本文件中。 |
(1)概述
其作用:加载一个CSV文件并以DataFrame的形式返回结果。
DataFrameReader.csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, unescapedQuoteHandling=None)
如果启用了inferSchema,该函数将遍历输入一次,以确定输入模式。要避免一次遍历整个数据,请禁用inferSchema选项或使用schema显式指定模式。
参数:
path:str or list
字符串或字符串列表,用于输入路径或存储CSV行字符串的RDD。
其他参数均可选
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('sql_int_out_1.csv')
df.dtypes
运行结果:
(1)概述
其作用:指定输入数据源格式。
DataFrameReader.format(source)
参数:
source:str
字符串,数据源的名称,例如’ json ', ’ parquet '。
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.format('json').load('sql_int_out_2.json')
df.show()
运行结果:
(1)概述
其作用:加载JSON文件并以DataFrame的形式返回结果。
DataFrameReader.json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None, dropFieldIfAllNull=None, encoding=None, locale=None, pathGlobFilter=None, recursiveFileLookup=None, allowNonNumericNumbers=None, modifiedBefore=None, modifiedAfter=None)
默认情况下支持JSON行(换行符分隔的JSON)。对于JSON(每个文件一条记录),将multiLine参数设置为true。
如果未指定schema参数,则此函数将遍历输入一次以确定输入模式。
参数:
path:str,list or RDD
字符串表示 JSON 数据集的路径,或路径列表或存储 JSON 对象的字符串 RDD。
其他参数均可选
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.read.json('sql_int_out_2.json')
df1.show()
运行结果:
如果格式化json,就会发现可能会报错,解决方法如下:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.read.option('multiLine','True').json('sql_int_out_2.json')
df1.show()
运行结果:
(1)概述
其作用:从数据源加载数据并将其作为DataFrame返回。
DataFrameReader.load(path=None, format=None, schema=None, **options)
参数:
path:str or list (可选)
可选字符串或文件系统支持的数据源的字符串列表。
format:str (可选)
数据源格式的可选字符串。默认为parquet
schema:pyspark.sql.types.StructType or str(可选)
可选的 pyspark.sql.types. StructType 用于输入模式或 ddl 格式的字符串(例如 col0 INT、 col1 DOUBLE)。
**options:dict
所有其他字符串选项
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.format('json').load(['sql_int_out_2.json',
'sql_int_out_3.json'])
df.show()
运行结果:
(1)概述
其作用:加载ORC文件,以DataFrame返回结果。
DataFrameReader.orc(path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None)
参数:
path:str or list
其他参数可选
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.orc('xxx.orc')
df.show()
(1)概述
其作用:加载parquet文件,以DataFrame返回结果。
DataFrameReader.parquet(*paths, **options)
参数:
path:str
其他参数可选
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('xxx.parquet')
df.show()
(1)概述
其作用:以DataFrame的形式返回指定的表。
DataFrameReader.table(tableName)
参数:
tableName:str
字符串,表的名称。
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('xxx.parquet')
df.createOrReplaceTempView('table_name')
spark.read.table('table_name').show()
(1)概述
其作用:通过给定的列将输出存储起来。如果指定了该参数,输出将被放置在文件系统上,类似于Hive的bucketing方案。
DataFrameWriter.bucketBy(numBuckets, col, *cols)
参数:
numBuckets:int
要保存的Bucket的数量
col:str,list or tuple
列的名称或名称列表
cols:str
额外的名称(可选)。如果col是一个列表,那么它应该为空。
其适用于与DataFrameWriter.saveAsTable()结合使用的基于文件的数据源。
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df.write.format('parquet').bucketBy(100,'name','age').mode('overwrite').saveAsTable('bucketed_table')
(1)概述
其作用:将DataFrame的内容以CSV格式保存在指定的路径下。
DataFrameWriter.csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None, lineSep=None)
参数:
path:str
任何Hadoop支持的文件系统中的路径
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.read.option('multiLine','True').json('sql_int_out_2.json')
df1.show()
df1.write.csv('data')
运行结果:
(1)概述
其作用:指定底层输出数据源。
DataFrameWriter.format(source)
参数:
source:str
字符串,数据源的名称,例如’ json ', ’ parquet '。
(2)实例
df.write.format('json').save('data')
(1)概述
其作用:将DataFrame的内容以JSON格式(JSON Lines文本格式或换行分隔的JSON)保存在指定的路径上。
DataFrameWriter.json(path, mode=None, compression=None, dateFormat=None, timestampFormat=None, lineSep=None, encoding=None, ignoreNullFields=None)
参数:
path:str
任何Hadoop支持的文件系统中的路径
mode:str(可选)
指定数据已经存在时保存操作的行为。
append:将该DataFrame的内容附加到现有数据。
overwrite:覆盖现有的数据。
ignore:如果数据已经存在,则静默地忽略此操作。
error or errorifexists(默认情况下):如果数据已经存在,则抛出异常。
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df3 = spark.read.csv('sql_int_out_1.csv')
df = df3.toDF('name','age')
df.show()
df.write.json('data1')
运行结果:
(1)概述
其作用:指定数据或表已存在时的行为。
DataFrameWriter.mode(saveMode)
选项:
append:将该DataFrame的内容附加到现有数据。
overwrite:覆盖现有的数据。
ignore:如果数据已经存在,则静默地忽略此操作。
error or errorifexists(默认情况下):如果数据已经存在,则抛出异常。
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df3 = spark.read.csv('sql_int_out_1.csv')
df = df3.toDF('name','age')
df.show()
df.write.mode('append').json('data1')
运行结果:
(1)概述
DataFrameWriter.orc(path, mode=None, partitionBy=None, compression=None)
其作用:将DataFrame的内容以ORC格式保存在指定的路径下。
DataFrameWriter.parquet(path, mode=None, partitionBy=None, compression=None)
其作用:将DataFrame的内容以Parquet格式保存在指定的路径上。
(2)实例
df.write.orc('data')
df.write.parquet('data')
(1)概述
其作用:根据文件系统上的给定列对输出进行分区。
DataFrameWriter.partitionBy(*cols)
参数:
cols:str or list
列名
如果指定了该参数,输出将在文件系统上进行布局,类似于Hive的分区方案。
(2)实例
df.write.partitionBy('name', 'age').parquet('data')
(1)概述
其作用:将DataFrame的内容保存到数据源。
DataFrameWriter.save(path=None, format=None, mode=None, partitionBy=None, **options)
数据源由格式和一组选项指定。如果不指定format,则使用spark.sql.sources.default配置的默认数据源。
(2)实例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df3 = spark.read.csv('sql_int_out_1.csv')
df = df3.toDF('name','age')
df.show()
df.write.mode('append').save('data2')
运行结果:
(1)概述
其作用:按文件系统上的给定列对每个bucket中的输出进行排序。
DataFrameWriter.sortBy(col, *cols)
参数:
col:str,tuple or list
列的名称或名称列表。
(2)实例
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
spark = SparkSession.builder.getOrCreate()
df3 = spark.read.csv('sql_int_out_1.csv')
df = df3.toDF('name','age') # 将数据转化为DataFrame类型并创建name,age列
df = df.select(df.name,df['age'].cast(IntegerType())) #将age列类型转化为int类型
df.show()
(df.write.format('parquet') # 生成parquet文件
.bucketBy(1,'name','age') # 创建一个bucket容器,按给定列存储输出
.sortBy('age') # 根据age进行排序
.mode("overwrite") # 文件存在时重写
.saveAsTable('sorted_bucketed_table')) # 保存
运行结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。