赞
踩
Apache Spark 是用于大规模数据处理的统一分析引擎。
PySpark是由Spark官方开发的Python语言第三方库。
# 导包
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
""" 演示通过PySpark代码加载数据,即数据输入 """ from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf) # # 通过parallelize方法将Python对象加载到Spark内,成为RDD对象 # rdd1 = sc.parallelize([1, 2, 3, 4, 5]) # rdd2 = sc.parallelize((1, 2, 3, 4, 5)) # rdd3 = sc.parallelize("abcdefg") # rdd4 = sc.parallelize({1, 2, 3, 4, 5}) # rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"}) # # # 如果要查看RDD里面有什么内容,需要用collect()方法 # print(rdd1.collect()) # print(rdd2.collect()) # print(rdd3.collect()) # print(rdd4.collect()) # print(rdd5.collect()) # 用过textFile方法,读取文件数据加载到Spark内,成为RDD对象 rdd = sc.textFile("D:/hello.txt") print(rdd.collect()) rdd.map() sc.stop()
""" 演示RDD的map成员方法的使用 """ from pyspark import SparkConf, SparkContext import os os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe" conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf) # 准备一个RDD rdd = sc.parallelize([1, 2, 3, 4, 5]) # 通过map方法将全部数据都乘以10 # def func(data): # return data * 10 rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5) print(rdd2.collect()) # (T) -> U # (T) -> T # 链式调用
"""
演示RDD的flatMap成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize(["itheima itcast 666", "itheima itheima itcast", "python itheima"])
# 需求,将RDD数据里面的一个个单词提取出来
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())
"""
演示RDD的reduceByKey成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])
# 求男生和女生两个组的成绩之和
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
""" 完成练习案例:单词计数统计 """ # 1. 构建执行环境入口对象 from pyspark import SparkContext, SparkConf import os os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe" conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf) # 2. 读取数据文件 rdd = sc.textFile("D:/hello.txt") # 3. 取出全部单词 word_rdd = rdd.flatMap(lambda x: x.split(" ")) # 4. 将所有单词都转换成二元元组,单词为Key,value设置为1 word_with_one_rdd = word_rdd.map(lambda word: (word, 1)) # 5. 分组并求和 result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b) # 6. 打印输出结果 print(result_rdd.collect())
""" 演示RDD的filter成员方法的使用 """ from pyspark import SparkConf, SparkContext import os os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe" conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf) # 准备一个RDD rdd = sc.parallelize([1, 2, 3, 4, 5]) # 对RDD的数据进行过滤 rdd2 = rdd.filter(lambda num: num % 2 == 0) print(rdd2.collect())
""" 演示RDD的distinct成员方法的使用 """ from pyspark import SparkConf, SparkContext import os os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe" conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf) # 准备一个RDD rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 7, 8, 8, 9, 10]) # 对RDD的数据进行去重 rdd2 = rdd.distinct() print(rdd2.collect())
""" 演示RDD的sortBy成员方法的使用 """ from pyspark import SparkConf, SparkContext import os os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe" conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf) # 1. 读取数据文件 rdd = sc.textFile("D:/hello.txt") # 2. 取出全部单词 word_rdd = rdd.flatMap(lambda x: x.split(" ")) # 3. 将所有单词都转换成二元元组,单词为Key,value设置为1 word_with_one_rdd = word_rdd.map(lambda word: (word, 1)) # 4. 分组并求和 result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b) # 5. 对结果进行排序 final_rdd = result_rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1) print(final_rdd.collect())
""" 完成练习案例:JSON商品统计 需求: 1. 各个城市销售额排名,从大到小 2. 全部城市,有哪些商品类别在售卖 3. 北京市有哪些商品类别在售卖 """ from pyspark import SparkConf, SparkContext import os import json os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe' conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf) # TODO 需求1: 城市销售额排名 # 1.1 读取文件得到RDD file_rdd = sc.textFile("D:/orders.txt") # 1.2 取出一个个JSON字符串 json_str_rdd = file_rdd.flatMap(lambda x: x.split("|")) # 1.3 将一个个JSON字符串转换为字典 dict_rdd = json_str_rdd.map(lambda x: json.loads(x)) # 1.4 取出城市和销售额数据 # (城市,销售额) city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money']))) # 1.5 按城市分组按销售额聚合 city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b) # 1.6 按销售额聚合结果进行排序 result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1) print("需求1的结果:", result1_rdd.collect()) # TODO 需求2: 全部城市有哪些商品类别在售卖 # 2.1 取出全部的商品类别 category_rdd = dict_rdd.map(lambda x: x['category']).distinct() print("需求2的结果:", category_rdd.collect()) # 2.2 对全部商品类别进行去重 # TODO 需求3: 北京市有哪些商品类别在售卖 # 3.1 过滤北京市的数据 beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京') # 3.2 取出全部商品类别 result3_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct() print("需求3的结果:", result3_rdd.collect()) # 3.3 进行商品类别去重
""" 演示将RDD输出为Python对象 """ from pyspark import SparkConf, SparkContext import os import json os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe' conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf) # 准备RDD rdd = sc.parallelize([1, 2, 3, 4, 5]) # collect算子,输出RDD为list对象 rdd_list: list = rdd.collect() print(rdd_list) print(type(rdd_list)) # reduce算子,对RDD进行两两聚合 num = rdd.reduce(lambda a, b: a + b) print(num) # take算子,取出RDD前N个元素,组成list返回 take_list = rdd.take(3) print(take_list) # count,统计rdd内有多少条数据,返回值为数字 num_count = rdd.count() print(f"rdd内有{num_count}个元素") sc.stop()
""" 演示将RDD输出到文件中 """ from pyspark import SparkConf, SparkContext import os import json os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe' os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0" conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf) # 准备RDD1 rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1) # 准备RDD2 rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1) # 准备RDD3 rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1) # 输出到文件中 rdd1.saveAsTextFile("D:/output1") rdd2.saveAsTextFile("D:/output2") rdd3.saveAsTextFile("D:/output3")
""" 演示PySpark综合案例 """ from pyspark import SparkConf, SparkContext import os import json os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe' os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0" conf = SparkConf().setMaster("local[*]").setAppName("test_spark") conf.set("spark.default.parallelism", "1") sc = SparkContext(conf=conf) # 读取文件转换成RDD file_rdd = sc.textFile("D:/search_log.txt") # TODO 需求1: 热门搜索时间段Top3(小时精度) # 1.1 取出全部的时间并转换为小时 # 1.2 转换为(小时, 1) 的二元元组 # 1.3 Key分组聚合Value # 1.4 排序(降序) # 1.5 取前3 result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\ reduceByKey(lambda a, b: a + b).\ sortBy(lambda x: x[1], ascending=False, numPartitions=1).\ take(3) print("需求1的结果:", result1) # TODO 需求2: 热门搜索词Top3 # 2.1 取出全部的搜索词 # 2.2 (词, 1) 二元元组 # 2.3 分组聚合 # 2.4 排序 # 2.5 Top3 result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\ reduceByKey(lambda a, b: a + b).\ sortBy(lambda x: x[1], ascending=False, numPartitions=1).\ take(3) print("需求2的结果:", result2) # TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多 # 3.1 过滤内容,只保留黑马程序员关键词 # 3.2 转换为(小时, 1) 的二元元组 # 3.3 Key分组聚合Value # 3.4 排序(降序) # 3.5 取前1 result3 = file_rdd.map(lambda x: x.split("\t")).\ filter(lambda x: x[2] == '黑马程序员').\ map(lambda x: (x[0][:2], 1)).\ reduceByKey(lambda a, b: a + b).\ sortBy(lambda x: x[1], ascending=False, numPartitions=1).\ take(1) print("需求3的结果:", result3) # TODO 需求4: 将数据转换为JSON格式,写出到文件中 # 4.1 转换为JSON格式的RDD # 4.2 写出为文件 file_rdd.map(lambda x: x.split("\t")).\ map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\ saveAsTextFile("D:/output_json")
""" 演示PySpark综合案例 """ from pyspark import SparkConf, SparkContext import os os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python' os.environ['HADOOP_HOME'] = "/export/server/hadoop-3.3.1" conf = SparkConf().setAppName("spark_cluster") conf.set("spark.default.parallelism", "24") sc = SparkContext(conf=conf) # 读取文件转换成RDD file_rdd = sc.textFile("hdfs://m1:8020/data/search_log.txt") # TODO 需求1: 热门搜索时间段Top3(小时精度) # 1.1 取出全部的时间并转换为小时 # 1.2 转换为(小时, 1) 的二元元组 # 1.3 Key分组聚合Value # 1.4 排序(降序) # 1.5 取前3 result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\ reduceByKey(lambda a, b: a + b).\ sortBy(lambda x: x[1], ascending=False, numPartitions=1).\ take(3) print("需求1的结果:", result1) # TODO 需求2: 热门搜索词Top3 # 2.1 取出全部的搜索词 # 2.2 (词, 1) 二元元组 # 2.3 分组聚合 # 2.4 排序 # 2.5 Top3 result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\ reduceByKey(lambda a, b: a + b).\ sortBy(lambda x: x[1], ascending=False, numPartitions=1).\ take(3) print("需求2的结果:", result2) # TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多 # 3.1 过滤内容,只保留黑马程序员关键词 # 3.2 转换为(小时, 1) 的二元元组 # 3.3 Key分组聚合Value # 3.4 排序(降序) # 3.5 取前1 result3 = file_rdd.map(lambda x: x.split("\t")).\ filter(lambda x: x[2] == '黑马程序员').\ map(lambda x: (x[0][:2], 1)).\ reduceByKey(lambda a, b: a + b).\ sortBy(lambda x: x[1], ascending=False, numPartitions=1).\ take(1) print("需求3的结果:", result3) # TODO 需求4: 将数据转换为JSON格式,写出到文件中 # 4.1 转换为JSON格式的RDD # 4.2 写出为文件 file_rdd.map(lambda x: x.split("\t")).\ map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\ saveAsTextFile("hdfs://m1:8020/output/output_json")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。