results = words_nonull.groupby(col("word"_jonathan rioux">
当前位置:   article > 正文

Data.Analysis.with.Python.and.PySpark:PySpark的第一个程序(2):Submitting andscaling your firstPySpark prog_jonathan rioux, data analysis with python and pysp

jonathan rioux, data analysis with python and pyspark,课后题

Grouping records: Counting word frequencies

Counting word frequencies using groupby() and count()

  1. groups = words_nonull.groupby(col("word"))
  2. print(groups)
  3. # <pyspark.sql.group.GroupedData at 0x10ed23da0>
  4. results = words_nonull.groupby(col("word")).count()
  5. print(results)
  6. # DataFrame[word: string, count: bigint]
  7. results.show()

 

 因为Spark是懒惰的,所以它不关心记录的顺序,除非我们明确要求它这样做。由于我们希望看到显示的顶部单词,让我们在数据框中进行一点排序,同时完成程序的最后一步:返回顶部单词频率。

使用orderBy在屏幕上排序结果

PySpark为排序提供了两种不同的语法

Displaying the top 10 words in Jane Austen’s Pride and Prejudice

  1. results.orderBy("count", ascending=False).show(10)
  2. results.orderBy(col("count").desc()).show(10)

Writing data from a data frame 

就像我们使用read()和SparkReader读取Spark中的数据一样,我们使用write()和SparkWriter对象将数据帧写回磁盘。我专门让SparkWriter将文本导出到CSV文件中,命名simple_count.csv

如果我们看一下结果,我们可以看到PySpark并没有创建一个结果。csv文件。相反,它创建了一个同名的目录,并将201个文件放入该目录中(200个CSV+1_SUCCESS file)。

 默认情况下,PySpark将为每个分区提供一个文件。这意味着我们的程序在我的机器上运行时,最终会产生200个分区。为了减少分区的数量,我们将coalesce()方法应用于所需的分区数量。

下一个清单显示了在写入磁盘之前在数据帧上使用coalesce(1)时的区别。我们仍然有一个目录,但里面只有一个CSV文件。

results.coalesce(1).write.csv("./data/simple_count_single_partition.csv")

 完整代码如下:

  1. from pyspark.sql import SparkSession
  2. from pyspark.sql.functions import (
  3. col,
  4. explode,
  5. lower,
  6. regexp_extract,
  7. split,
  8. )
  9. spark = SparkSession.builder.appName(
  10. "Analyzing the vocabulary of Pride and Prejudice."
  11. ).getOrCreate()
  12. book = spark.read.text("./data/gutenberg_books/1342-0.txt")
  13. lines = book.select(split(book.value, " ").alias("line"))
  14. words = lines.select(explode(col("line")).alias("word"))
  15. words_lower = words.select(lower(col("word")).alias("word"))
  16. words_clean = words_lower.select(
  17. regexp_extract(col("word"), "[a-z']*", 0).alias("word")
  18. )
  19. words_nonull = words_clean.where(col("word") != "")
  20. results = words_nonull.groupby(col("word")).count()
  21. results.orderBy("count", ascending=False).show(10)
  22. results.coalesce(1).write.csv("./simple_count_single_partition.csv")

使用PySpark的导入约定简化依赖关系

虽然没有硬性规定,但普遍的做法是使用F来表示PySpark的函数。

Since col, explode, lower, regexp_extract, and split are all in pyspark.sql.functions, we can import the whole module.

通过方法链接简化我们的程序

用链式变换方法去除中间变量

list 3.7

  1. # Before
  2. book = spark.read.text("./data/gutenberg_books/1342-0.txt")
  3. lines = book.select(split(book.value, " ").alias("line"))
  4. words = lines.select(explode(col("line")).alias("word"))
  5. words_lower = words.select(lower(col("word")).alias("word"))
  6. words_clean = words_lower.select(
  7. regexp_extract(col("word"), "[a-z']*", 0).alias("word")
  8. )
  9. words_nonull = words_clean.where(col("word") != "")
  10. results = words_nonull.groupby("word").count()
  11. # After
  12. import pyspark.sql.functions as F
  13. results = (
  14. spark.read.text("./data/gutenberg_books/1342-0.txt")
  15. .select(F.split(F.col("value"), " ").alias("line"))
  16. .select(F.explode(F.col("line")).alias("word"))
  17. .select(F.lower(F.col("word")).alias("word"))
  18. .select(F.regexp_extract(F.col("word"), "[a-z']*", 0).alias("word"))
  19. .where(F.col("word") != "")
  20. .groupby("word")
  21. .count()
  22. )

再举一个例子:

如果查看清单3.7中的“after”代码,您会注意到我在等号的右侧以一个圆括号开始(spark=([…])。这是我需要在Python中链接方法时使用的技巧。如果不将结果用括号括起来,则需要在每行末尾添加一个\字符,这会给程序增加视觉噪音。PySpark代码在使用方法链接时特别容易出现换行:如下

使用spark submit以批处理模式启动程序

以批处理模式提交作业

$ spark-submit ./code/Ch03/word_count_submit.py

Scaling up our word frequency program

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/151957
推荐阅读
相关标签
  

闽ICP备14008679号