At the command line, enter: pyspark
By using the getOrCreate() method, your program works in both interactive and batch mode, avoiding the creation of a new SparkSession if one already exists.
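A minimal sketch of what that looks like in a standalone script (the application name here is just an example):
- from pyspark.sql import SparkSession
-
- # Reuse the active SparkSession if one exists (for example, the one the
- # pyspark shell created), otherwise build a new one.
- spark = (SparkSession.builder
-          .appName("Counting word occurrences from a book")
-          .getOrCreate())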
Configuring the log level:
spark.sparkContext.setLogLevel("KEYWORD")
| Keyword | Signification |
| --- | --- |
| OFF | No logging at all (not recommended). |
| FATAL | Only fatal errors. A fatal error will crash your Spark cluster. |
| ERROR | Will show FATAL, as well as other recoverable errors. |
| WARN | Add warnings (and there are quite a lot of them). |
| INFO | Will give you runtime information, such as repartitioning and data recovery. |
| DEBUG | Will provide debug information on your jobs. |
| TRACE | Will trace your jobs (more verbose debug logs). Can be quite informative but very annoying. |
| ALL | Everything that PySpark can spit, it will spit. As useful as OFF. |
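For example, to keep the shell relatively quiet while still surfacing warnings:
- spark.sparkContext.setLogLevel("WARN")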
PySpark reads your data
PySpark can accommodate the different ways you can process data. Under the hood,
spark.read.csv() will map to spark.read.format('csv').load(), and you may
encounter this form in the wild. I usually prefer using the direct csv method as it provides a handy reminder of the different parameters the reader can take.
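As a quick sketch of that equivalence (the file path and reader options here are purely illustrative):
- # These two calls produce the same data frame.
- df1 = spark.read.csv("./data/sample.csv", header=True, inferSchema=True)
- df2 = (spark.read.format("csv")
-        .option("header", "true")
-        .option("inferSchema", "true")
-        .load("./data/sample.csv"))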
orc and parquet are also data formats that are especially well suited for big data
processing. ORC (which stands for “optimized row columnar”) and Parquet are competing data formats that pretty much serve the same purpose. Both are open sourced
and now part of the Apache project, just like Spark.
PySpark defaults to using Parquet when reading and writing files, and we’ll use this
format to store our results throughout the book. I’ll provide a longer discussion about
the usage, advantages, and trade-offs of using Parquet or ORC as a data format in
chapter 6.
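A minimal sketch of writing and reading Parquet (the path is hypothetical, and df stands for any data frame):
- # Write a data frame to Parquet, then read it back.
- df.write.mode("overwrite").parquet("./data/sample.parquet")
- df_back = spark.read.parquet("./data/sample.parquet")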
Reading a text file:
- >>> book=spark.read.text("./data/gutenberg_books/1342-0.txt")
- >>> book
- DataFrame[value: string]
You may want a clearer display of the schema. PySpark provides printSchema() to display the schema in a tree form.
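For the book data frame we just loaded, this gives the following:
- book.printSchema()
- # root
- #  |-- value: string (nullable = true)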
When I don't remember the exact method I want to apply, I really like using dir() on the object.
- dir(spark)
- ['Builder', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_activeSession', '_convert_from_pandas', '_createFromLocal', '_createFromRDD', '_create_dataframe', '_create_from_pandas_with_arrow', '_create_shell_session', '_get_numpy_record_dtype', '_inferSchema', '_inferSchemaFromList', '_instantiatedSession', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_repr_html_', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'getActiveSession', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']
If you are unsure about the proper usage of a function, class, or method, you can print its __doc__ attribute or, for those using IPython, use a trailing question mark (or two, if you want even more details).
print(spark.__doc__)
From structure to content: exploring our data frame with show()
By default, it displays 20 rows and truncates long values.
book.show()
The show() method takes three optional parameters:
- n can be set to any positive integer and will display that number of rows.
- truncate, if set to True, will truncate the columns to display only 20 characters. Set it to False to display the whole length, or to any positive integer to truncate to that number of characters.
- vertical takes a Boolean and, when set to True, displays each record as a small table. This is a very useful option if you need to inspect records in detail.
book.show(10, truncate=50)
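For instance, to try the vertical option mentioned above:
- # Displays the first two records as small vertical tables,
- # truncating values longer than 30 characters.
- book.show(2, truncate=30, vertical=True)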
Optional topic: non-lazy Spark
That said, there are times, especially when learning, when you want the data frame to be evaluated after each transformation (we call this eager evaluation).
If you want to use eager evaluation in the shell, paste the following code into it:
- from pyspark.sql import SparkSession
- spark = (SparkSession.builder
- .config("spark.sql.repl.eagerEval.enabled", "True")
- .getOrCreate())
- from pyspark.sql.functions import split
- lines = book.select(split(book.value, " ").alias("line"))
- lines.show(5)
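With eager evaluation enabled, evaluating a data frame at the shell prompt displays its first rows directly, so an explicit show() is no longer needed while exploring:
- lines  # with eager evaluation on, the shell prints the first rows instead of just the schema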
The most basic transformation is the identity, where you return exactly what was provided to you. If you've used SQL in the past, you might think this sounds like a SELECT statement.
book.select(book.value)
For every column in a data frame, PySpark provides a dot notation that points to that column.
PySpark offers multiple ways to select columns. The next listing shows the four most common ones.
- from pyspark.sql.functions import col
- book.select(book.value)
- book.select(book["value"])
- book.select(col("value"))
- book.select("value")
The split function transforms a string column into an array column containing one or more string elements.
When using a method that specifies which columns to return, such as select(), use alias().
If you only want to rename a column without changing the rest of the data frame, use withColumnRenamed(). Note that if the column does not exist, PySpark treats this method as a no-op and does nothing.
- # This looks a lot cleaner
- lines = book.select(split(book.value, " ").alias("line"))
- # This is messier, and you have to remember the name PySpark assigns automatically
- lines = book.select(split(book.value, " "))
- lines = lines.withColumnRenamed("split(value, , -1)", "line")
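Next, explode() takes the array column and turns each of its elements into its own record, giving us one word per row: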
- from pyspark.sql.functions import explode, col
- words = lines.select(explode(col("line")).alias("word"))
- words.show(15)
- # +----------+
- # | word|
- # +----------+
- # | The|
- # | Project|
- # | Gutenberg|
- # | EBook|
- # | of|
- # | Pride|
- # | and|
- # |Prejudice,|
- # | by|
- # | Jane|
- # | Austen|
- # | |
- # | This|
- # | eBook|
- # | is|
- # +----------+
- # only showing top 15 rows
Lower the case of the words in the data frame
- from pyspark.sql.functions import lower
- words_lower = words.select(lower(col("word")).alias("word_lower"))
- words_lower.show()
Next, we clean our words of any punctuation and other non-useful characters; we keep only the letters, using a regular expression.
Using regexp_extract to keep what looks like a word
- from pyspark.sql.functions import regexp_extract
- words_clean = words_lower.select(
- regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word")
- )
-
- words_clean.show()
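The third argument (0) asks for the entire match; when a value contains no letters at all, regexp_extract returns an empty string, which is why we filter out empty records next.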
If you are interested in building your own, the RegExr (https://regexr.com/) website
is really useful, as well as the Regular Expression Cookbook by Steven Levithan and
Jan Goyvaerts (O’Reilly, 2012).
PySpark provides not one, but two identical methods to perform this task. You can use either .filter() or its alias .where().
Filtering rows in your data frame using where or filter: using “not equal,” or !=
- words_nonull = words_clean.filter(col("word") != "")
- words_nonull.show()
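The same filter expressed through the where() alias:
- words_nonull = words_clean.where(col("word") != "")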