赞
踩
- from pyspark import SparkContext, SparkConf
- import os
- # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
- os.environ['SPARK_HOME'] = '/export/servers/spark'
- # PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python"
- # 当存在多个版本时,不指定很可能会导致出错
- # os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
- # os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
- if __name__ == '__main__':
- print('PySpark First Program')
- # TODO: 当应用运行在集群上的时候,MAIN函数就是Driver Program,必须创建SparkContext对象
- # 创建SparkConf对象,设置应用的配置信息,比如应用名称和应用运行模式
- conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
- # TODO: 构建SparkContext上下文实例对象,读取数据和调度Job执行
- sc = SparkContext(conf=conf)
- # 第一步、读取本地数据 封装到RDD集合,认为列表List
- wordsRDD = sc.textFile("file:///export/pyfolder1/pyspark-chapter01_3.8/data/word.txt")
- # 第二步、处理数据 调用RDD中函数,认为调用列表中的函数
- # a. 每行数据分割为单词
- flatMapRDD = wordsRDD.flatMap(lambda line: line.split(" "))
- # b. 转换为二元组,表示每个单词出现一次
- mapRDD = flatMapRDD.map(lambda x: (x, 1))
- # c. 按照Key分组聚合
- resultRDD = mapRDD.reduceByKey(lambda a, b: a + b)
- # 第三步、输出数据
- res_rdd_col2 = resultRDD.collect()
- # 输出到控制台
- for line in res_rdd_col2:
- print(line)
- # 输出到本地文件中
- resultRDD.saveAsTextFile("file:///export/pyfolder1/pyspark-chapter01_3.8/data/output1/")
- print('停止 PySpark SparkSession 对象')
- # 关闭SparkContext
- sc.stop()
分布式代码的分析
2是由executor运行的,1和3是由driver运行的,因此地址一定要可以共享访问的HDFS地址
当1结束后,序列化SparkContext对象会发送给各个Executor,每个Executor对象就拿到了sc对象。就可以读取文件来计算数据。代码是一份代码,但是执行是好多个Executor
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。