当前位置:   article > 正文

Python进阶知识:整理1 -> pySpark入门

pyspark入门

1 编写执行入口 

  1. # 1.导包
  2. from pyspark import SparkConf, SparkContext
  3. # 2. 创建SparkConf类对象
  4. conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
  5. # 3. 基于SparkConf类对象创建SparkContext对象
  6. sc = SparkContext(conf=conf) # 执行入口
  7. # 4.打印pySpark的运行版本
  8. # print(sc.version)
  9. # 5.停止SparkContext对象的运行
  10. sc.stop()

pySpark大数据分析过程分为3步:数据输入、数据计算、数据输出 ,以下内容将重点介绍这三个过程


 

2 数据输入

在数据输入完成后,都会得到一个RDD类的对象(RDD全称为弹性分布式数据集)

  1. # 1.构建执行环境入口对象
  2. from pyspark import SparkConf, SparkContext
  3. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  4. sc = SparkContext(conf=conf)
  5. # 2.通过parallelize方法将Python对象加载到Spark内,成为RDD对象
  6. # 通过sc对象构建RDD
  7. rdd1 = sc.parallelize([1, 2, 3, 4, 5])
  8. rdd2 = sc.parallelize((6, 7, 8, 9, 10))
  9. rdd3 = sc.parallelize("adjsjfjsg")
  10. rdd4 = sc.parallelize({1, 2, 3, 4})
  11. rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
  12. # 如果要查看RDD对象的内容,可以通过collect方法
  13. print(rdd1.collect())
  14. print(rdd2.collect())
  15. print(rdd3.collect())
  16. print(rdd4.collect())
  17. print(rdd5.collect())
  18. # 3.用textFiled方法,读取文件数据加载到Spark内,成为RDD对象
  19. rdd6 = sc.textFile("D:/hello.txt")
  20. print(rdd6.collect())
  21. sc.stop()

3 数据计算

3.1 map算子

map算子是将RDD的数据进行一条条处理(处理的逻辑基于map算子接收的处理函数),返回新的RDD

  1. import os
  2. os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe" # 设置环境变量,因为Spark找不到python解释器在什么地方
  3. # 构建执行环境入口对象
  4. from pyspark import SparkConf, SparkContext
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  6. sc = SparkContext(conf=conf)
  7. # 1. map 算子
  8. rdd = sc.parallelize([1, 2, 3, 4])
  9. # 通过map方法将全部的元素都乘10
  10. rdd_map = rdd.map(lambda x: x * 10)
  11. print(rdd_map.collect())
  12. # 链式调用
  13. rdd_map1 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
  14. print(rdd_map1.collect())

3.2 flatMap算子

对RDD进行map操作后,进行解除嵌套的作用

  1. import os
  2. os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe" # 设置环境变量
  3. # 构建执行环境入口对象
  4. from pyspark import SparkConf, SparkContext
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  6. sc = SparkContext(conf=conf)
  7. rdd = sc.parallelize(["a b c", "d e f", "h i j"])
  8. # 需求:将RDD数据里面的一个个单词都提取出来
  9. rdd2 = rdd.map(lambda x: x.split(" "))
  10. print(f"map操作后的结果:{rdd2.collect()}")
  11. #解嵌套
  12. rdd3 = rdd.flatMap(lambda x: x.split(" "))
  13. print(f"flatMap操作后的结果:{rdd3.collect()}")

3.3 reduceByKey算子

reduceByKey算子:
       
功能:针对(K,V)类型的数据,按照K进行分组,然后根据你提供的聚合逻辑,完成
        组内数据(value)的聚合操作。

        (K,V)类型的数据 -> 二元元组

  1. import os
  2. os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe" # 设置环境变量
  3. # 构建执行环境入口对象
  4. from pyspark import SparkConf, SparkContext
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  6. sc = SparkContext(conf=conf)
  7. rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5)])
  8. result = rdd.reduceByKey(lambda x, y: x + y)
  9. print(result.collect())

3.4 单词计数案例

  1. import os
  2. os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe" # 设置环境变量
  3. # 1.构建执行环境入口对象
  4. from pyspark import SparkConf, SparkContext
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  6. sc = SparkContext(conf=conf)
  7. # 2.读取数据文件
  8. rdd = sc.textFile("D:/hello.txt")
  9. word_rdd = rdd.flatMap(lambda line: line.split(" "))
  10. # print(word_rdd.collect())
  11. # 3.对数据进行转换为二元元组
  12. word_count_rdd = word_rdd.map(lambda word: (word, 1))
  13. # 4. 对二元元组进行聚合
  14. word_count_rdd_result = word_count_rdd.reduceByKey(lambda a, b: a + b)
  15. print(word_count_rdd_result.collect())

