当前位置:   article > 正文

【python】-pyspark应用_pyspark使用

pyspark使用

目录

一、前言

二、pyspark库的安装

三、构建pyspark执行环境入口对象

四、数据输入

4.1  RDD对象

4.2  python数据容器转RDD对象

五、数据计算

5.1  map方法

5.2 flatMap方法

5.3  reduceByKey算子

5.4 练习案例1

5.5  Filter方法

5.6  distinct算子

5.7 sortBy算子

5.8  练习案例2

六、数据输出

6.1  输出为python对象

1  collect算子

2 reduce算子

3  take算子

6.2  输出到文件中

1. saveAsTextFile算子


一、前言

Spark是一个开源的大数据处理框架,它以其高速、易用性和对复杂分析的支持而闻名。以下是Spark的一些关键特点和组件:

  1. 高速性能:Spark能够快速进行数据处理和分析,特别是它可以将数据缓存在内存中,从而加快处理速度。
  2. 易用性:Spark支持多种编程语言,包括Scala、Java、Python和R,这使得它对于不同背景的开发者都很容易上手。
  3. 多样性支持:Spark不仅可以处理批量数据,还能处理实时数据流,支持各种数据源和存储格式。
  4. 组件丰富:Spark包含多个组件,如Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图计算库)。
  5. 与Hadoop的关系:Spark可以运行在Hadoop集群上,并与Hadoop的HDFS和YARN集成,但它的处理速度通常比MapReduce快很多。
  6. 执行流程:Spark的执行流程涉及将数据转化为弹性分布式数据集(RDDs),然后通过转换和动作来进行处理。
  7. 性能调优:Spark提供了丰富的配置选项来优化其性能,包括内存管理、分区策略和任务调度等。
  8. 社区和支持:Spark有一个活跃的社区,提供了大量的文档、教程和案例,帮助开发者学习和使用Spark。
  9. 版本更新:随着技术的发展,Spark也在不断更新,引入新的功能和改进,例如Spark 3.0带来了许多新特性和性能提升。
  10. 教育资源:有许多在线课程和视频教程可以帮助初学者从零开始学习Spark,涵盖环境搭建、核心概念、流处理、SQL、结构化流处理、综合案例等内容。

Spark是一个强大的工具,适用于大数据处理和分析,无论是学术研究还是工业应用,都能找到其用武之地。

二、pyspark库的安装

pip install pyspark

三、构建pyspark执行环境入口对象

想要使用pyspark库完成数据处理,首先需要构建一个执行环境入口环境。

pyspark的执行环境入口对象是:类SparkContext的类对象

  1. """
  2. 演示获取pyspark的执行环境入库对象:SparkContext
  3. 并通过SparkContext对象获取当前pyspark的版本
  4. """
  5. # 导包
  6. from pyspark import SparkConf,SparkContext
  7. # 创建SparkConf对象
  8. conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
  9. # 基于SparkConf类对象创建SparkContext对象
  10. sc = SparkContext(conf=conf)
  11. # 打印pyspark的运行版本
  12. print(sc.version)
  13. # 停止SparkContext对象的运行(停止pyspark程序)
  14. sc.stop()

四、数据输入

4.1  RDD对象

RDD(Resilient Distributed Datasets,弹性分布式数据集)是Apache Spark中的核心概念,它是一个不可变的分布式对象集合

pyspark支持多种数据的输入,再输入完成后,都会得到一个RDD对象,pyspark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

4.2  python数据容器转RDD对象

pyspark支持通过SparkContext对象的parallelize成员方法,将

  • list
  • tuple
  • set
  • dict
  • str

转换为pyspark的RDD对象

(字符串会被拆分出1个个的字符,存入RDD对象;字典仅有key会被存入RDD对象)

  1. from pyspark import SparkConf,SparkContext
  2. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  3. sc = SparkContext(conf=conf)
  4. # 通过parallelize方法将python对象加载到spark内,成为RDD对象
  5. add1 = sc.parallelize([1, 2, 3, 4, 5])
  6. add2 = sc.parallelize((1, 2, 3, 4, 5))
  7. add3 = sc.parallelize("abcdefg")
  8. add4 = sc.parallelize({1, 2, 3, 4, 5})
  9. add5 = sc.parallelize({"key1":"value1", "key2":"value2"})
  10. # 如果要查看RDD里面有什么内容,需要用collect()方法
  11. print(add1.collect())
  12. print(add2.collect())
  13. print(add3.collect())
  14. print(add4.collect())
  15. print(add5.collect())
  16. sc.stop()

