当前位置:   article > 正文

Lecture 7 Spark Core-RDD持久化

Lecture 7 Spark Core-RDD持久化

0 回顾

        Q1. RDD创建有哪几种方法?

        A1:通过并行化集合的方式(本地集合转分布式集合) 或者读取数据的方式创(TextFile\WholeTextFile)

        Q2. RDD分区数如何查看?

        A2:通过getNumPartitions API 查看, 返回值Int.

        Q3. Transformation 和 Action的区别?

        A3:转换算子的返回值100%是RDD, 而Action算子的返回值100%不是RDD. 转换算子是懒加载的, 只有遇到Action才会执行. Action就是转换算子处理链条的开关.

        Q4. 哪两个Action算子的结果不经过Driver, 直接输出?

        A4:foreach 和 saveAsTextFile 直接由Executor执行后输出 不会将结果发送到Driver上去.

        Q5. reduceByKey 和 groupByKey的区别?

        A5:reduceByKey自带聚合逻辑, groupByKey不带 如果做数据聚合reduceByKey的效率更好, 因为可以先聚合后shuffle再最终聚合, 传输的IO小.

        Q6. mapPartitions 和 foreachPartition 的区别?

        A6:mapPartitions 带有返回值 foreachPartition不带.

        Q7. 对于分区操作有什么要注意的地方?

        A7:尽量不要增加分区, 可能破坏内存迭代的计算管道.

1.1 过程数据

 1.2 RDD的缓存

1.2.1 缓存的作用代码演示 

        不使用缓存,代码如下:

  1. # coding:utf8
  2. import time
  3. from pyspark import SparkConf, SparkContext
  4. from pyspark.storagelevel import StorageLevel
  5. if __name__ == '__main__':
  6. conf = SparkConf().setAppName("test").setMaster("local[*]")
  7. sc = SparkContext(conf=conf)
  8. rdd1 = sc.textFile("../data/input/words.txt")
  9. rdd2 = rdd1.flatMap(lambda x: x.split(" "))
  10. rdd3 = rdd2.map(lambda x: (x, 1))
  11. rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
  12. print(rdd4.collect())
  13. rdd5 = rdd3.groupByKey()
  14. rdd6 = rdd5.mapValues(lambda x: sum(x))
  15. print(rdd6.collect())
  16. rdd3.unpersist()
  17. time.sleep(100000)

        使用缓存,代码如下

  1. # coding:utf8
  2. import time
  3. from pyspark import SparkConf, SparkContext
  4. from pyspark.storagelevel import StorageLevel
  5. if __name__ == '__main__':
  6. conf = SparkConf().setAppName("test").setMaster("local[*]")
  7. sc = SparkContext(conf=conf)
  8. rdd1 = sc.textFile("../data/input/words.txt")
  9. rdd2 = rdd1.flatMap(lambda x: x.split(" "))
  10. rdd3 = rdd2.map(lambda x: (x, 1))
  11. rdd3.cache()
  12. # rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)
  13. rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
  14. print(rdd4.collect())
  15. rdd5 = rdd3.groupByKey()
  16. rdd6 = rdd5.mapValues(lambda x: sum(x))
  17. print(rdd6.collect())
  18. rdd3.unpersist()
  19. time.sleep(100000)

        打开4040端口查看DAG图,发现有绿色的小点点。

 

1.2.2 缓存如何工作 

         即,RDD缓存是一个整体,但是分散存储在各个节点(硬盘或内存)上。

        说到内存,这也暴露了一个问题,缓存是不安全的(只是在设计上认为不安全,物理上并不一定)

1.2.3 检查点

        checkpoint在设计上被认为是安全的,所以没必要保存血缘关系

 1.2.4 检查点代码演示

        代码如下:

  1. # coding:utf8
  2. import time
  3. from pyspark import SparkConf, SparkContext
  4. from pyspark.storagelevel import StorageLevel
  5. if __name__ == '__main__':
  6. conf = SparkConf().setAppName("test").setMaster("local[*]")
  7. sc = SparkContext(conf=conf)
  8. # 1. 告知spark, 开启CheckPoint功能
  9. sc.setCheckpointDir("hdfs://node1:8020/output/ckp")
  10. rdd1 = sc.textFile("../data/input/words.txt")
  11. rdd2 = rdd1.flatMap(lambda x: x.split(" "))
  12. rdd3 = rdd2.map(lambda x: (x, 1))
  13. # 调用checkpoint API 保存数据即可
  14. rdd3.checkpoint()
  15. rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
  16. print(rdd4.collect())
  17. rdd5 = rdd3.groupByKey()
  18. rdd6 = rdd5.mapValues(lambda x: sum(x))
  19. print(rdd6.collect())
  20. rdd3.unpersist()
  21. time.sleep(100000)

         查看DAG图,直接从ckp开始,也说明了他确实没有保存血缘关系

