
Data Analysis with Python and PySpark: Your First PySpark Program

At the command line, enter: pyspark

For how to set the Python version that pyspark uses at startup, see the earlier post "Data Analysis with Python and PySpark: Preparation": https://blog.csdn.net/lm19770429/article/details/123357275?spm=1001.2014.3001.5501

REPL: the interactive interpreter (read-eval-print loop)

The SparkSession entry point

By using the getOrCreate() method, your program will work in both interactive and batch mode, avoiding the creation of a new SparkSession if one already exists.
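A minimal sketch of creating (or reusing) a session this way; the application name below is a placeholder chosen for illustration:

  from pyspark.sql import SparkSession

  # getOrCreate() returns the active SparkSession if one exists (as in the pyspark
  # shell, where `spark` is already defined); otherwise it builds a new one.
  spark = (SparkSession
           .builder
           .appName("word_count_example")  # placeholder application name
           .getOrCreate())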

Configuring the log level:

spark.sparkContext.setLogLevel("KEYWORD")

Keyword   Signification
OFF       No logging at all (not recommended).
FATAL     Only fatal errors. A fatal error will crash your Spark cluster.
ERROR     Will show FATAL, as well as other recoverable errors.
WARN      Add warnings (and there are quite a lot of them).
INFO      Will give you runtime information, such as repartitioning and data recovery.
DEBUG     Will provide debug information on your jobs.
TRACE     Will trace your jobs (more verbose debug logs). Can be quite informative but very annoying.
ALL       Everything that PySpark can spit, it will spit. As useful as OFF.
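For instance, assuming the spark variable from the pyspark shell, the following keeps the console relatively quiet during interactive exploration:

  # Only warnings and errors (WARN, ERROR, FATAL) will be printed from here on.
  spark.sparkContext.setLogLevel("WARN")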

Reading data into a data frame with spark.read

 

PySpark reads your data
PySpark can accommodate the different ways you can process data. Under the hood, spark.read.csv() will map to spark.read.format('csv').load(), and you may encounter this form in the wild. I usually prefer using the direct csv method, as it provides a handy reminder of the different parameters the reader can take.

orc and parquet are also data formats that are especially well suited for big data processing. ORC (which stands for "optimized row columnar") and Parquet are competing data formats that pretty much serve the same purpose. Both are open sourced and now part of the Apache project, just like Spark.

PySpark defaults to using Parquet when reading and writing files, and we'll use this format to store our results throughout the book. I'll provide a longer discussion about the usage, advantages, and trade-offs of using Parquet or ORC as a data format in chapter 6.
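As a small sketch of the equivalence mentioned above (the file path and header option are made up for illustration):

  # Shorthand reader method for CSV files.
  df = spark.read.csv("./data/sample.csv", header=True)

  # Equivalent long form using the generic reader: same result, different spelling.
  df = spark.read.format("csv").option("header", True).load("./data/sample.csv")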

Reading a text file:

  >>> book = spark.read.text("./data/gutenberg_books/1342-0.txt")
  >>> book
  DataFrame[value: string]

You may want a clearer display of the schema. PySpark provides printSchema() to display the schema as a tree.
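For example, printing the schema of the data frame we just read should show a single string column (output reproduced as I'd expect it):

  book.printSchema()
  # root
  #  |-- value: string (nullable = true)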

When I can't remember the exact method to apply, I really like using dir() on the object.

  dir(spark)
  ['Builder', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_activeSession', '_convert_from_pandas', '_createFromLocal', '_createFromRDD', '_create_dataframe', '_create_from_pandas_with_arrow', '_create_shell_session', '_get_numpy_record_dtype', '_inferSchema', '_inferSchemaFromList', '_instantiatedSession', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_repr_html_', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'getActiveSession', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']

If you are unsure of the correct usage of a function, class, or method, you can print its __doc__ attribute or, for IPython users, append a trailing question mark (or two question marks if you want more detail).

 print(spark.__doc__)

From structure to content: exploring our data frame with show()

By default, it displays 20 rows and truncates long values.

 book.show()

The show() method takes three optional parameters:
- n can be set to any positive integer and will display that number of rows.
- truncate, if set to True, truncates the columns to display only 20 characters. Set it to False to display the whole length, or to any positive integer to truncate to that specific number of characters.
- vertical takes a Boolean value and, when set to True, displays each record as a small table. This is a very useful option if you need to inspect records in detail.

book.show(10, truncate=50)

Optional topic: non-lazy Spark

That said, there are times, especially when learning, when you want the data frame to be evaluated after every transformation (we call this eager evaluation).

If you want to use eager evaluation in the shell, paste the following code into it:

  from pyspark.sql import SparkSession

  spark = (SparkSession.builder
           .config("spark.sql.repl.eagerEval.enabled", "True")
           .getOrCreate())

Simple column transformations: moving from a sentence to a list of words

  from pyspark.sql.functions import split

  lines = book.select(split(book.value, " ").alias("line"))
  lines.show(5)

Selecting specific columns using select()

The most basic transformation is the identity, where you return exactly what was provided to you. If you have used SQL in the past, you might think this sounds like a SELECT statement.

book.select(book.value)

PySpark provides dot notation for its data frames: each column can be referenced as an attribute of the data frame.

PySpark provides multiple ways to select columns. I'll show the four most common ones in the next listing.

  from pyspark.sql.functions import col

  book.select(book.value)
  book.select(book["value"])
  book.select(col("value"))
  book.select("value")

 Transforming columns: Splitting a string into a list of words

The split function transforms a string column into an array column containing one or more string elements.
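To see the effect, we can inspect the schema of lines; the exact nullability flags may differ by Spark version, but the line column should now be an array of strings:

  lines.printSchema()
  # root
  #  |-- line: array (nullable = true)
  #  |    |-- element: string (containsNull = false)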

Renaming columns: alias and withColumnRenamed

Use alias() when working with a method that specifies which columns to return, such as select().

If you only want to rename a column without changing the rest of the data frame, use withColumnRenamed(). Note that if the column does not exist, PySpark treats this method as a no-op and does nothing.

  # This looks a lot cleaner
  lines = book.select(split(book.value, " ").alias("line"))

  # This is messier, and you have to remember the name PySpark assigns automatically
  lines = book.select(split(book.value, " "))
  lines = lines.withColumnRenamed("split(value, , -1)", "line")

Reshaping data: exploding a list into rows

  from pyspark.sql.functions import explode, col

  words = lines.select(explode(col("line")).alias("word"))
  words.show(15)
  # +----------+
  # |      word|
  # +----------+
  # |       The|
  # |   Project|
  # | Gutenberg|
  # |     EBook|
  # |        of|
  # |     Pride|
  # |       and|
  # |Prejudice,|
  # |        by|
  # |      Jane|
  # |    Austen|
  # |          |
  # |      This|
  # |     eBook|
  # |        is|
  # +----------+
  # only showing top 15 rows

Working with words: Changing case and removing punctuation

Lower the case of the words in the data frame

  from pyspark.sql.functions import lower

  words_lower = words.select(lower(col("word")).alias("word_lower"))
  words_lower.show()

Clean our words of any punctuation and other non-useful characters

We keep only the letters, using a regular expression.

Using regexp_extract to keep what looks like a word

  from pyspark.sql.functions import regexp_extract

  words_clean = words_lower.select(
      regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word")
  )
  words_clean.show()

If you are interested in building your own, the RegExr (https://regexr.com/) website is really useful, as well as the Regular Expression Cookbook by Steven Levithan and Jan Goyvaerts (O'Reilly, 2012).

Filtering rows

PySpark provides not one, but two identical methods to perform this task. You can use either .filter() or its alias .where().

Filtering rows in your data frame using where or filter: using “not equal,” or !=

  words_nonull = words_clean.filter(col("word") != "")
  words_nonull.show()
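A quick sketch of the same filter written with the where() alias (identical result):

  from pyspark.sql.functions import col  # already imported in the listings above

  # where() is simply an alias of filter(); this keeps only non-empty words.
  words_nonull = words_clean.where(col("word") != "")
  words_nonull.show()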