结果如下:

通过读取文件转RDD对象

先准备好一个hello.txt文件,内容为:

  1. from pyspark import SparkConf,SparkContext
  2. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  3. sc = SparkContext(conf=conf)
  4. # 通过textFile方法,读取文件按数据加载到spark内,成为RDD对象
  5. add = sc.textFile("D:/pydaima/8day速成python/shuju/hello.txt")
  6. print(add.collect())
  7. sc.stop()

运行结果:

五、数据计算

5.1  map方法

功能:map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接受的处理函数),返回新的RDD

语法:

  1. """
  2. 演示RDD的map方法的使用
  3. """
  4. from pyspark import SparkConf,SparkContext
  5. import os
  6. os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
  7. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  8. sc = SparkContext(conf=conf)
  9. # 准备一个RDD
  10. add = sc.parallelize([1, 2, 3, 4, 5])
  11. # 通过map方法将全部数据都乘以10
  12. def func(data):
  13. return data * 10
  14. add2 = add.map(lambda x: x * 10)
  15. print(add2.collect())
  16. sc.stop()

5.2 flatMap方法

功能:对add执行map操作,然后进行解除嵌套操作。

  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
  4. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  5. sc = SparkContext(conf=conf)
  6. # 准备一个RDD
  7. add = sc.parallelize(["python student 588", "python 588 588", "process finished"])
  8. # 需求:将RDD数据里面的一个个单词提取出来
  9. add2 = add.flatMap(lambda x: x.split(' '))
  10. print(add2.collect())

5.3  reduceByKey算子

功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据的聚合操作。

  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
  4. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  5. sc = SparkContext(conf=conf)
  6. # 准备一个RDD
  7. add = sc.parallelize([('男', 99), ('女', 99), ('女', 912), ('男', 99)])
  8. # 求男生和女生两个组的成绩之和
  9. add2 = add.reduceByKey(lambda a, b: a + b)
  10. print(add2.collect())

5.4 练习案例1

使用学习到的内容,完成:

  • 读取文件
  • 统计文件内,单词的出现数量

先准备一个文件,文件内容如下:

  1. # 1.统计执行环境入口对象
  2. from pyspark import SparkConf,SparkContext
  3. import os
  4. os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  6. sc = SparkContext(conf=conf)
  7. # 2.读取数据文件
  8. add = sc.textFile("D:/pydaima/8day速成python/shuju/wordcount.txt")
  9. # 3.取出全部单词
  10. word_add = add.flatMap(lambda x: x.split(' '))
  11. # 4.将所有单词都转换为二元元组,单词为key,value设置为1
  12. add2 = word_add.map(lambda word: (word, 1))
  13. # 5.分组并求和
  14. result_add = add2.reduceByKey(lambda a, b: a + b)
  15. # 6.打印输出结果
  16. print(result_add.collect())

运行结果:

5.5  Filter方法

  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
  4. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  5. sc = SparkContext(conf=conf)
  6. # 准备一个RDD
  7. rdd = sc.parallelize([1, 2, 3, 4, 5])
  8. # 对RDD的数据进行过滤
  9. rdd2 = rdd.filter(lambda num: num % 2 == 0)
  10. print(rdd2.collect())

运行示例:

5.6  distinct算子

  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
  4. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  5. sc = SparkContext(conf=conf)
  6. # 准备一个RDD
  7. rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 7, 8, 8, 9])
  8. # 对RDD的数据进行去重
  9. rdd2 = rdd.distinct()
  10. print(rdd2.collect())

