Creating a SparkSession object to start using PySpark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

my_grocery_list = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
    ["Cake", 1, 10.99],
]

df_grocery_list = spark.createDataFrame(
    my_grocery_list, ["Item", "Quantity", "Price"]
)

df_grocery_list.printSchema()
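For reference, printSchema() on this data frame should print the inferred types described below:

# root
#  |-- Item: string (nullable = true)
#  |-- Quantity: long (nullable = true)
#  |-- Price: double (nullable = true)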
Our first parameter is the data itself. You can provide a list of items (here, a list of lists), a data frame, or a resilient distributed dataset. The second parameter is the schema of the data frame; passing a list of column names makes PySpark infer the types of our columns (string, long, and double, respectively).
The master node knows the structure of the data frame, but the actual data is represented on the worker nodes. Each column maps to data stored somewhere in the cluster, managed by PySpark. We operate on the abstract structure and let the master delegate the work efficiently.
Terminology: exploratory data analysis (or EDA)
PySpark doesn't provide any charting capability, and it doesn't hook into charting libraries such as Matplotlib, seaborn, Altair, or plot.ly. The usual solution is to transform the data with PySpark, convert the PySpark data frame into a pandas data frame with the toPandas() method, and then use your favorite charting library on the result.
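A minimal sketch of that workflow, assuming Matplotlib and reusing the grocery-list data frame from above (the aggregation itself is a placeholder, not from the source):

import matplotlib.pyplot as plt

# Do the heavy lifting in PySpark, then collect only the small
# aggregated result to the driver as a pandas data frame.
summary_pdf = df_grocery_list.groupBy("Item").sum("Price").toPandas()

# Chart with the library of your choice (Matplotlib here).
summary_pdf.plot.bar(x="Item", y="sum(Price)")
plt.show()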
Data preparation:
Download the file from the Canada Open Data portal (http://mng.bz/y4YJ) and select the BroadcastLogs_2018_Q3_M8 file. You also need to download the Data Dictionary in .doc form, as well as the Reference Tables zip file, unzipping the latter into a ReferenceTables directory in data/broadcast_logs. Once done, make sure you have the BroadcastLogs CSV, the ReferenceTables directory, and the data dictionary in place.
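The examples below operate on a logs data frame built from the downloaded file. A minimal sketch of the read, assuming the CSV uses a pipe separator (adjust DIRECTORY to your own layout):

import os

DIRECTORY = "./data/broadcast_logs"  # hypothetical local path

logs = spark.read.csv(
    os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8.CSV"),
    sep="|",           # assumed pipe-delimited
    header=True,       # first row holds the column names
    inferSchema=True,  # let Spark infer the column types
)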
logs.select("BroadcastLogID", "LogServiceID", "LogDate").show(5, False)
# +--------------+------------+-------------------+
# |BroadcastLogID|LogServiceID|LogDate            |
# +--------------+------------+-------------------+
# |1196192316    |3157        |2018-08-01 00:00:00|
# |1196192317    |3157        |2018-08-01 00:00:00|
# |1196192318    |3157        |2018-08-01 00:00:00|
# |1196192319    |3157        |2018-08-01 00:00:00|
# |1196192320    |3157        |2018-08-01 00:00:00|
# +--------------+------------+-------------------+
# only showing top 5 rows
Four ways to select columns in PySpark, all equivalent in terms of results
# Using the string-to-column conversion
logs.select("BroadcastLogID", "LogServiceID", "LogDate")
logs.select(*["BroadcastLogID", "LogServiceID", "LogDate"])

# Passing the column object explicitly
logs.select(
    F.col("BroadcastLogID"), F.col("LogServiceID"), F.col("LogDate")
)
logs.select(
    *[F.col("BroadcastLogID"), F.col("LogServiceID"), F.col("LogDate")]
)
When explicitly selecting a few columns, you don't have to wrap them in a list. If you're already working with a list of columns, you can unpack it with the * prefix.
A data frame keeps track of its columns in the columns attribute; logs.columns is a Python list containing all the column names of the logs data frame.
Getting rid of columns using the drop() method
logs = logs.drop("BroadcastLogID", "SequenceNO")
Getting rid of columns, select style
logs = logs.select(
    *[x for x in logs.columns if x not in ["BroadcastLogID", "SequenceNO"]]
)
Extracting the hours, minutes, and seconds from the Duration column
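The code for this listing is missing above; a minimal sketch, assuming Duration holds fixed-width strings of the form HH:MM:SS... so that positional substr() calls apply:

logs.select(
    F.col("Duration"),
    # substr(start, length) is 1-indexed: hours sit at positions 1-2,
    # minutes at 4-5, seconds at 7-8.
    F.col("Duration").substr(1, 2).cast("int").alias("dur_hours"),
    F.col("Duration").substr(4, 2).cast("int").alias("dur_minutes"),
    F.col("Duration").substr(7, 2).cast("int").alias("dur_seconds"),
).distinct().show(5)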
WARNING If you create a column with withColumn() and give it a name that already exists in your data frame, PySpark will happily overwrite the column.
WARNING Creating many (100+) new columns using withColumn() will slow Spark down to a grind. If you need to create a lot of columns at once, use the select() approach. While it will generate the same work, it is less taxing on the query planner.
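To make the first warning concrete, a hypothetical sketch: a new name such as Duration_seconds adds a column, while reusing an existing name would silently replace it.

logs = logs.withColumn(
    "Duration_seconds",  # a new name, so a new column is added
    (
        F.col("Duration").substr(1, 2).cast("int") * 3600
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ),
)
# Had we written logs.withColumn("Duration", ...), the original
# Duration column would have been overwritten without any warning.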