赞
踩
目录
Spark是一个开源的大数据处理框架,它以其高速、易用性和对复杂分析的支持而闻名。以下是Spark的一些关键特点和组件:
Spark是一个强大的工具,适用于大数据处理和分析,无论是学术研究还是工业应用,都能找到其用武之地。
pip install pyspark
想要使用pyspark库完成数据处理,首先需要构建一个执行环境入口环境。
pyspark的执行环境入口对象是:类SparkContext的类对象
- """
- 演示获取pyspark的执行环境入库对象:SparkContext
- 并通过SparkContext对象获取当前pyspark的版本
- """
- # 导包
- from pyspark import SparkConf,SparkContext
- # 创建SparkConf对象
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- # 基于SparkConf类对象创建SparkContext对象
- sc = SparkContext(conf=conf)
- # 打印pyspark的运行版本
- print(sc.version)
- # 停止SparkContext对象的运行(停止pyspark程序)
- sc.stop()
-
RDD(Resilient Distributed Datasets,弹性分布式数据集)是Apache Spark中的核心概念,它是一个不可变的分布式对象集合。
pyspark支持多种数据的输入,再输入完成后,都会得到一个RDD对象,pyspark针对数据的处理,都是以RDD对象作为载体,即:
pyspark支持通过SparkContext对象的parallelize成员方法,将
转换为pyspark的RDD对象
(字符串会被拆分出1个个的字符,存入RDD对象;字典仅有key会被存入RDD对象)
- from pyspark import SparkConf,SparkContext
-
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- sc = SparkContext(conf=conf)
-
- # 通过parallelize方法将python对象加载到spark内,成为RDD对象
- add1 = sc.parallelize([1, 2, 3, 4, 5])
- add2 = sc.parallelize((1, 2, 3, 4, 5))
- add3 = sc.parallelize("abcdefg")
- add4 = sc.parallelize({1, 2, 3, 4, 5})
- add5 = sc.parallelize({"key1":"value1", "key2":"value2"})
-
- # 如果要查看RDD里面有什么内容,需要用collect()方法
- print(add1.collect())
- print(add2.collect())
- print(add3.collect())
- print(add4.collect())
- print(add5.collect())
-
- sc.stop()
结果如下:
通过读取文件转RDD对象
先准备好一个hello.txt文件,内容为:
- from pyspark import SparkConf,SparkContext
-
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- sc = SparkContext(conf=conf)
-
-
- # 通过textFile方法,读取文件按数据加载到spark内,成为RDD对象
- add = sc.textFile("D:/pydaima/8day速成python/shuju/hello.txt")
- print(add.collect())
-
- sc.stop()
运行结果:
功能:map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接受的处理函数),返回新的RDD
语法:
-
- """
- 演示RDD的map方法的使用
- """
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
-
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- sc = SparkContext(conf=conf)
-
- # 准备一个RDD
- add = sc.parallelize([1, 2, 3, 4, 5])
-
- # 通过map方法将全部数据都乘以10
- def func(data):
- return data * 10
-
- add2 = add.map(lambda x: x * 10)
- print(add2.collect())
-
- sc.stop()
功能:对add执行map操作,然后进行解除嵌套操作。
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
-
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- sc = SparkContext(conf=conf)
-
- # 准备一个RDD
- add = sc.parallelize(["python student 588", "python 588 588", "process finished"])
-
- # 需求:将RDD数据里面的一个个单词提取出来
- add2 = add.flatMap(lambda x: x.split(' '))
- print(add2.collect())
功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据的聚合操作。
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
-
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- sc = SparkContext(conf=conf)
-
- # 准备一个RDD
- add = sc.parallelize([('男', 99), ('女', 99), ('女', 912), ('男', 99)])
-
- # 求男生和女生两个组的成绩之和
- add2 = add.reduceByKey(lambda a, b: a + b)
- print(add2.collect())
使用学习到的内容,完成:
先准备一个文件,文件内容如下:
- # 1.统计执行环境入口对象
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- sc = SparkContext(conf=conf)
-
- # 2.读取数据文件
- add = sc.textFile("D:/pydaima/8day速成python/shuju/wordcount.txt")
- # 3.取出全部单词
- word_add = add.flatMap(lambda x: x.split(' '))
- # 4.将所有单词都转换为二元元组,单词为key,value设置为1
- add2 = word_add.map(lambda word: (word, 1))
- # 5.分组并求和
- result_add = add2.reduceByKey(lambda a, b: a + b)
- # 6.打印输出结果
- print(result_add.collect())
运行结果:
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
-
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- sc = SparkContext(conf=conf)
-
-
- # 准备一个RDD
- rdd = sc.parallelize([1, 2, 3, 4, 5])
- # 对RDD的数据进行过滤
- rdd2 = rdd.filter(lambda num: num % 2 == 0)
- print(rdd2.collect())
运行示例:
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
-
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- sc = SparkContext(conf=conf)
- # 准备一个RDD
- rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 7, 8, 8, 9])
- # 对RDD的数据进行去重
- rdd2 = rdd.distinct()
- print(rdd2.collect())
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- sc = SparkContext(conf=conf)
-
- # 2.读取数据文件
- add = sc.textFile("D:/pydaima/8day速成python/shuju/wordcount.txt")
- # 3.取出全部单词
- word_add = add.flatMap(lambda x: x.split(' '))
- # 4.将所有单词都转换为二元元组,单词为key,value设置为1
- add2 = word_add.map(lambda word: (word, 1))
- # 5.分组并求和
- result_add = add2.reduceByKey(lambda a, b: a + b)
- # 以上使用综合案例1的代码进行补充
-
- # 对结果进行排序
- final_rdd = result_add.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
-
- print(final_rdd.collect())
对数据文件进行读取,将数据按照各个城市销售额排名,从大到小;全部城市有哪些商品类别在售卖;北京市有哪些商品类别在售卖。
- import json
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- sc = SparkContext(conf=conf)
-
- # 需求1:城市销售额排名
- # 1.1 读取文件得到RDD
- file_rdd = sc.textFile("D:/pydaima/8day速成python/shuju/orders.txt")
- # 1.2 取出一个个json字符串
- json_str_rdd = file_rdd.flatMap(lambda x: x.split('|'))
- # 1.3 将一个个json字符串转为字典
- dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
- # 1.4 取出城市和销售额数据 (城市,销售额)
- city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
- # 1.5 按城市分组销售额聚合
- city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
- # 1.6 按照销售额聚合结果进行排序
- result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
- print("需求1的结果:", result1_rdd.collect())
-
- # 需求2:全部城市有哪些商品类别在售卖
- # 2.1 取出全部的商品类别, 并去重
- category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
- print("需求2的结果:", category_rdd.collect())
-
- # 需求3: 北京市有哪些商品类别在售卖
- # 3.1 过滤北京的数
- bj_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
- # 3.2 取出商品类别,并去重
- result3_rdd = bj_data_rdd.map(lambda x: x['category']).distinct()
- print("需求3的结果:", result3_rdd.collect())
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
-
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- sc = SparkContext(conf=conf)
-
- # 准备RDD
- rdd = sc.parallelize([1, 2, 3, 4, 5])
-
- # collect算子,输出为list对象
- rdd_list = rdd.collect()
- print(rdd_list)
- print(type(rdd_list))
-
- # reduce算子,输出RDD为list对象
- num = rdd.reduce(lambda a, b: a + b)
- print(num)
-
- # take算子,取出RDD的前N个元素,组成list返回
- take_list = rdd.take(3)
- print(f"前3个元素是:{take_list}")
-
- # count算子,统计RDD内有多少条数据,返回值为数字
- num_count = rdd.count()
- print(f"rdd内有{num_count}个元素")
-
- sc.stop()
使用saveAsTextFile需要配置相关环境,调用保存文件的算子,配置Hadoop依赖。
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
- os.environ['HADOOP-HOME'] = "D:/hadoop-py/hadoop-3.0.0"
-
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
- # 设置全局并型度为1
- conf.set("spark.default.parallelism", "1")
- sc = SparkContext(conf=conf)
-
- # 准备RDD1
- rdd1 = sc.parallelize([1, 2, 3, 4, 5])
-
- # 准备RDD2
- rdd2 = sc.parallelize([('hello', 3), ('Spark', 5), ('Hi', 7)])
-
- # 准备RDD3
- rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]])
-
- # 输出到文件中
- rdd1.saveAsTextFile("D:/pydaima/8day速成python/shuju/output1.txt")
- rdd2.saveAsTextFile("D:/pydaima/8day速成python/shuju/output2.txt")
- rdd3.saveAsTextFile("D:/pydaima/8day速成python/shuju/output3.txt")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。