1.2.5 小结 

1. Cache和Checkpoint区别

        Cache是轻量化保存RDD数据, 可存储在内存和硬盘, 是分散存储, 设计上数据是不安全的(保留RDD 血缘关系)

        CheckPoint是重量级保存RDD数据, 是集中存储, 只能存储在硬盘(HDFS)上, 设计上是安全的(不保留 RDD血缘关系)

2. Cache 和 CheckPoint的性能对比?

        Cache性能更好, 因为是分散存储, 各个Executor并行执行, 效率高, 可以保存到内存中(占内存),更快

        CheckPoint比较慢, 因为是集中存储, 涉及到网络IO, 但是存储到HDFS上更加安全(多副本)




1.3 案例分析

1.3.1 搜索引擎日志分析

jieba分词使用以下命令安装(三台服务器都要安装)

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba

查看是否安装了,使用如下命令

find / -name "jieba*"

替换命令

:%s/old/new/g

完全代码 

  1. # coding:utf8
  2. # 导入Spark的相关包
  3. import time
  4. from pyspark import SparkConf, SparkContext
  5. from pyspark.storagelevel import StorageLevel
  6. from defs import context_jieba, filter_words, append_words, extract_user_and_word
  7. from operator import add
  8. if __name__ == '__main__':
  9. # 0. 初始化执行环境 构建SparkContext对象
  10. conf = SparkConf().setAppName("test").setMaster("local[*]")
  11. sc = SparkContext(conf=conf)
  12. # 1. 读取数据文件
  13. file_rdd = sc.textFile("hdfs://node1:8020/test/input/SogouQ.txt")
  14. # 2. 对数据进行切分 \t,切出来的每一行数据都是一个数组对象
  15. split_rdd = file_rdd.map(lambda x: x.split("\t"))
  16. s1=split_rdd.collect()
  17. # 3. 因为要做多个需求, split_rdd 作为基础的rdd 会被多次使用.
  18. split_rdd.persist(StorageLevel.DISK_ONLY)
  19. # TODO: 需求1: 用户搜索的关键`词`分析
  20. # 主要分析热点词
  21. # 将所有的搜索内容取出
  22. # print(split_rdd.takeSample(True, 3))
  23. context_rdd = split_rdd.map(lambda x: x[2])
  24. s2=context_rdd.collect()
  25. print(context_rdd.take(5))
  26. # 对搜索的内容进行分词分析
  27. words_rdd = context_rdd.flatMap(context_jieba)
  28. s3 = words_rdd.collect()
  29. print(words_rdd.take(5))
  30. # print(words_rdd.collect())
  31. # 院校 帮 -> 院校帮
  32. # 博学 谷 -> 博学谷
  33. # 传智播 客 -> 传智播客
  34. filtered_rdd = words_rdd.filter(filter_words)
  35. s4=filtered_rdd.collect()
  36. # 将关键词转换: 穿直播 -> 传智播客
  37. final_words_rdd = filtered_rdd.map(append_words)
  38. s5=final_words_rdd.collect()
  39. # 对单词进行 分组 聚合 排序 求出前5名
  40. # sortByKey?
  41. result1 = final_words_rdd.reduceByKey(lambda a, b: a + b).\
  42. sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
  43. take(5)
  44. print("需求1结果: ", result1)
  45. # TODO: 需求2: 用户和关键词组合分析
  46. # 1, 我喜欢传智播客
  47. # 1+我 1+喜欢 1+传智播客
  48. user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))
  49. # 对用户的搜索内容进行分词, 分词后和用户ID再次组合
  50. user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)
  51. # 对内容进行 分组 聚合 排序 求前5
  52. result2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b).\
  53. sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
  54. take(5)
  55. print("需求2结果: ", result2)
  56. # TODO: 需求3: 热门搜索时间段分析
  57. # 取出来所有的时间
  58. time_rdd = split_rdd.map(lambda x: x[0])
  59. # 对时间进行处理, 只保留小时精度即可
  60. hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))
  61. # 分组 聚合 排序
  62. result3 = hour_with_one_rdd.reduceByKey(add).\
  63. sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
  64. collect()
  65. print("需求3结果: ", result3)
  66. split_rdd.unpersist()
  67. time.sleep(100000)

1.3.2 小结

        1.  案例中使用的分词库是?

        jieba库

        2. 为什么要在全部的服务器安装jieba库?

        因为YARN是集群运行, Executor可以在所有服务器上执行, 所以每个服务器都需要有jieba库提供支 撑

        3. 如何尽量提高任务计算的资源?

        计算CPU核心和内存量, 通过--executor-memory 指定executor内存, 通过--executor-cores 指定 executor的核心数 通过--num-executors 指定总executor数量

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/514137
推荐阅读
相关标签
  

闽ICP备14008679号