当前位置:   article > 正文

python做大数据分析入门,python大数据开发教程_py大数据

py大数据

大家好,小编来为大家解答以下问题,python做大数据需要学哪方面?,python做大数据分析入门,今天让我们一起来看看吧!

Source code download: 本文相关源码

PySpark极速入门

一:Pyspark简介与安装

什么是Pyspark?

PySpark是Spark的Python语言接口,通过它,可以使用Python API编写Spark应用程序,目前支持绝大多数Spark功能。目前Spark官方在其支持的所有语言中,将Python置于首位python练手怎么接单

如何安装?

在终端输入

pip intsall pyspark

或者使用pycharm,在GUI界面安装

二:编程实践

加载、转换数据
  1. # 导入pyspark
  2. # 导入pandas, 稍后与pyspark中的数据结构做对比
  3. import pyspark
  4. import pandas as pd

在编写spark程序前,我们要创建一个SparkSession对象

  1. from pyspark.sql import SparkSession
  2. spark = SparkSession.builder.appName("Spark极速入门").getOrCreate()

可以看到会话的一些信息:使用的Spark版本、运行模式、应用程序名字

演示环境用的是local本地模式, * 代表的是使用全部线程 如果想用集群模式的话,可以去查看集群搭建的相关教程 届时pyspark程序作为spark的客户端,设置连接集群,就是真正的分布式计算了 目前只是本地模式,用多线程去模拟分布式计算。

spark

看看我们将用到的test1数据吧

使用read方法,用option设置是否读取csv的头,再指定路径就可以读取数据了

df_spark = spark.read.option("header""true").csv("./data/test1.csv")

看看是什么类型

type(df_spark)
pyspark.sql.dataframe.DataFrame

再看看用pandas读取是什么类型

type(pd.read_csv("./data/test1.csv"))
pandas.core.frame.DataFrame

可以发现Spark读取这种结构化数据时,用的也是和pandas类似的dataframe结构 这也是Spark应用最广泛的数据结构

使用show方法打印数据

df_spark.show()
  1. +---------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +---------+---+----------+------+
  4. | Krish| 31| 10| 30000|
  5. |Sudhanshu| 30| 8| 25000|
  6. | Sunny| 29| 4| 20000|
  7. | Paul| 24| 3| 20000|
  8. | Harsha| 21| 1| 15000|
  9. | Shubham| 23| 2| 18000|
  10. +---------+---+----------+------+

使用printSchema方法打印元数据信息,发现明明是数值类型的,它却读取为了字符串类型

df_spark.printSchema()
  1. root
  2. |-- Name: string (nullable = true)
  3. |-- age: string (nullable = true)
  4. |-- Experience: string (nullable = true)
  5. |-- Salary: string (nullable = true)

在读取时,加上类型推断,发现此时已经能正确读取了

  1. df_spark = spark.read.option("header""true").csv("./data/test1.csv",inferSchema=True)
  2. df_spark.printSchema()
  1. root
  2. |-- Name: string (nullable = true)
  3. |-- age: integer (nullable = true)
  4. |-- Experience: integer (nullable = true)
  5. |-- Salary: integer (nullable = true)

选择某些列, 可以发现不管选多列还是选单列,返回的都是dataframe 返回的也同样可以printSchema、show等dataframe使用的方法,做到了结构的统一

df_spark.select(["Name""age"])
DataFrame[Name: string, age: int]
df_spark.select("Name")
DataFrame[Name: string]
df_spark.select(["Name""age""Salary"]).printSchema()
  1. root
  2. |-- Name: string (nullable = true)
  3. |-- age: integer (nullable = true)
  4. |-- Salary: integer (nullable = true)

不用select,而用[]直接选取,就有点类似与pandas的series了

df_spark["Name"]
Column<'Name'>

column就不能直接show了

df_spark["age"].show()
  1. ---------------------------------------------------------------------------
  2. TypeError Traceback (most recent call last)
  3. Input In [15], in <cell line: 1>()
  4. ----> 1 df_spark["age"].show()
  5. TypeError: 'Column' object is not callable

用describe方法可以对dataframe做一些简单的统计

