当前位置:   article > 正文

从0开始学人工智能测试节选:Spark -- 结构化数据领域中测试人员的万金油技术(二)

从0开始学人工智能测试节选:Spark -- 结构化数据领域中测试人员的万金油技术(二)

Dataframe

dataframe 是spark中参考pandas设计出的一套高级API,用户可以像操作pandas一样方便的操作结构化数据。毕竟纯的RDD操作是十分原始且麻烦的。而dataframe的出现可以让熟悉pandas的从业人员能用非常少的成本完成分布式的数据分析工作, 毕竟跟数据打交道的人很少有不懂dataframe的。

初始化dataframe的方法

  1. from pyspark import SparkContext, SparkConf, SQLContext
  2. from pyspark.sql import Row
  3. logFile = "/Users/xxxx/tools/spark-3.0.3-bin-hadoop2.7/README.md"
  4. conf = SparkConf().setMaster("local").setAppName("My App")
  5. sc = SparkContext(conf=conf)
  6. sqlContext = SQLContext(sc)
  7. dataA = sqlContext.read.csv("路径")
  8. dicts = [{'col1': 'a', 'col2': 1}, {'col1': 'b', 'col2': 2}]
  9. dataf = sqlContext.createDataFrame(dicts)
  10. dataf.show()
  11. dicts = [['a', 1], ['b', 2]]
  12. rdd = sc.parallelize(dicts)
  13. dataf = sqlContext.createDataFrame(rdd, ['col1','col2'])
  14. dataf.show()
  15. rows = [Row(col1='a', col2=1), Row(col1='b', col2=2)]
  16. dataf= sqlContext.createDataFrame(rows)
  17. dataf.show()
  18. dataf.write.csv(path="/Users/cainsun/Downloads/test_spark", header=True, sep=",", mode='overwrite')

可以看到创建dataframe有多种方式, 可以从文件中读取, 可以从列表中初始化,可以用简单的方式指定列信息, 也可以使用Row类来初始化列信息。

dataframe常用操作

读取数据:

  1. df = spark.read.json("data.json")
  2. df = spark.read.csv("data.csv", header=True, inferSchema=True)
  3. df = spark.read.parquet("data.parquet")

显示数据:

# 显示前 n 行数据,默认为 20 行

df.show(n=5)

# 打印 DataFrame 的 schema

df.printSchema()

选择和过滤数据:

# 选择特定列

selected_df = df.select("column1", "column2")

# 使用条件过滤数据

filtered_df = df.filter(df["age"] > 30)

聚合和分组数据:

  1. from pyspark import SparkContext, SparkConf, SQLContext
  2. conf = SparkConf().setMaster("local").setAppName("My App")
  3. sc = SparkContext(conf=conf)
  4. sqlContext = SQLContext(sc)
  5. dicts = [
  6. ['teacher', 202355, 16, '336051551@qq.com'],
  7. ['student', 2035, 16, '336051551@qq.com'],
  8. ['qa', 2355, 16, '336051551@qq.com'],
  9. ['qa', 20235, 16, '336051551@qq.com'],
  10. ['teacher', 35, 16, '336051asdf'],
  11. ['student', 453, 16, '336051asdf'],
  12. ]
  13. rdd = sc.parallelize(dicts, 3)
  14. data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])
  15. result = data.groupBy("title").max("sales").alias("max_sales")
  16. resultA = data.groupBy("title").sum("sales").alias("sum_sales")
  17. # 显示结果
  18. result.show()
  19. resultA.show()
  20. +-------+----------+
  21. | title|max(sales)|
  22. +-------+----------+
  23. |teacher| 202355|
  24. | qa| 20235|
  25. |student| 2035|
  26. +-------+----------+
  27. +-------+----------+
  28. | title|sum(sales)|
  29. +-------+----------+
  30. |teacher| 202390|
  31. | qa| 22590|
  32. |student| 2488|
  33. +-------+----------+
  34. 数据排序:
  35. from pyspark.sql.functions import desc
  36. # 按列排序
  37. sorted_df = df.sort("column1")
  38. # 按列降序排序
  39. sorted_df = df.sort(desc("column1"))
  40. 添加,修改和删除列:
  41. from pyspark.sql.functions import upper
  42. # 添加新列
  43. new_df = df.withColumn("new_column", df["column1"] * 2)
  44. # 修改现有列
  45. modified_df = df.withColumn("column1", upper(df["column1"]))
  46. # 删除列
  47. dropped_df = df.drop("column1")
  48. 重命名列:
  49. # 重命名 DataFrame 中的列
  50. renamed_df = df.withColumnRenamed("old_column_name", "new_column_name")

