DataFrame is a high-level API in Spark designed with pandas as its reference, so users can manipulate structured data almost as conveniently as they would in pandas. Pure RDD operations are primitive and tedious by comparison; the DataFrame API lets practitioners who already know pandas do distributed data analysis at very little extra cost, since hardly anyone who works with data is unfamiliar with DataFrames.
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import Row

logFile = "/Users/xxxx/tools/spark-3.0.3-bin-hadoop2.7/README.md"
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Create a DataFrame by reading a CSV file
dataA = sqlContext.read.csv("path/to/file.csv")

# Create a DataFrame from a list of dicts
dicts = [{'col1': 'a', 'col2': 1}, {'col1': 'b', 'col2': 2}]
dataf = sqlContext.createDataFrame(dicts)
dataf.show()

# Create a DataFrame from an RDD, specifying the column names
dicts = [['a', 1], ['b', 2]]
rdd = sc.parallelize(dicts)
dataf = sqlContext.createDataFrame(rdd, ['col1', 'col2'])
dataf.show()

# Create a DataFrame from Row objects
rows = [Row(col1='a', col2=1), Row(col1='b', col2=2)]
dataf = sqlContext.createDataFrame(rows)
dataf.show()

# Write the DataFrame out as CSV
dataf.write.csv(path="/Users/cainsun/Downloads/test_spark", header=True, sep=",", mode='overwrite')
As you can see, there are several ways to create a DataFrame: read it from a file, build it from a Python list, specify the column names directly, or describe the columns with the Row class.
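One more option, shown here as a minimal sketch, is to declare the schema explicitly with StructType, which fixes the column names and types up front. It assumes the same sqlContext as above, and the field names are purely illustrative.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema: column names and types are declared up front (illustrative names)
schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", IntegerType(), True),
])
dataf = sqlContext.createDataFrame([('a', 1), ('b', 2)], schema)
dataf.printSchema()
dataf.show()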
Reading data:
# These examples assume a SparkSession object named spark

# Read a JSON file
df = spark.read.json("data.json")

# Read a CSV file, treating the first line as a header and inferring column types
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Read a Parquet file
df = spark.read.parquet("data.parquet")
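Note that inferSchema=True costs an extra pass over the file so Spark can guess the column types. As a sketch of the alternative (the column names and types below are placeholders, not from the original example), you can pass an explicit schema instead:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Placeholder schema; adjust the names and types to match the actual file
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df = spark.read.csv("data.csv", header=True, schema=schema)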
Displaying data:
# Show the first n rows (defaults to 20)
df.show(n=5)
# Print the DataFrame's schema
df.printSchema()
Selecting and filtering data:
# Select specific columns
selected_df = df.select("column1", "column2")
# Filter rows with a condition
filtered_df = df.filter(df["age"] > 30)
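Conditions can also be combined. Here is a small sketch, assuming the same df and purely illustrative column names; note that each condition needs its own parentheses and that & is used instead of Python's and:
from pyspark.sql import functions as F

# Keep rows where age > 30 and name is present, then project two columns
result_df = (
    df.filter((F.col("age") > 30) & (F.col("name").isNotNull()))
      .select("name", "age")
)
result_df.show()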
Aggregating and grouping data:
from pyspark import SparkContext, SparkConf, SQLContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

dicts = [
    ['teacher', 202355, 16, '336051551@qq.com'],
    ['student', 2035, 16, '336051551@qq.com'],
    ['qa', 2355, 16, '336051551@qq.com'],
    ['qa', 20235, 16, '336051551@qq.com'],
    ['teacher', 35, 16, '336051asdf'],
    ['student', 453, 16, '336051asdf'],
]
rdd = sc.parallelize(dicts, 3)
data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])

# Max and sum of sales per title
result = data.groupBy("title").max("sales")
resultA = data.groupBy("title").sum("sales")

# Show the results
result.show()
resultA.show()

# Output
+-------+----------+
|  title|max(sales)|
+-------+----------+
|teacher|    202355|
|     qa|     20235|
|student|      2035|
+-------+----------+

+-------+----------+
|  title|sum(sales)|
+-------+----------+
|teacher|    202390|
|     qa|     22590|
|student|      2488|
+-------+----------+
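If you want the aggregated columns to carry nicer names than max(sales) and sum(sales), one way, sketched here against the same data DataFrame, is to use agg with explicit aliases:
from pyspark.sql import functions as F

# Compute both aggregates in one pass and name the result columns
summary = data.groupBy("title").agg(
    F.max("sales").alias("max_sales"),
    F.sum("sales").alias("sum_sales"),
)
summary.show()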
Sorting data:
from pyspark.sql.functions import desc

# Sort by a column (ascending by default)
sorted_df = df.sort("column1")

# Sort by a column in descending order
sorted_df = df.sort(desc("column1"))
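Sorting by several columns at once is also common. A small sketch with illustrative column names, using orderBy and a per-column sort direction:
# Ascending on column1, descending on column2 (column names are illustrative)
sorted_df = df.orderBy(["column1", "column2"], ascending=[True, False])
sorted_df.show()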
Adding, modifying, and deleting columns:
from pyspark.sql.functions import upper

# Add a new column
new_df = df.withColumn("new_column", df["column1"] * 2)

# Modify an existing column
modified_df = df.withColumn("column1", upper(df["column1"]))

# Delete a column
dropped_df = df.drop("column1")
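withColumn also combines nicely with conditional expressions. As an illustration (the column name and the threshold of 100 are made up), when/otherwise derives a flag column:
from pyspark.sql.functions import when, col

# Tag each row depending on whether column1 exceeds an arbitrary threshold
flagged_df = df.withColumn(
    "is_large",
    when(col("column1") > 100, "yes").otherwise("no"),
)
flagged_df.show()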
Renaming columns:
# Rename a column in the DataFrame
renamed_df = df.withColumnRenamed("old_column_name", "new_column_name")
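To rename several columns in one go, one option is toDF, sketched below; it replaces all column names positionally, so you must supply a new name for every column (the names here are illustrative):
# toDF replaces every column name positionally; the new names are illustrative
renamed_all_df = df.toDF("col_a", "col_b", "col_c")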
Using Spark SQL:
from pyspark import SparkContext, SparkConf, SQLContext

# Create the SparkContext and SQLContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

dicts = [
    ['teacher', 202355, 16, '336051551@qq.com'],
    ['student', 2035, 16, '336051551@qq.com'],
    ['qa', 2355, 16, '336051551@qq.com'],
    ['qa', 20235, 16, '336051551@qq.com'],
    ['teacher', 35, 16, '336051asdf'],
    ['student', 453, 16, '336051asdf'],
]
rdd = sc.parallelize(dicts, 3)
data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])

# Register the DataFrame as a temporary view so it can be queried with SQL
data.createOrReplaceTempView("table")
To use Spark SQL, you first have to register the DataFrame as a temporary view with createOrReplaceTempView; only then can you run SQL against it.
query = "select * from table where title = 'qa'"
resultB = sqlContext.sql(query)
resultB.show()

# Output
+-----+-----+---+----------------+
|title|sales|age|           email|
+-----+-----+---+----------------+
|   qa| 2355| 16|336051551@qq.com|
|   qa|20235| 16|336051551@qq.com|
+-----+-----+---+----------------+
query = "select title, sum(sales), max(sales) from table group by title"
resultC = sqlContext.sql(query)
resultC.show()

# Output
+-------+----------+----------+
|  title|sum(sales)|max(sales)|
+-------+----------+----------+
|teacher|    202390|    202355|
|     qa|     22590|     20235|
|student|      2488|      2035|
+-------+----------+----------+
Spark SQL suits people who are already fluent in SQL. Under the hood, both SQL queries and DataFrame operations are ultimately translated into RDD computations, so you can simply treat them as high-level syntactic sugar over RDDs and pick whichever style you prefer.
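One way to convince yourself of this is to compare the query plans. A small sketch, reusing data and resultC from above: explain() prints the plan Spark will actually execute, and the SQL and DataFrame versions end up going through the same optimizer.
# Physical plan for the SQL query
resultC.explain()

# The equivalent DataFrame expression is planned by the same engine
data.groupBy("title").sum("sales").explain()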
Looking back at the chapter on self-learning and the data closed loop, we know that such a system needs to run a round of data validation over each day's newly collected data. Below I simulate a scenario and write such a check script.
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Quick sanity check: map an RDD and collect it (not part of the validation logic below)
rdd = sc.parallelize(range(1000))
print(rdd.map(lambda x: '%s,%s' % ('男', '16')).collect())

dicts = [
    ['frank', 202355, 16, '336051551@qq.com'],
    ['frank', 202355, 16, '336051551@qq.com'],
    ['frank', 202355, 16, '336051551@qq.com'],
    ['frank', 202355, 16, '336051551@qq.com'],
    ['frank', 202355, 16, '336051asdf'],
    ['', 452345, 16, '336051asdf'],
]
rdd = sc.parallelize(dicts, 3)
dataf = sqlContext.createDataFrame(rdd, ['name', 'id', 'age', 'email'])

# id must cast to a non-negative integer
id_filter = F.col("id").cast("int") >= 0

# name must be a non-empty string
name_filter = F.col("name").isNotNull() & (F.col("name") != "")

# age must be an integer greater than or equal to 0
age_filter = F.col("age").cast("int") >= 0

# email must be a valid email address
email_filter = F.col("email").rlike("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$")

# Apply all filter conditions
valid_data = dataf.filter(id_filter & name_filter & age_filter & email_filter)

# Rows that pass the quality checks
valid_data.show()

# Rows that fail the quality checks
invalid_data = dataf.exceptAll(valid_data)
invalid_data.show()
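In a real data closed loop you usually do more than print the bad rows. A sketch of what could follow, where the 0.95 threshold and the output path are made-up values, counts both sides, keeps the rejects for inspection, and fails the job if too much of the day's data is invalid:
# Hypothetical acceptance threshold and output path for the daily check
THRESHOLD = 0.95
OUTPUT_PATH = "/tmp/invalid_rows"

total = dataf.count()
valid = valid_data.count()
ratio = valid / total if total > 0 else 0.0
print("valid rows: %d / %d (%.2f%%)" % (valid, total, ratio * 100))

# Keep the rejected rows around for debugging
invalid_data.write.csv(path=OUTPUT_PATH, header=True, mode="overwrite")

# Fail the job if the valid ratio falls below the threshold
if ratio < THRESHOLD:
    raise ValueError("data quality check failed: %.2f < %.2f" % (ratio, THRESHOLD))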