5.7 sortBy算子

  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
  4. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  5. sc = SparkContext(conf=conf)
  6. # 2.读取数据文件
  7. add = sc.textFile("D:/pydaima/8day速成python/shuju/wordcount.txt")
  8. # 3.取出全部单词
  9. word_add = add.flatMap(lambda x: x.split(' '))
  10. # 4.将所有单词都转换为二元元组,单词为key,value设置为1
  11. add2 = word_add.map(lambda word: (word, 1))
  12. # 5.分组并求和
  13. result_add = add2.reduceByKey(lambda a, b: a + b)
  14. # 以上使用综合案例1的代码进行补充
  15. # 对结果进行排序
  16. final_rdd = result_add.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
  17. print(final_rdd.collect())

5.8  练习案例2

对数据文件进行读取,将数据按照各个城市销售额排名,从大到小;全部城市有哪些商品类别在售卖;北京市有哪些商品类别在售卖。

  1. import json
  2. from pyspark import SparkConf,SparkContext
  3. import os
  4. os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  6. sc = SparkContext(conf=conf)
  7. # 需求1:城市销售额排名
  8. # 1.1 读取文件得到RDD
  9. file_rdd = sc.textFile("D:/pydaima/8day速成python/shuju/orders.txt")
  10. # 1.2 取出一个个json字符串
  11. json_str_rdd = file_rdd.flatMap(lambda x: x.split('|'))
  12. # 1.3 将一个个json字符串转为字典
  13. dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
  14. # 1.4 取出城市和销售额数据 (城市,销售额)
  15. city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
  16. # 1.5 按城市分组销售额聚合
  17. city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
  18. # 1.6 按照销售额聚合结果进行排序
  19. result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
  20. print("需求1的结果:", result1_rdd.collect())
  21. # 需求2:全部城市有哪些商品类别在售卖
  22. # 2.1 取出全部的商品类别, 并去重
  23. category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
  24. print("需求2的结果:", category_rdd.collect())
  25. # 需求3: 北京市有哪些商品类别在售卖
  26. # 3.1 过滤北京的数
  27. bj_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
  28. # 3.2 取出商品类别,并去重
  29. result3_rdd = bj_data_rdd.map(lambda x: x['category']).distinct()
  30. print("需求3的结果:", result3_rdd.collect())

六、数据输出

6.1  输出为python对象

1  collect算子

2 reduce算子

3  take算子

  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
  4. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  5. sc = SparkContext(conf=conf)
  6. # 准备RDD
  7. rdd = sc.parallelize([1, 2, 3, 4, 5])
  8. # collect算子,输出为list对象
  9. rdd_list = rdd.collect()
  10. print(rdd_list)
  11. print(type(rdd_list))
  12. # reduce算子,输出RDD为list对象
  13. num = rdd.reduce(lambda a, b: a + b)
  14. print(num)
  15. # take算子,取出RDD的前N个元素,组成list返回
  16. take_list = rdd.take(3)
  17. print(f"前3个元素是:{take_list}")
  18. # count算子,统计RDD内有多少条数据,返回值为数字
  19. num_count = rdd.count()
  20. print(f"rdd内有{num_count}个元素")
  21. sc.stop()

6.2  输出到文件中

1. saveAsTextFile算子

使用saveAsTextFile需要配置相关环境,调用保存文件的算子,配置Hadoop依赖。

  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
  4. os.environ['HADOOP-HOME'] = "D:/hadoop-py/hadoop-3.0.0"
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  6. # 设置全局并型度为1
  7. conf.set("spark.default.parallelism", "1")
  8. sc = SparkContext(conf=conf)
  9. # 准备RDD1
  10. rdd1 = sc.parallelize([1, 2, 3, 4, 5])
  11. # 准备RDD2
  12. rdd2 = sc.parallelize([('hello', 3), ('Spark', 5), ('Hi', 7)])
  13. # 准备RDD3
  14. rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]])
  15. # 输出到文件中
  16. rdd1.saveAsTextFile("D:/pydaima/8day速成python/shuju/output1.txt")
  17. rdd2.saveAsTextFile("D:/pydaima/8day速成python/shuju/output2.txt")
  18. rdd3.saveAsTextFile("D:/pydaima/8day速成python/shuju/output3.txt")

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/635054
推荐阅读
相关标签
  

闽ICP备14008679号