3.5 filter算子

过滤想要的数据,进行保留

  1. import os
  2. os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe" # 设置环境变量
  3. # 1.构建执行环境入口对象
  4. from pyspark import SparkConf, SparkContext
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  6. sc = SparkContext(conf=conf)
  7. rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9,10])
  8. filter_rdd = rdd.filter(lambda x: x % 2 == 0) # 得到True则保留
  9. print(filter_rdd.collect())

3.6 distinct算子

对RDD数据进行去重,返回新的RDD

  1. import os
  2. os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe" # 设置环境变量
  3. # 1.构建执行环境入口对象
  4. from pyspark import SparkConf, SparkContext
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  6. sc = SparkContext(conf=conf)
  7. rdd = sc.parallelize([1, 2, 3, 4, 1, 2, 3, 4])
  8. distinct_rdd = rdd.distinct()
  9. print(distinct_rdd.collect())

3.7 sortBy算子

对RDD数据进行排序,基于你指定的排序依据

  1. import os
  2. os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe" # 设置环境变量
  3. # 1.构建执行环境入口对象
  4. from pyspark import SparkConf, SparkContext
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  6. sc = SparkContext(conf=conf)
  7. # 2.读取数据文件
  8. rdd = sc.textFile("D:/hello.txt")
  9. word_rdd = rdd.flatMap(lambda line: line.split(" "))
  10. # print(word_rdd.collect())
  11. # 3.对数据进行转换为二元元组
  12. word_count_rdd = word_rdd.map(lambda word: (word, 1))
  13. # 4. 对二元元组进行聚合
  14. word_count_rdd_result = word_count_rdd.reduceByKey(lambda a, b: a + b)
  15. # 5.对步骤四求的结果进行排序
  16. word_count_rdd_result_sort = word_count_rdd_result.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
  17. # 参数1设置排序的依据;参数2设置升序还是降序;参数3全局排序需要设置分区数为1
  18. print(word_count_rdd_result_sort.collect())

3.8 数据计算综合案例

准备需要的文件


  1. import json
  2. import os
  3. os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe" # 设置环境变量
  4. # 1.构建执行环境入口对象
  5. from pyspark import SparkConf, SparkContext
  6. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  7. sc = SparkContext(conf=conf)
  8. # TODO 需求1:城市销售额排名
  9. # 1.1 读取文件得到RDD
  10. rdd = sc.textFile("D:/PyCharm_projects/python_study_projects/text/orders.txt")
  11. # 1.2 取出JSON字符串
  12. rdd_json = rdd.flatMap(lambda x: x.split("|"))
  13. # print(rdd_json.collect())
  14. # 1.3 json字符串转为字典
  15. rdd_dict = rdd_json.map(lambda x: json.loads(x))
  16. # print(rdd_dict.collect())
  17. # 1.4 取出城市和销售额数据
  18. # (城市, 销售额)
  19. rdd_city_with_money = rdd_dict.map(lambda x: (x["areaName"], int(x["money"])))
  20. # 1.5 按照城市分组
  21. rdd_group = rdd_city_with_money.reduceByKey(lambda x, y: x + y)
  22. # 1.6 按照销售额降序排序
  23. result_rdd1 = rdd_group.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
  24. print(f"需求1的结果是:{result_rdd1.collect()}")
  25. # TODO 需求2:全部城市有哪些商品类别在售卖
  26. # 2.1 取出所有的商品类别
  27. category_rdd = rdd_dict.map(lambda x: x["category"]).distinct()
  28. print(f"需求2的结果是:{category_rdd.collect()}")
  29. # TODO 需求3:北京市有哪些商品类别在售卖
  30. # 3.1 过滤北京市的数据
  31. beijing_data_rdd = rdd_dict.filter(lambda x: x["areaName"] == "北京")
  32. # 3.2 取出所有商品类别
  33. beijing_category_data_rdd = beijing_data_rdd.map(lambda x: x["category"]).distinct()
  34. print(f"需求3的结果是:{beijing_category_data_rdd.collect()}")

