results = words_nonull.groupby(col("word"_jonathan rioux">
赞
踩
Counting word frequencies using groupby() and count()
- groups = words_nonull.groupby(col("word"))
- print(groups)
- # <pyspark.sql.group.GroupedData at 0x10ed23da0>
- results = words_nonull.groupby(col("word")).count()
- print(results)
- # DataFrame[word: string, count: bigint]
- results.show()
因为Spark是懒惰的,所以它不关心记录的顺序,除非我们明确要求它这样做。由于我们希望看到显示的顶部单词,让我们在数据框中进行一点排序,同时完成程序的最后一步:返回顶部单词频率。
PySpark为排序提供了两种不同的语法
Displaying the top 10 words in Jane Austen’s Pride and Prejudice
- results.orderBy("count", ascending=False).show(10)
- results.orderBy(col("count").desc()).show(10)
就像我们使用read()和SparkReader读取Spark中的数据一样,我们使用write()和SparkWriter对象将数据帧写回磁盘。我专门让SparkWriter将文本导出到CSV文件中,命名simple_count.csv
如果我们看一下结果,我们可以看到PySpark并没有创建一个结果。csv文件。相反,它创建了一个同名的目录,并将201个文件放入该目录中(200个CSV+1_SUCCESS file)。
默认情况下,PySpark将为每个分区提供一个文件。这意味着我们的程序在我的机器上运行时,最终会产生200个分区。为了减少分区的数量,我们将coalesce()方法应用于所需的分区数量。
下一个清单显示了在写入磁盘之前在数据帧上使用coalesce(1)时的区别。我们仍然有一个目录,但里面只有一个CSV文件。
results.coalesce(1).write.csv("./data/simple_count_single_partition.csv")
完整代码如下:
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import (
- col,
- explode,
- lower,
- regexp_extract,
- split,
- )
-
- spark = SparkSession.builder.appName(
- "Analyzing the vocabulary of Pride and Prejudice."
- ).getOrCreate()
-
- book = spark.read.text("./data/gutenberg_books/1342-0.txt")
-
- lines = book.select(split(book.value, " ").alias("line"))
-
- words = lines.select(explode(col("line")).alias("word"))
-
- words_lower = words.select(lower(col("word")).alias("word"))
-
- words_clean = words_lower.select(
- regexp_extract(col("word"), "[a-z']*", 0).alias("word")
- )
-
- words_nonull = words_clean.where(col("word") != "")
-
- results = words_nonull.groupby(col("word")).count()
-
- results.orderBy("count", ascending=False).show(10)
-
- results.coalesce(1).write.csv("./simple_count_single_partition.csv")
虽然没有硬性规定,但普遍的做法是使用F来表示PySpark的函数。
Since col, explode, lower, regexp_extract, and split are all in pyspark.sql.functions, we can import the whole module.
用链式变换方法去除中间变量
list 3.7
- # Before
- book = spark.read.text("./data/gutenberg_books/1342-0.txt")
- lines = book.select(split(book.value, " ").alias("line"))
- words = lines.select(explode(col("line")).alias("word"))
- words_lower = words.select(lower(col("word")).alias("word"))
- words_clean = words_lower.select(
- regexp_extract(col("word"), "[a-z']*", 0).alias("word")
- )
- words_nonull = words_clean.where(col("word") != "")
- results = words_nonull.groupby("word").count()
- # After
- import pyspark.sql.functions as F
- results = (
- spark.read.text("./data/gutenberg_books/1342-0.txt")
- .select(F.split(F.col("value"), " ").alias("line"))
- .select(F.explode(F.col("line")).alias("word"))
- .select(F.lower(F.col("word")).alias("word"))
- .select(F.regexp_extract(F.col("word"), "[a-z']*", 0).alias("word"))
- .where(F.col("word") != "")
- .groupby("word")
- .count()
- )
再举一个例子:
如果查看清单3.7中的“after”代码,您会注意到我在等号的右侧以一个圆括号开始(spark=([…])。这是我需要在Python中链接方法时使用的技巧。如果不将结果用括号括起来,则需要在每行末尾添加一个\字符,这会给程序增加视觉噪音。PySpark代码在使用方法链接时特别容易出现换行:如下
以批处理模式提交作业
$ spark-submit ./code/Ch03/word_count_submit.py
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。