赞
踩
参数对spark所有指令都有效
Spark指令参数
# 表示引用运行的模式,要么是本地local要么是集群(Standalone、YARN、Mesos)了 --master MASTER_URL # 本地模式∶local[2] 数字表示可以使用到本地的cpu核心数, loacl[*] *表示自动判断 # Standalone集群∶spark∶//xxx∶7077,yyy∶7077 # YARN 集群∶ yarn # 表示的是应用运行的名称,通常在应用开发的时候指定 --name NAME # 表示应用运行时指定的某些参数配置,http∶//spark.apache.org/docs/2.2.0/configuration.html # 当value中的值有空格组成的时候,使用双引号将key=value引起来 # 可以不用在bashrc写配置可以通过conf配置,每次运行都要指定很麻烦 --conf "PROP=VALUE" # 第一种方式∶属性的值中没有空格 --conf spark.eventLog.enabled=false # 第二种方式∶属性的值中有空格,将属性和值统一使用双引号引起来 --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimestamps" # Driver相关配置 对driver一般不用配置 # 指定Driver Program JVM进程内存大小,默认值为1g --driver-memory MEM # 表示Driver 运行CLASS PATH路径,使用不多 --driver-class-path # Spark standalone with cluster deploy mode∶运行在standalone 中cluster Deploy Mode 默认值为1 cpu核心数量 # 运行在YARN in cluster mode,默认值是1 --driver-cores NUM # 运行在standalone的 中cluster Deploy Mode下,表示当Driver运行异常失败,可以自己重启 --supervise # Executor运行所需内存大小 --executor-memory MEM # Execturor 运行的CPU Cores,默认的情况下,在Standalone集群上为worker节点所有可用的Cpu Cores,在YARN集群下为1 --executor-cores NUM # 表示运行在Standalone集群下,所有Executor的CPU Cores,结合--executor-cores计算出Executor个数 --total-executor-cores # 表示在YARN集群下,Executor的个数,默认值为2 --num-executors # 表示Drive Program运行的地方,也叫做应用部署模式,默认值为client,通常在生产环境中使用cluster --deploy-mode DEPLOY_MODE
saprk指令使用参数举例
pyspark --master yarn --name yarn_demo --conf "PYSPARK_PYTHON=/export/server/anaconda3/bin/python3"
saprk脚本式使用参数举例
from pyspark import SparkContext
from pyspark import SparkConf
#指定driver和executor的参数(这里指定没用,只能在spark-submit中指定才会生效)
#conf = SparkConf().set('driver-memory','2g').set('num-executors','3')
sc = SparkContext(master='yarn',appName='yarn_demo')
rdd1 = sc.parallelize([1,2,3,4])
res = rdd1.reduce(lambda a,b:a+b)
print(res)
spark-submit指令
实际生产中使用的提交spark计算任务的方式
1,Spark-submit --参数名 参数值.py文件
2,–deploy-mode 只能和yarn模式一起使用
两种模式cluster ,driver进程由yarn服务选择资源充足的服务器创建,生成环境中
或者client,driver进程中由提交服务器创建,测试环境中
命令:
spark-submit --master yarn --name yarn_demo --deploy-mode cluster 执行文件.py
概念:弹性分布式数据集合,是Spark中最基本的数据抽象,代表一个不可变可分区,里面的元素可并行计算的集合
RDD是一种弹性分布式数据集合, 是spark中最基本的数据类型。
弹性:可以对海量数据根据需求分成多份(分区),每一份数据会由对应的task线程执行计算
分布式:使用多台服务器的计算资源
数据集合:规定了spark中数据的形式,类似于python中的列表[]
RDD可以看作是Spark的一个对象,它本身运行于内存中
分区:可以将计算的海量数据分成多份,需要分成多少个分区可以通过方法指定
每个分区都可以对应一个task线程执行计算
只读:
rdd中的数据不能直接修改,需要同通过方法计算后得到一个新的rdd
rdd本身存储的数据只能读取
依赖:
rdd之间是有以来关系的
新的rdd是通过旧的rdd计算得到
缓存:
将中间计算得到的rdd进行缓存操作,
保存在内存中或磁盘中
容错机制
缓存rdd随着spark应用程序执行结束后自动清空
checkpoint:监测点
将中间计算的到的rdd进行checkpoint操作
保存在HDFS中,永久持久化
from pyspark import SparkContext sc= SparkContext() str1 = 'sdf' int1 = 234 #不能转换 float1 = 34.34 #不能转换 boo11 = True #不能转换 list1 = [1,23,4,5,5] dict1 = {'asdf':'adsff','et':10} set1 = {1,3,4,6,67} tuple1 = (1,3,5) #字符串属于可迭代对象,不可迭代的对象无法转为rdd数据 rdd_str1 = sc.parallelize(str1) print(rdd_str1) res = rdd_str1.collect() print(res) #直接遍历字典获取到的时key值 rdd_str2 = sc.parallelize(dict1.values()) print(rdd_str2) res = rdd_str2.collect() print(res)
from pyspark import SparkContext # 创建sc对象 sc = SparkContext() # 读取文件数据转换成rdd textFile() """ 默认读取HDFS上的文件数据 hdfs://ip:8020(可以省略) port为8020->服务端的端口号 也可以读取服务器本地上的文件数据 file:// 路径可以是文件路径,也可以是目录路径(读取目录下的所有文件生成一个rdd对象) 文件中的一行数据就是rdd中一个元素 """ # 读取hdfs上的文件 # 指定文件路径 # 省略 hdfs://node1:8020 rdd1 = sc.textFile('/test/words.txt') res1 = rdd1.collect() print(res1) # 指定目录路径 rdd2 = sc.textFile('hdfs://node1:8020/test') res2 = rdd2.collect() print(res2) # 读取服务器上本地文件 # file:// # 指定文件路径 rdd3 = sc.textFile('file:///root/test/words.txt') res3 = rdd3.collect() print(res3) # 指定目录路径 rdd4= sc.textFile('file:///root/test') res4 = rdd4.collect() print(res4)
from pyspark import SparkContext #分区数等于task线程数 sc = SparkContext() #查看分区数 print(sc.defaultParallelism) #结果为2 #调整列表分区数通过修改numSlices rdd1 = sc.parallelize([1,2,3,4],numSlices=3) #借助glom()方法查看rdd的分区数据 res1 = rdd1.glom().collect() res2 = rdd1.collect() print(res1) print(res2) #读取文件数据时进行分区数指定通过修改defaultParallelism #参数说明minPartitions:最小分区数,不指定是从defaultParallelism和2中取最小值 rdd2 = sc.textFile('file:///root/test/words.txt',minPartitions=4) # #借助glom()方法查看rdd的分区数据 res3 = rdd2.glom().collect() #打印未查看分区数的数据 res2 = rdd2.collect() print(res3) print(res2) # [['hadoop,flink,spark,hive'], ['hive,spark,python,java'], ['python,itcast,itheima']] # ['hadoop,flink,spark,hive', 'hive,spark,python,java', 'python,itcast,itheima']
一个分区对应一个task线程,当小文件过多时,会占用大量的线程,造成资源浪费
使用wholeTextFiles方法解决
# rdd的分区数指定 from pyspark import SparkContext # 生成SparkContext类对象 sc = SparkContext() # 文件数据指定分区数 ,读取目录下的多个小文件 rdd = sc.textFile('/data') # glom()按照分区查看数据 res = rdd.glom().collect() # 每个小文件的数据会单独存放一个分区 # 一个分区会对应一个task执行计算 # 当目录下小文件数据较多时,会产生很多task。task较多时会抢占计算资源影响计算速度 # 10条数据文件100个 1万条数据文件10个 # 将小文件合并 一个分区数据 1000条数据在一个分区,对应一个task线程 print(res) print(len(res)) # wholeTextFiles读取目录中的多个小文件数据 rdd2= sc.wholeTextFiles('/data') res = rdd2.glom().collect() print(res) print(len(res))
from pyspark import SparkContext sc = SparkContext() rdd1 = sc.parallelize([1,2,3,4,5,5,67]) print(rdd1.glom().collect()) #调整rdd的分区数,可以增加和减少分区数 #调整后的数据一定会发生shuffle过程 #增加分区数 rdd2 = rdd1.repartition(numPartitions=5) print(rdd2.glom().collect()) #减少分区数 rdd3 = rdd2.repartition(numPartitions=2) print(rdd3.glom().collect()) """" 调整rdd的分区数时,如果数据发生shuffle,需要使用repartition方法,效率低 减少rdd的分区数时,如果数据不发生shuffle,需要使用coalesce方法,效率高 """ #coalesce()修改分区数 #减少分区数,不能增加 #默认不发生shuffle rdd4 = rdd1.coalesce(numPartitions=1) print(rdd4.glom().collect()) #repartition()修改rdd分区数 此方法等同于coalesce(numPartitions,shuffle=True)的操作
rdd算子就是对rdd对象进行操作的方法1
rdd算子分类
transformation算子->转化算子 得到一个新的rdd对象,定义了一个计算任务
action算子->执行算子 得到最终的结果,执行了定义好的计算任务
5.2常用的transformation算子(方法)
map()
转化算子,对旧rdd数据1进行转化操作,得到新的rdd
此方法不会改变新rdd的数据结构
rdd的每个元素经过传入函数转化后,将函数的返回值保存到列表中
""" rdd.map(func)方法 rdd对象经过func函数处理得到新的rdd,不会改变新rdd的数据结构 """ from pyspark import SparkContext sc = SparkContext() rdd1 = sc.parallelize([1,2,34,4]) #定义函数 def func1(a): print(a) return str(a) def func2(b): print(b) return [b] #使用map算子对rdd1进行处理 rdd2 = rdd1.map(func1) print(rdd2.collect()) print(rdd2) rdd3 = rdd1.map(func2) print(rdd3) print(rdd3.collect()) #map可以接受匿名函数(以后计算常用) #匿名函数最主要的是清楚x这个参数里面的内容 rdd4 = rdd1.map(lambda x:[x]) print(rdd4.collect())
转化算子,对旧rdd数据进行转化操作,得到新的rdd,将新的rdd扁平化(降维)
此方法会改变新rdd的数据结构例如[[1,2],[3,4]]会变成[1,2,3,4]
将新的rdd中的每个元素进行拆解保存到列表中
主要处理的是二位嵌套列表数据
from pyspark import SparkContext """ flatMap适用于处理嵌套的rdd对象[[],[]] 例:[[1,2,34,4],[1,23,54]] 返回[1, 2, 34, 4, 1, 23, 54] """ sc = SparkContext() rdd1 = sc.parallelize([[1,2,34,4],[1,23,54]]) def func1(x): return x rdd_map1 = rdd1.map(func1) print(rdd_map1.collect()) rdd_ma2 = rdd1.map(lambda x:x.append(10)) print(rdd_ma2.collect()) def func2(x): print(x) x.append(10) print(x) return x rdd_map3 = rdd1.map(func2) print(rdd_map3.collect()) #调用flatMap() rdd_flatMap1 = rdd1.flatMap(func1) print(rdd_flatMap1.collect()) rdd_flatMap2 = rdd1.flatMap(func2) print(rdd_flatMap2.collect()) #匿名函数,结果和func1一样 rdd_flatMap3 = rdd1.flatMap(lambda x:x) print(rdd_flatMap3.collect())
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。