df_spark.describe().show()
  1. +-------+------+------------------+-----------------+------------------+
  2. |summary| Name| age| Experience| Salary|
  3. +-------+------+------------------+-----------------+------------------+
  4. | count| 6| 6| 6| 6|
  5. | mean| null|26.333333333333332|4.666666666666667|21333.333333333332|
  6. | stddev| null| 4.179314138308661|3.559026084010437| 5354.126134736337|
  7. | min|Harsha| 21| 1| 15000|
  8. | max| Sunny| 31| 10| 30000|
  9. +-------+------+------------------+-----------------+------------------+

用withColumn方法给dataframe加上一列

df_spark = df_spark.withColumn("Experience After 3 year", df_spark["Experience"+ 3)
df_spark.show()
  1. +---------+---+----------+------+-----------------------+
  2. | Name|age|Experience|Salary|Experience After 3 year|
  3. +---------+---+----------+------+-----------------------+
  4. | Krish| 31| 10| 30000| 13|
  5. |Sudhanshu| 30| 8| 25000| 11|
  6. | Sunny| 29| 4| 20000| 7|
  7. | Paul| 24| 3| 20000| 6|
  8. | Harsha| 21| 1| 15000| 4|
  9. | Shubham| 23| 2| 18000| 5|
  10. +---------+---+----------+------+-----------------------+

用drop方法删除列

  1. df_spark = df_spark.drop("Experience After 3 year")
  2. df_spark.show()
  1. +---------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +---------+---+----------+------+
  4. | Krish| 31| 10| 30000|
  5. |Sudhanshu| 30| 8| 25000|
  6. | Sunny| 29| 4| 20000|
  7. | Paul| 24| 3| 20000|
  8. | Harsha| 21| 1| 15000|
  9. | Shubham| 23| 2| 18000|
  10. +---------+---+----------+------+

用withColumnRename方法重命名列

df_spark.withColumnRenamed("Name", "New Name").show()
  1. +---------+---+----------+------+
  2. | New Name|age|Experience|Salary|
  3. +---------+---+----------+------+
  4. | Krish| 31| 10| 30000|
  5. |Sudhanshu| 30| 8| 25000|
  6. | Sunny| 29| 4| 20000|
  7. | Paul| 24| 3| 20000|
  8. | Harsha| 21| 1| 15000|
  9. | Shubham| 23| 2| 18000|
  10. +---------+---+----------+------+

处理缺失值

看看接下来要带缺失值的test2数据吧

CSeoe.png

  1. df_spark = spark.read.csv("./data/test2.csv", header=True, inferSchema=True)
  2. df_spark.show()
  1. +---------+----+----------+------+
  2. | Name| age|Experience|Salary|
  3. +---------+----+----------+------+
  4. | Krish| 31| 10| 30000|
  5. |Sudhanshu| 30| 8| 25000|
  6. | Sunny| 29| 4| 20000|
  7. | Paul| 24| 3| 20000|
  8. | Harsha| 21| 1| 15000|
  9. | Shubham| 23| 2| 18000|
  10. | Mahesh|null| null| 40000|
  11. | null| 34| 10| 38000|
  12. | null| 36| null| null|
  13. +---------+----+----------+------+

用na.drop删除缺失值 how参数设置策略,any意思是只要一行里有缺失值,那就删了 any也是how的默认参数

df_spark.na.drop(how="any").show()
  1. +---------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +---------+---+----------+------+
  4. | Krish| 31| 10| 30000|
  5. |Sudhanshu| 30| 8| 25000|
  6. | Sunny| 29| 4| 20000|
  7. | Paul| 24| 3| 20000|
  8. | Harsha| 21| 1| 15000|
  9. | Shubham| 23| 2| 18000|
  10. +---------+---+----------+------+

可以通过thresh参数设置阈值,代表超过一行中缺失值的数量超过这个值,才会被删除

df_spark.na.drop(how="any", thresh=2).show()
  1. +---------+----+----------+------+
  2. | Name| age|Experience|Salary|
  3. +---------+----+----------+------+
  4. | Krish| 31| 10| 30000|
  5. |Sudhanshu| 30| 8| 25000|
  6. | Sunny| 29| 4| 20000|
  7. | Paul| 24| 3| 20000|
  8. | Harsha| 21| 1| 15000|
  9. | Shubham| 23| 2| 18000|
  10. | Mahesh|null| null| 40000|
  11. | null| 34| 10| 38000|
  12. +---------+----+----------+------+

也可以用subset参数设置关注的列 下面代码意思是,在Experience列中,只要有缺失值就删掉

df_spark.na.drop(how="any", subset=["Experience"]).show()
  1. +---------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +---------+---+----------+------+
  4. | Krish| 31| 10| 30000|
  5. |Sudhanshu| 30| 8| 25000|
  6. | Sunny| 29| 4| 20000|
  7. | Paul| 24| 3| 20000|
  8. | Harsha| 21| 1| 15000|
  9. | Shubham| 23| 2| 18000|
  10. | null| 34| 10| 38000|
  11. +---------+---+----------+------+

用fillna填充缺失值, 可以用字典对各列的填充值进行设置

df_spark.fillna({'Name''unknown''age'18'Experience'0'Salary'0}).show()
  1. +---------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +---------+---+----------+------+
  4. | Krish| 31| 10| 30000|
  5. |Sudhanshu| 30| 8| 25000|
  6. | Sunny| 29| 4| 20000|
  7. | Paul| 24| 3| 20000|
  8. | Harsha| 21| 1| 15000|
  9. | Shubham| 23| 2| 18000|
  10. | Mahesh| 18| 0| 40000|
  11. | unknown| 34| 10| 38000|
  12. | unknown| 36| 0| 0|
  13. +---------+---+----------+------+

还可以调用机器学习模块的相关方法, 通过设置策略,可以用平均数、众数等方式填充

  1. from pyspark.ml.feature import Imputer
  2. imputer = Imputer(
  3.     inputCols = ['age''Experience''Salary'],
  4.     outputCols = [f"{c}_imputed" for c in ['age''Experience''Salary']]
  5. ).setStrategy("mean")
imputer.fit(df_spark).transform(df_spark).show()
  1. +---------+----+----------+------+-----------+------------------+--------------+
  2. | Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
  3. +---------+----+----------+------+-----------+------------------+--------------+
  4. | Krish| 31| 10| 30000| 31| 10| 30000|
  5. |Sudhanshu| 30| 8| 25000| 30| 8| 25000|
  6. | Sunny| 29| 4| 20000| 29| 4| 20000|
  7. | Paul| 24| 3| 20000| 24| 3| 20000|
  8. | Harsha| 21| 1| 15000| 21| 1| 15000|
  9. | Shubham| 23| 2| 18000| 23| 2| 18000|
  10. | Mahesh|null| null| 40000| 28| 5| 40000|
  11. | null| 34| 10| 38000| 34| 10| 38000|
  12. | null| 36| null| null| 36| 5| 25750|
  13. +---------+----+----------+------+-----------+------------------+--------------+

过滤操作

还是切换到test1数据

  1. df_spark = spark.read.csv("./data/test1.csv", header=True, inferSchema=True)
  2. df_spark.show()
  1. +---------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +---------+---+----------+------+
  4. | Krish| 31| 10| 30000|
  5. |Sudhanshu| 30| 8| 25000|
  6. | Sunny| 29| 4| 20000|
  7. | Paul| 24| 3| 20000|
  8. | Harsha| 21| 1| 15000|
  9. | Shubham| 23| 2| 18000|
  10. +---------+---+----------+------+

可以使用filter方法对数据进行过滤操作,类似于SQL中的where 可以使用字符串的方式,也可以利用column方式去传递条件

df_spark.filter("Salary <= 20000").show()
  1. +-------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +-------+---+----------+------+
  4. | Sunny| 29| 4| 20000|
  5. | Paul| 24| 3| 20000|
  6. | Harsha| 21| 1| 15000|
  7. |Shubham| 23| 2| 18000|
  8. +-------+---+----------+------+

df_spark.filter(df_spark["Salary"]<=20000).show()
  1. +-------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +-------+---+----------+------+
  4. | Sunny| 29| 4| 20000|
  5. | Paul| 24| 3| 20000|
  6. | Harsha| 21| 1| 15000|
  7. |Shubham| 23| 2| 18000|
  8. +-------+---+----------+------+

如果是字符串,用 and 表示同时满足多个条件 如果是用column,用( & ) 连接多个条件

df_spark.filter("Salary <= 20000 and age <= 24").show()
  1. +-------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +-------+---+----------+------+
  4. | Paul| 24| 3| 20000|
  5. | Harsha| 21| 1| 15000|
  6. |Shubham| 23| 2| 18000|
  7. +-------+---+----------+------+

  1. df_spark.filter(
  2.     (df_spark["Salary"]<=20000)
  3.     & (df_spark["age"]<=24)
  4. ).show()
  1. +-------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +-------+---+----------+------+
  4. | Paul| 24| 3| 20000|
  5. | Harsha| 21| 1| 15000|
  6. |Shubham| 23| 2| 18000|
  7. +-------+---+----------+------+

column中,用|表示或, ~表示取反
  1. df_spark.filter(
  2.     (df_spark["Salary"]<=20000)
  3.     | (df_spark["age"]<=24)
  4. ).show()
  1. +-------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +-------+---+----------+------+
  4. | Sunny| 29| 4| 20000|
  5. | Paul| 24| 3| 20000|
  6. | Harsha| 21| 1| 15000|
  7. |Shubham| 23| 2| 18000|
  8. +-------+---+----------+------+

  1. df_spark.filter(
  2.     (df_spark["Salary"]<=20000)
  3.     | ~(df_spark["age"]<=24)
  4. ).show()
  1. +---------+---+----------+------+
  2. | Name|age|Experience|Salary|
  3. +---------+---+----------+------+
  4. | Krish| 31| 10| 30000|
  5. |Sudhanshu| 30| 8| 25000|
  6. | Sunny| 29| 4| 20000|
  7. | Paul| 24| 3| 20000|
  8. | Harsha| 21| 1| 15000|
  9. | Shubham| 23| 2| 18000|
  10. +---------+---+----------+------+

分组聚合

换一个数据集test3

  1. df_spark = spark.read.csv("./data/test3.csv", header=True, inferSchema=True)
  2. df_spark.show()
  1. +---------+------------+------+
  2. | Name| Departments|salary|
  3. +---------+------------+------+
  4. | Krish|Data Science| 10000|
  5. | Krish| IOT| 5000|
  6. | Mahesh| Big Data| 4000|
  7. | Krish| Big Data| 4000|
  8. | Mahesh|Data Science| 3000|
  9. |Sudhanshu|Data Science| 20000|
  10. |Sudhanshu| IOT| 10000|
  11. |Sudhanshu| Big Data| 5000|
  12. | Sunny|Data Science| 10000|
  13. | Sunny| Big Data| 2000|
  14. +---------+------------+------+

使用groupby方法对dataframe某些列进行分组

df_spark.groupBy("Name")
<pyspark.sql.group.GroupedData at 0x227454d4be0>

可以看到分组的结果是GroupedData对象,它不能使用show等方法打印 GroupedData对象需要进行聚合操作,才能重新转换为dataframe 聚合函数有sum、count、avg、max、min等

df_spark.groupBy("Departments").sum().show()
  1. +------------+-----------+
  2. | Departments|sum(salary)|
  3. +------------+-----------+
  4. | IOT| 15000|
  5. | Big Data| 15000|
  6. |Data Science| 43000|
  7. +------------+-----------+

三:总结

Pandas的dataframe与PySpark的dataframe有许多相似之处,熟悉Pandas的同学可以很快适应它的API。目前可以粗浅地把PySpark理解为”分布式的Pandas“,不过,PySpark还有分布式机器学习的功能——Spark MLlib(可以理解为分布式的Sklearn、TensorFlow等),后续会给大家介绍。在集群中,它的dataframe可以分布在不同的机器上,以此处理海量数据。有兴趣的小伙伴可以通过虚拟机搭建一个Spark集群,进一步学习Spark。

Apache Spark™ - 用于大规模数据分析的统一引擎

文章知识点与官方知识档案匹配,可进一步学习相关知识
Python入门技能树首页概览425935 人正在系统学习中
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/726762
推荐阅读
相关标签
  

闽ICP备14008679号