赞
踩
Spark SQL是Spark中用于处理结构化数据的一个模块,前身是Shark,但本身继承了前身Hive兼容和内存列存储的一些优点。Spark SQL具有以下四个特点:
Spark中DataFrame是⼀个分布式的⾏集合,可以想象为⼀个关系型数据库的表,或者⼀个带有列名的Excel表格。不过它跟RDD有以下共同之处:
DataFrame | RDD | |
---|---|---|
逻辑框架 | 提供详细结构信息,例如列的名称和类型 | 不知道类的内部结构 |
数据操作 | API更丰富、效率更高 | 代码少时,速度更快 |
DataFrame的API也分为transformation和action两类
SparkSession.builder.master("local") \
... appName("Word Count") \
... getOrCreate()
# Builder 是 SparkSession 的构造器。 通过 Builder, 可以添加各种配置
# master (master)设置要链接到的spark master节点地址, 传⼊ “local” 代表本地模式, “local[4]”代表本地模式4内核运⾏
# appName (name)为Spark应⽤设置名字
# getOrCreate ()获取⼀个已经存在的 SparkSession 或者如果没有已经存在的, 创建⼀个新的SparkSession
sparkSession.createDataFrame
# json格式
spark.read.json("xxx.json")
spark.read.format('json').load('xxx.json')
# parquet格式
spark.read.parquet("xxx.parquet")
# jdbc格式
spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name")\
.option("dbtable","table_name").option("user","xxx").option("password","xxx").load()
# rdd中读取数据
spark = SparkSession.builder.master('local').appName('Test').getOrCreate()
sc = spark.sparkContext
list1 = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(list1)
# 添加数据列名
people = rdd.map(lambda x:Row(name=x[0], age=int(x[1])))
# 创建DataFrame
df_pp = spark.createDataFrame(people)
print(df_pp.show(2))
# # rdd中读取数据
spark = SparkSession.builder.master('local').appName('Test').getOrCreate()
df = spark.read.format('csv').option('header','true').load('iris.csv')
df.printSchema()
df.show(5)
print(df.count())
print(df.columns)
# 增加列
df.withColumn('newWidth', df.SepalWidth*2).show()
# 删除列
df.drop('cls').show()
# 提取部分列
df.select('SepalLength','SepalWidth').show()
#统计信息 describe
df.describe().show()
#计算某⼀列的描述信息
df.describe('cls').show()
# 基本统计信息
df.select('cls').distinct().count()
# 分组统计
df.groupby('cls').agg({'SepalWidth':'mean','SepalLength':'max'}).show()
# avg(), count(), countDistinct(), first(), kurtosis(),
# max(), mean(), min(), skewness(), stddev(), stddev_pop(),
# stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()
# ================采样数据 sample===========
#withReplacement:是否有放回的采样
#fraction:采样⽐例
#seed:随机种⼦
sdf = df.sample(False,0.2,100)
#设置数据⽐例将数据划分为两部分
trainDF, testDF = df.randomSplit([0.6, 0.4])
#查看两个数据集在类别上的差异 subtract,确保训练数据集覆盖了所有分类
diff_in_train_test = testDF.select('cls').subtract(trainDF.select('cls'))
diff_in_train_test.distinct().count()
# 交叉表 crosstab df.crosstab('cls','SepalLength').show() # 自定义函数UDF # 找到数据,做后续处理 traindf, testdf = df.randomSplit([0.7,0.3]) diff_in_train_test = testdf.select('cls').subtract(traindf.select('cls')).distinct().show() # 找到类,整理到一个列表中 not_exist_cls = traindf.select('cls').subtract(testdf.select('cls')).distinct().rdd.map(lambda x:x[0]).collect() # 定义一个方法 def shou_remove(x): if x in not_exist_cls: return -1 else: return x # 在RDD中可以直接定义函数,交给rdd的transformatioins⽅法进⾏执⾏ # 在DataFrame中需要通过udf将⾃定义函数封装成udf函数再交给DataFrame进⾏调⽤执⾏ from pyspark.sql.types import StringType from pyspark.sql.functions import udf check = udf(shou_remove, StringType()) resultdf = traindf.withColumn('new_cls',check(traindf['cls'])).filter('new_cls<>-1') resultdf.show()
1)以通过反射⾃动推断DataFrame的Schema
# 1) json→RDD→DataFrame spark = SparkSession.builder.appName('json_demo').getOrCreate() sc = spark.sparkContext jsonString = [ """{ "id" : "01001", "city" : "AGAWAM", "pop" : 15338, "state" : "MA" }""", """{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }""" ] jsonrdd = sc.parallelize(jsonString) # json 2 rdd jsondf = spark.read.json(jsonrdd) #rdd 2 dataframe jsondf.printSchema() jsondf.show() # 2) 直接从文件中加载 spark = SparkSession.builder.appName('json_demo').getOrCreate() sc = spark.sparkContext jsondf = spark.read.json('zips.json') jsondf.printSchema() jsondf.filter(jsondf.pop>40000).show(10) jsondf.createOrReplaceTempView('temp_table') resulfdf = spark.sql('select * from temp_table where pop>40000') resulfdf.show(10)
2)通过StructType对象指定Schema
spark = SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext
jsonSchema = StructType([
StructField("id", StringType(), True),
StructField("city", StringType(), True),
StructField("loc" , ArrayType(DoubleType())),
StructField("pop", LongType(), True),
StructField("state", StringType(), True)
])
reader = spark.read.schema(jsonSchema)
jsondf = reader.json('zips.json')
jsondf.printSchema()
jsondf.show(2)
jsondf.filter(jsondf.pop>40000).show(10)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。