赞
踩
注意:运行模式不同, 能执行命令也有所差异, 像打jar包能运行的命令, 在spark-shell模式下不一定可以。
1) 无需打jar包
2)在本地就可测试逻辑正确性。
3)在本地就可运行的简单任务。
4)在本地就可测试的函数用法。
1) 无需打jar包
2)spark-shell不仅可以支持sql查询,还可以执行RDD/dataframe/dataset操作。
3)需在真实集群上运行的简单任务
4)需在真实集群上测试函数的用法.
1) 直接启动
- num_executors=20
- let parallelism=num_executors*4
- spark-shell \
- --name ShyTestError \
- --master yarn \
- --deploy-mode client \
- --num-executors ${num_executors} \
- --executor-memory 12G \
- --executor-cores 2 \
- --driver-memory 1G \
- --conf spark.dynamicAllocation.enabled=false \
- --conf spark.executor.memoryOverhead=2G \
- --conf spark.sql.session.timeZone=UTC \
- --conf spark.speculation=true \
- --conf spark.speculation.interval=50000 \
- --conf spark.speculation.quantile=0.95 \
- --conf spark.speculation.multiplier=1.5 \
- --conf spark.sql.broadcastTimeout=-1 \
- --conf spark.default.parallelism=${parallelism} \
- --conf spark.sql.shuffle.partitions=${parallelism}
2) 脚本启动
- #!/usr/bin/env bash
-
-
- #---------------------------------------- 1 获取穿参 ---------------------------------------
- # 运行spark-sql, 并指定资源数
- # sh spark_shell_run.sh num_executors
- num_executors=${1}
- let parallelism=num_executors*4
-
-
- #----------------------------------------- 2 运行spark-sql ----------------------------------
- spark-shell \
- --name shy_spark_shell \
- --queue default \
- --master yarn \
- --deploy-mode client \
- --num-executors ${num_executors} \
- --executor-memory 12G \
- --executor-cores 2 \
- --driver-memory 1G \
- --conf spark.dynamicAllocation.enabled=false \
- --conf spark.executor.memoryOverhead=2G \
- --conf spark.speculation=true \
- --conf spark.speculation.interval=20000 \
- --conf spark.speculation.quantile=0.95 \
- --conf spark.speculation.multiplier=1.5 \
- --conf spark.sql.broadcastTimeout=-1 \
- --conf spark.sql.session.timeZone=UTC \
- --conf spark.default.parallelism=${parallelism} \
- --conf spark.sql.shuffle.partitions=${parallelism}
1) 无需打jar包
2) spark-sql客户端类似于hive客户端,仅支持sql查询
1) 直接启动
- num_executors=20
- let parallelism=num_executors*4
- spark-sql \
- -S \
- --name ShyTestError \
- --master yarn \
- --deploy-mode client \
- --num-executors ${num_executors} \
- --executor-memory 12G \
- --executor-cores 2 \
- --driver-memory 1G \
- --hiveconf hive.cli.print.header=true \
- --conf spark.dynamicAllocation.enabled=false \
- --conf spark.executor.memoryOverhead=2G \
- --conf spark.sql.session.timeZone=UTC \
- --conf spark.speculation=true \
- --conf spark.speculation.interval=50000 \
- --conf spark.speculation.quantile=0.95 \
- --conf spark.speculation.multiplier=1.5 \
- --conf spark.sql.broadcastTimeout=-1 \
- --conf spark.default.parallelism=${parallelism} \
- --conf spark.sql.shuffle.partitions=${parallelism}
2) 脚本启动
- #!/usr/bin/env bash
-
-
- #---------------------------------------- 1 获取穿参 ---------------------------------------
- # 运行spark-sql, 并指定资源数
- # sh spark_sql_run.sh num_executors
- num_executors=${1}
- let parallelism=num_executors*4
-
-
- #----------------------------------------- 2 运行spark-sql ----------------------------------
- spark-sql \
- -S \
- --name shy_spark_sql \
- --queue default \
- --master yarn \
- --deploy-mode client \
- --num-executors ${num_executors} \
- --executor-memory 12G \
- --executor-cores 2 \
- --driver-memory 1G \
- --hiveconf hive.cli.print.header=true \
- --conf spark.dynamicAllocation.enabled=false \
- --conf spark.executor.memoryOverhead=2G \
- --conf spark.speculation=true \
- --conf spark.speculation.interval=20000 \
- --conf spark.speculation.quantile=0.95 \
- --conf spark.speculation.multiplier=1.5 \
- --conf spark.sql.broadcastTimeout=-1 \
- --conf spark.sql.session.timeZone=UTC \
- --conf spark.default.parallelism=${parallelism} \
- --conf spark.sql.shuffle.partitions=${parallelism}
1)无需打jar包,传入sql,查询结果打印到本地文件(结果不宜过大)
- #!/usr/bin/env bash
-
-
- #---------------------------------------- 1 获取穿参 ---------------------------------------
- # 运行spark-sql, 并指定资源数
- # sh spark_sql_run.sh num_executors
- num_executors=${1}
- let parallelism=num_executors*4
-
-
- #----------------------------------------- 2 运行spark-sql ----------------------------------
- spark-sql \
- -S \
- --name shy_spark_sql \
- --queue default \
- --master yarn \
- --deploy-mode client \
- --num-executors ${num_executors} \
- --executor-memory 12G \
- --executor-cores 2 \
- --driver-memory 1G \
- --conf spark.dynamicAllocation.enabled=false \
- --conf spark.executor.memoryOverhead=2G \
- --conf spark.speculation=true \
- --conf spark.speculation.interval=20000 \
- --conf spark.speculation.quantile=0.95 \
- --conf spark.speculation.multiplier=1.5 \
- --conf spark.sql.broadcastTimeout=-1 \
- --conf spark.sql.session.timeZone=UTC \
- --conf spark.default.parallelism=${parallelism} \
- --conf spark.sql.shuffle.partitions=${parallelism} \
- -S -e "select dt, brand from tab_name " > spark-sql.out
该模式下,driver端必定运行在提交应用的服务器上,增加该台服务器压力。该模式下运行日志详细,常用于测试环境。
1) 必须打jar包
2) 须在集群模式测试逻辑正确性
该模式下,driver端不一定运行在提交应用的服务器上,根据集群的资源情况可能被分配到其他服务器上,常用生产环境。
1) 必须打jar包
1) 脚本正式上线的运行模式。
1. spark-sql 参数
-
- -- 环境参数
- set spark.submit.deployMode=cluster;
- set spark.driver.cores=1;
- set spark.driver.memory=2G;
- set spark.driver.memoryOverhead=512;
- set spark.executor.instances=10;
- set spark.executor.cores=2;
- set spark.executor.memory=12G;
- set spark.executor.memoryOverhead=2G;
- set spark.default.parallelism=40;
- set spark.sql.shuffle.partitions=40;
- set spark.dynamicAllocation.enabled=false;
- set spark.speculation=true;
- set spark.speculation.interval=50000;
- set spark.speculation.quantile=0.95;
- set spark.speculation.multiplier=1.5;
- set spark.sql.broadcastTimeout=-1;
-
- -- 应用参数
- set spark.sql.storeAssignmentPolicy=LEGACY;
- set spark.sql.hive.convertMetastoreParquet=true;
- set spark.sql.session.timeZone=UTC;
2. spark-submit参数
- spark-submit \
- --queue default \
- --master yarn \
- --deploy-mode cluster \
- --driver-cores 2 \
- --driver-memory 2G \
- --num-executors 2 \
- --executor-cores 2 \
- --executor-memory 8G \
- --conf spark.driver.memoryOverhead=512 \
- --conf spark.executor.memoryOverhead=2G \
- --conf spark.dynamicAllocation.enabled=false \
- --conf spark.sql.hive.filesourcePartitionFileCacheSize=1048576000 \
- --conf spark.sql.files.ignoreCorruptFiles=true \
- --conf spark.sql.files.ignoreMissingFiles=true \
- --conf spark.speculation=true \
- --conf spark.speculation.interval=30000 \
- --conf spark.speculation.quantile=0.9 \
- --conf spark.speculation.multiplier=1.5 \
- --conf spark.executor.memoryOverhead=8G \
- --conf spark.sql.sources.parallelPartitionDiscovery.parallelism=240 \
- --conf spark.default.parallelism=160 \
- --conf spark.sql.shuffle.partitions=160 \
- --conf spark.sql.file.out.partitions=80
1. 上面的语句用时约为下面语句的0.25
- val result = df.map(x=>(x+10)*2*3*5)
-
- val result2 = df.map(_ + 10).map(_ * 2).map(_ * 3).map(_ * 5)
2. Dataframe添加值为null的一列
3. Case class 类要定义在使用方法之外。
4. 列的合并与拆分
1) 列的合并与拆分参考01
2) df.map(_.mkString(",")) 所有列组装成一列,中间用逗号分隔。
- val arr = Array((1, 11), (2, 22), (3, 33))
- val df = spark.sparkContext.makeRDD(arr).toDF("id", "age")
- df.map(_.mkString("-")).show()
- +-----+
- |value|
- +-----+
- | 1-11|
- | 2-22|
- | 3-33|
- +-----+
-
- 3) array(cols: Column*)多列合并为array,cols必须为同类型
- val arr = Array((1, 11), (2, 22))
- val df = spark.sparkContext.makeRDD(arr).toDF("id", "age")
- df.select(array("id","age").as("arr")).show
- +-------+
- | arr|
- +-------+
- |[1, 11]|
- |[2, 22]|
- +-------+
4) map(cols: Column*)
将多列组织为map,输入列必须为(key,value)形式,各列的key/value分别为同一类型。
val arr = Array((1, 11), (2, 22))
val df = spark.sparkContext.makeRDD(arr).toDF("id", "age")
df.select(map($"id", $"age").as("map")).show
+---------+
| map|
+---------+
|[1 -> 11]|
|[2 -> 22]|
+---------+
5. 行列转换
6. 随机把dataframe分成n组
val Array(a, b) = df.randomSplit(Array(0.5, 0.5))
a.show()
b.show()
7. 数据动态分区写出到不同目录(根据某一列写出到不同目录)
df.write.mode( SaveMode.Append ).option( "compression", "gzip" ).partitionBy( "etype", "sv" ).parquet( out_path )
1 spark-sql hints
Hints give users a way to suggest how Spark SQL to use specific approaches to generate its execution plan.
Hints - Spark 3.1.2 Documentation
- SELECT /*+ COALESCE(3) */ * FROM t;
- SELECT /*+ REPARTITION(3) */ * FROM t;
- SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
- SELECT /*+ BROADCAST(t1), BROADCAST(t1) */ * FROM t;
参考-03 参数调整
参考-04 开发、资源、倾斜、shuffle
join通常是你在使用Spark时最昂贵的操作,需要在join之前应尽可能的先缩小你的数据。
大表(随机值(0-9)),小表(扩容10倍(0-9))---》保证随机值在扩容值范围
1)Spark-sql执行过程
SparkSQL的执行和优化过程_是谁注册了我的2052的博客-CSDN博客
浅谈Spark SQL语句解析与基于规则优化(RBO) - 简书
2)优化过程参考
记一次Spark SQL的优化_软件开发随心记的博客-CSDN博客_sparksql 慢
问题定位、distribute by、cluster by、 broadcast join
避免使用in和not in
1)控制并行度
没有shuffle操作时,自己手动指定;有shuffle操作时, 默认为设定的shuffle.partition,可以自己指定。
2)设置合适的数据类型
如何确定数据是否偏移?
看文件输出大小是否均匀;
1 寻找dataframe的是否有某些功能
df. 操作查看api是否有相应的函数.
2 如何查看执行计划
3 查看spark-sql内置函数源码
import org.apache.spark.sql.functions._
4 获取spark的配置参数
val partition_num = spark.conf.get( "spark.sql.shuffle.partitions" ).toInt
5. 查看数据的缓存信息
spark-history中无法查看到storage的信息, 程序运行中时要从yarn中的ApplicationMaster入口进入可以查看到其信息.
6. 垂直打印
spark.sql("select name, age from tbl_name").show(number:Int, truncate:Int, vertical:Boolean)
1 spark3新特性
https://zhuanlan.zhihu.com/p/370197693
https://www.zhihu.com/question/402306453
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。