spark sql

初始化

  1. from pyspark import SparkContext, SparkConf, SQLContext
  2. # 创建 SparkSession
  3. conf = SparkConf().setMaster("local").setAppName("My App")
  4. sc = SparkContext(conf=conf)
  5. sqlContext = SQLContext(sc)
  6. dicts = [
  7. ['teacher', 202355, 16, '336051551@qq.com'],
  8. ['student', 2035, 16, '336051551@qq.com'],
  9. ['qa', 2355, 16, '336051551@qq.com'],
  10. ['qa', 20235, 16, '336051551@qq.com'],
  11. ['teacher', 35, 16, '336051asdf'],
  12. ['student', 453, 16, '336051asdf'],
  13. ]
  14. rdd = sc.parallelize(dicts, 3)
  15. data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])
  16. data.createOrReplaceTempView("table")

要使用spark sql的能力, 需要利用createOrReplaceTempView创建一个临时表,然后才能执行 sql

简单的sql执行

  1. query = "select * from table where title = 'qa'"
  2. resultB = sqlContext.sql(query)
  3. resultB.show()
  4. # 执行结果
  5. +-----+-----+---+----------------+
  6. |title|sales|age| email|
  7. +-----+-----+---+----------------+
  8. | qa| 2355| 16|336051551@qq.com|
  9. | qa|20235| 16|336051551@qq.com|
  10. +-----+-----+---+----------------+

分组查询

  1. query = "select title, sum(sales), max(sales) from table group by title"
  2. resultC = sqlContext.sql(query)
  3. resultC.show()
  4. # 执行结果
  5. +-------+----------+----------+
  6. | title|sum(sales)|max(sales)|
  7. +-------+----------+----------+
  8. |teacher| 202390| 202355|
  9. | qa| 22590| 20235|
  10. |student| 2488| 2035|
  11. +-------+----------+----------+

Spark sql适合熟悉sql语法的人使用,本质上sql和dataframe最终都会被翻译成rdd来运行。我们可以把它看成是rdd的高级语法糖就可以。 大家喜欢哪种操作方式就选择哪种就可以。

数据测试/监控

回顾自学习与数据闭环那里,我们知道在这样的系统中针对与每天新采集的数据,需要做一道数据校验。下面我模拟一个场景写这样一个检查脚本。

  1. from pyspark import SparkContext, SparkConf, SQLContext
  2. from pyspark.sql import SparkSession
  3. import pyspark.sql.functions as F
  4. conf = SparkConf().setMaster("local").setAppName("My App")
  5. sc = SparkContext(conf=conf)
  6. sqlContext = SQLContext(sc)
  7. rdd = sc.parallelize(range(1000))
  8. print(rdd.map(lambda x: '%s,%s' % ('男', '16')).collect())
  9. dicts = [
  10. ['frank', 202355, 16, '336051551@qq.com'],
  11. ['frank', 202355, 16, '336051551@qq.com'],
  12. ['frank', 202355, 16, '336051551@qq.com'],
  13. ['frank', 202355, 16, '336051551@qq.com'],
  14. ['frank', 202355, 16, '336051asdf'],
  15. ['', 452345, 16, '336051asdf'],
  16. ]
  17. rdd = sc.parallelize(dicts, 3)
  18. dataf = sqlContext.createDataFrame(rdd, ['name', 'id', 'age', 'email'])
  19. # 验证 id 字段必须是整数
  20. id_filter = F.col("id").cast("int") >= 0
  21. # 验证 name 字段必须是非空字符串
  22. name_filter = F.col("name").isNotNull() & (F.col("name") != "")
  23. # 验证 age 字段必须是大于等于 0 的整数
  24. age_filter = F.col("age").cast("int") >= 0
  25. # 验证 email 字段必须是有效的电子邮件地址
  26. email_filter = F.col("email").rlike("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$")
  27. # 应用过滤条件
  28. valid_data = dataf.filter(id_filter & name_filter & age_filter & email_filter)
  29. # 输出符合质量要求的数据
  30. valid_data.show()
  31. # 输出不符合质量要求的数据
  32. invalid_data = dataf.exceptAll(valid_data)
  33. invalid_data.show()

更多内容欢迎来到我的知识星球:
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/472382
推荐阅读
相关标签
  

闽ICP备14008679号