Data Analysis with Python and PySpark: Your first PySpark program



The SparkSession entry point




PySpark's logging verbosity is set with spark.sparkContext.setLogLevel("KEYWORD"), which accepts the following keywords:

Keyword  Meaning
OFF      No logging at all (not recommended).
FATAL    Only fatal errors. A fatal error will crash your Spark cluster.
ERROR    Shows FATAL, as well as other recoverable errors.
WARN     Adds warnings (and there are quite a lot of them).
INFO     Gives you runtime information, such as repartitioning and data recovery.
DEBUG    Provides debug information on your jobs.
TRACE    Traces your jobs (more verbose debug logs). Can be quite informative but very annoying.
ALL      Everything that PySpark can spit, it will spit. As useful as OFF.

Reading data into a data frame with spark.read


PySpark reads your data
PySpark can accommodate the different ways you can process data. Under the hood, spark.read.csv() maps to spark.read.format('csv').load(), and you may encounter this form in the wild. I usually prefer using the direct csv method, as it provides a handy reminder of the different parameters the reader can take.
ORC and Parquet are also data formats that are especially well suited for big data processing. ORC (which stands for “optimized row columnar”) and Parquet are competing data formats that serve pretty much the same purpose. Both are open source and, just like Spark, are now Apache projects.
PySpark defaults to using Parquet when reading and writing files, and we’ll use this format to store our results throughout the book. I’ll provide a longer discussion about the usage, advantages, and trade-offs of using Parquet or ORC as a data format in chapter 6.


    >>> book = spark.read.text("./data/gutenberg_books/1342-0.txt")
    >>> book
    DataFrame[value: string]



    >>> dir(spark)
    ['Builder', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_activeSession', '_convert_from_pandas', '_createFromLocal', '_createFromRDD', '_create_dataframe', '_create_from_pandas_with_arrow', '_create_shell_session', '_get_numpy_record_dtype', '_inferSchema', '_inferSchemaFromList', '_instantiatedSession', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_repr_html_', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'getActiveSession', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']






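show() takes three optional parameters:
 n can be set to any positive integer and will display that many rows.
 truncate, if set to True, truncates columns to display only 20 characters. If set to False, it displays the whole length. Any positive integer truncates to that specific number of characters.
 vertical takes a Boolean value. When set to True, it displays each record as a small table. This is a very useful option if you need to inspect records in detail.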

book.show(10, truncate=50)




    from pyspark.sql import SparkSession

    # Eager evaluation: evaluating a data frame in a REPL or notebook displays its first rows
    spark = (SparkSession.builder
             .config("spark.sql.repl.eagerEval.enabled", "True")
             .getOrCreate())


    from pyspark.sql.functions import split

    lines = book.select(split(book.value, " ").alias("line"))
    lines.show(5)

Selecting specific columns using select()





    from pyspark.sql.functions import col

    # Four equivalent ways to select the "value" column
    book.select(book.value)
    book.select(book["value"])
    book.select(col("value"))
    book.select("value")

Transforming columns: Splitting a string into a list of words





    from pyspark.sql.functions import split

    # This looks a lot cleaner
    lines = book.select(split(book.value, " ").alias("line"))

    # This is messier, and you have to remember the name PySpark assigns automatically
    lines = book.select(split(book.value, " "))
    lines = lines.withColumnRenamed("split(value,  , -1)", "line")


    from pyspark.sql.functions import explode, col

    words = lines.select(explode(col("line")).alias("word"))
    words.show(15)
    # +----------+
    # |      word|
    # +----------+
    # |       The|
    # |   Project|
    # | Gutenberg|
    # |     EBook|
    # |        of|
    # |     Pride|
    # |       and|
    # |Prejudice,|
    # |        by|
    # |      Jane|
    # |    Austen|
    # |          |
    # |      This|
    # |     eBook|
    # |        is|
    # +----------+
    # only showing top 15 rows

Working with words: Changing case and removing punctuation

Lower the case of the words in the data frame

    from pyspark.sql.functions import lower

    words_lower = words.select(lower(col("word")).alias("word_lower"))
    words_lower.show()

Next, we clean our words of any punctuation and other non-useful characters.


Using regexp_extract to keep what looks like a word

    from pyspark.sql.functions import regexp_extract

    words_clean = words_lower.select(
        regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word")
    )
    words_clean.show()

If you are interested in building your own regular expressions, the RegExr website (https://regexr.com/) is really useful, as is the Regular Expressions Cookbook by Steven Levithan and Jan Goyvaerts (O’Reilly, 2012).

Filtering rows

PySpark provides not one, but two identical methods to perform this task. You can use either .filter() or its alias .where().

Filtering rows in your data frame using where or filter: using “not equal,” or !=

    words_nonull = words_clean.filter(col("word") != "")
    words_nonull.show()