4 数据输出

  1. import os
  2. os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe" # 设置环境变量
  3. os.environ["HADOOP_HOME"] = "D:/Hadoop/hadoop-3.0.0" # 输出为文件需要的配置
  4. # 1.构建执行环境入口对象
  5. from pyspark import SparkConf, SparkContext
  6. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  7. # conf.set("spark.default.parallelism", 1) # 设置全局的并行度为1
  8. sc = SparkContext(conf=conf)
  9. # 准备RDD
  10. rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
  11. # 1. 将RDD数据输出为Python对象
  12. """
  13. collect 算子: -> 将RDD输出为list对象
  14. 功能:将RDD各个分区内的数据统一收集到Driver中,形成一个List对象
  15. 用法:rdd.collect()
  16. """
  17. # print(rdd.collect())
  18. """
  19. reduce 算子:
  20. 功能:将RDD数据按照你传入的逻辑进行聚合
  21. 用法:rdd.reduce(func)
  22. # func: (T, T) -> T 返回值和参数要求类型相同
  23. """
  24. # result = rdd.reduce(lambda x, y: x + y)
  25. # print(result)
  26. """
  27. take 算子:
  28. 功能:取RDD的前N个元素,组合成list返回给你
  29. 用法:rdd.take(N)
  30. """
  31. # result1 = rdd.take(3)
  32. # print(result1)
  33. """
  34. count 算子:
  35. 功能:计算RDD有多少条数据,返回值是一个数字
  36. 用法:rdd.count()
  37. """
  38. # result2 = rdd.count()
  39. # print(result2)
  40. # 2. 将RDD数据输出为文件
  41. """
  42. saveAsTextFile 算子:
  43. 功能:将RDD的数据写入文本文件中
  44. 用法:rdd.saveAsTextFile(path)
  45. """
  46. rdd.saveAsTextFile("D:/output")

5 pySaprk综合案例

表示当前行还未写完,下一行仍是这行的内容

以下都采取链式的写法:

  1. import os
  2. os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe" # 设置环境变量
  3. # 1.构建执行环境入口对象
  4. from pyspark import SparkConf, SparkContext
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  6. conf.set("spark.default.parallelism", 1) # 设置全局的并行度为1
  7. sc = SparkContext(conf=conf)
  8. # 读取文件
  9. file_rdd = sc.textFile("D:/PyCharm_projects/python_study_projects/text/search_log.txt")
  10. # TODO 需求1:热门搜索时间段Top3 (小时精度)
  11. # 1.1 取出所有的时间并转换为小时
  12. # 1.2 转换为(小时,1)的二元元组
  13. # 1.3 Key分组,集合Value
  14. # 1.4 降序排序,取前3
  15. # \表示当前行还未写完,下一行仍是这行的内容
  16. result1 = file_rdd.map(lambda x: x.split("\t")).\
  17. map(lambda x: x[0][:2]).\
  18. map(lambda x: (x, 1)).\
  19. reduceByKey(lambda a, b: a + b).\
  20. sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
  21. take(3)
  22. print(f"需求1的结果是:{result1}")
  23. # TODO 需求2:热门搜索词Top3
  24. # 2.1 取出全部的搜索词
  25. # 2.2 (词,1) 二元元组
  26. # 2.3 分组集合
  27. # 2.4 排序,取Top3
  28. result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\
  29. reduceByKey(lambda a, b: a + b).\
  30. sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
  31. take(3)
  32. print(f"需求2的结果是:{result2}")
  33. # TODO 需求3:统计黑马程序员关键字在什么时段被搜索的最多
  34. # 3.1 过滤内容,只保留黑马程序员关键字
  35. # 3.2 转换为(小时, 1) 的二元元组
  36. # 3.3 Key 分组聚合Value
  37. # 3.4 排序,取前1
  38. result3 = file_rdd.map(lambda x: x.split("\t")).\
  39. filter(lambda x: x[2] == "黑马程序员").\
  40. map(lambda x: (x[0][:2], 1)).\
  41. reduceByKey(lambda a, b: a + b).\
  42. sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
  43. take(1)
  44. print(f"需求3的结果是:{result3}")
  45. # TODO 需求4:将数据转换为JSON格式,写到文件中
  46. # 4.1 转换为JSON格式的RDD
  47. # 4.2 写出到文件
  48. file_rdd.map(lambda x: x.split("\t")).\
  49. map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\
  50. saveAsTextFile("D:\output_json") # hadoop报错,无法实现,是我自己的环境问题,代码没有问题
  51. sc.stop()

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号