赞
踩
Q1. RDD创建有哪几种方法?
A1:通过并行化集合的方式(本地集合转分布式集合) 或者读取数据的方式创(TextFile\WholeTextFile)
Q2. RDD分区数如何查看?
A2:通过getNumPartitions API 查看, 返回值Int.
Q3. Transformation 和 Action的区别?
A3:转换算子的返回值100%是RDD, 而Action算子的返回值100%不是RDD. 转换算子是懒加载的, 只有遇到Action才会执行. Action就是转换算子处理链条的开关.
Q4. 哪两个Action算子的结果不经过Driver, 直接输出?
A4:foreach 和 saveAsTextFile 直接由Executor执行后输出 不会将结果发送到Driver上去.
Q5. reduceByKey 和 groupByKey的区别?
A5:reduceByKey自带聚合逻辑, groupByKey不带 如果做数据聚合reduceByKey的效率更好, 因为可以先聚合后shuffle再最终聚合, 传输的IO小.
Q6. mapPartitions 和 foreachPartition 的区别?
A6:mapPartitions 带有返回值 foreachPartition不带.
Q7. 对于分区操作有什么要注意的地方?
A7:尽量不要增加分区, 可能破坏内存迭代的计算管道.
不使用缓存,代码如下:
- # coding:utf8
- import time
-
- from pyspark import SparkConf, SparkContext
- from pyspark.storagelevel import StorageLevel
-
- if __name__ == '__main__':
- conf = SparkConf().setAppName("test").setMaster("local[*]")
- sc = SparkContext(conf=conf)
-
- rdd1 = sc.textFile("../data/input/words.txt")
- rdd2 = rdd1.flatMap(lambda x: x.split(" "))
- rdd3 = rdd2.map(lambda x: (x, 1))
-
-
- rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
- print(rdd4.collect())
-
- rdd5 = rdd3.groupByKey()
- rdd6 = rdd5.mapValues(lambda x: sum(x))
- print(rdd6.collect())
-
- rdd3.unpersist()
- time.sleep(100000)
使用缓存,代码如下
- # coding:utf8
- import time
-
- from pyspark import SparkConf, SparkContext
- from pyspark.storagelevel import StorageLevel
-
- if __name__ == '__main__':
- conf = SparkConf().setAppName("test").setMaster("local[*]")
- sc = SparkContext(conf=conf)
-
- rdd1 = sc.textFile("../data/input/words.txt")
- rdd2 = rdd1.flatMap(lambda x: x.split(" "))
- rdd3 = rdd2.map(lambda x: (x, 1))
-
- rdd3.cache()
- # rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)
-
- rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
- print(rdd4.collect())
-
- rdd5 = rdd3.groupByKey()
- rdd6 = rdd5.mapValues(lambda x: sum(x))
- print(rdd6.collect())
-
- rdd3.unpersist()
- time.sleep(100000)
打开4040端口查看DAG图,发现有绿色的小点点。
即,RDD缓存是一个整体,但是分散存储在各个节点(硬盘或内存)上。
说到内存,这也暴露了一个问题,缓存是不安全的(只是在设计上认为不安全,物理上并不一定)
checkpoint在设计上被认为是安全的,所以没必要保存血缘关系
代码如下:
- # coding:utf8
- import time
-
- from pyspark import SparkConf, SparkContext
- from pyspark.storagelevel import StorageLevel
-
- if __name__ == '__main__':
- conf = SparkConf().setAppName("test").setMaster("local[*]")
- sc = SparkContext(conf=conf)
-
- # 1. 告知spark, 开启CheckPoint功能
- sc.setCheckpointDir("hdfs://node1:8020/output/ckp")
- rdd1 = sc.textFile("../data/input/words.txt")
- rdd2 = rdd1.flatMap(lambda x: x.split(" "))
- rdd3 = rdd2.map(lambda x: (x, 1))
-
- # 调用checkpoint API 保存数据即可
- rdd3.checkpoint()
-
- rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
- print(rdd4.collect())
-
- rdd5 = rdd3.groupByKey()
- rdd6 = rdd5.mapValues(lambda x: sum(x))
- print(rdd6.collect())
-
- rdd3.unpersist()
- time.sleep(100000)
查看DAG图,直接从ckp开始,也说明了他确实没有保存血缘关系
1. Cache和Checkpoint区别
Cache是轻量化保存RDD数据, 可存储在内存和硬盘, 是分散存储, 设计上数据是不安全的(保留RDD 血缘关系)
CheckPoint是重量级保存RDD数据, 是集中存储, 只能存储在硬盘(HDFS)上, 设计上是安全的(不保留 RDD血缘关系)
2. Cache 和 CheckPoint的性能对比?
Cache性能更好, 因为是分散存储, 各个Executor并行执行, 效率高, 可以保存到内存中(占内存),更快
CheckPoint比较慢, 因为是集中存储, 涉及到网络IO, 但是存储到HDFS上更加安全(多副本)
jieba分词使用以下命令安装(三台服务器都要安装)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba
查看是否安装了,使用如下命令
find / -name "jieba*"
替换命令
:%s/old/new/g
完全代码
- # coding:utf8
-
- # 导入Spark的相关包
- import time
-
- from pyspark import SparkConf, SparkContext
- from pyspark.storagelevel import StorageLevel
- from defs import context_jieba, filter_words, append_words, extract_user_and_word
- from operator import add
-
- if __name__ == '__main__':
- # 0. 初始化执行环境 构建SparkContext对象
- conf = SparkConf().setAppName("test").setMaster("local[*]")
- sc = SparkContext(conf=conf)
-
- # 1. 读取数据文件
- file_rdd = sc.textFile("hdfs://node1:8020/test/input/SogouQ.txt")
-
- # 2. 对数据进行切分 \t,切出来的每一行数据都是一个数组对象
- split_rdd = file_rdd.map(lambda x: x.split("\t"))
- s1=split_rdd.collect()
-
- # 3. 因为要做多个需求, split_rdd 作为基础的rdd 会被多次使用.
- split_rdd.persist(StorageLevel.DISK_ONLY)
-
- # TODO: 需求1: 用户搜索的关键`词`分析
- # 主要分析热点词
- # 将所有的搜索内容取出
- # print(split_rdd.takeSample(True, 3))
- context_rdd = split_rdd.map(lambda x: x[2])
- s2=context_rdd.collect()
- print(context_rdd.take(5))
- # 对搜索的内容进行分词分析
- words_rdd = context_rdd.flatMap(context_jieba)
- s3 = words_rdd.collect()
- print(words_rdd.take(5))
- # print(words_rdd.collect())
- # 院校 帮 -> 院校帮
- # 博学 谷 -> 博学谷
- # 传智播 客 -> 传智播客
- filtered_rdd = words_rdd.filter(filter_words)
- s4=filtered_rdd.collect()
- # 将关键词转换: 穿直播 -> 传智播客
- final_words_rdd = filtered_rdd.map(append_words)
- s5=final_words_rdd.collect()
- # 对单词进行 分组 聚合 排序 求出前5名
- # sortByKey?
- result1 = final_words_rdd.reduceByKey(lambda a, b: a + b).\
- sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
- take(5)
-
- print("需求1结果: ", result1)
-
- # TODO: 需求2: 用户和关键词组合分析
- # 1, 我喜欢传智播客
- # 1+我 1+喜欢 1+传智播客
- user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))
- # 对用户的搜索内容进行分词, 分词后和用户ID再次组合
- user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)
- # 对内容进行 分组 聚合 排序 求前5
- result2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b).\
- sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
- take(5)
-
- print("需求2结果: ", result2)
-
- # TODO: 需求3: 热门搜索时间段分析
- # 取出来所有的时间
- time_rdd = split_rdd.map(lambda x: x[0])
- # 对时间进行处理, 只保留小时精度即可
- hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))
- # 分组 聚合 排序
- result3 = hour_with_one_rdd.reduceByKey(add).\
- sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
- collect()
-
- print("需求3结果: ", result3)
- split_rdd.unpersist()
- time.sleep(100000)
1. 案例中使用的分词库是?
jieba库
2. 为什么要在全部的服务器安装jieba库?
因为YARN是集群运行, Executor可以在所有服务器上执行, 所以每个服务器都需要有jieba库提供支 撑
3. 如何尽量提高任务计算的资源?
计算CPU核心和内存量, 通过--executor-memory 指定executor内存, 通过--executor-cores 指定 executor的核心数 通过--num-executors 指定总executor数量
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。