当前位置:   article > 正文

sprk使用手册_sparksql用户使用手册

sparksql用户使用手册

一、Spark运行模式选择

注意:运行模式不同, 能执行命令也有所差异, 像打jar包能运行的命令, 在spark-shell模式下不一定可以。

1.1 idea 的local模式

1.  适用场合

1) 无需打jar包

2)在本地就可测试逻辑正确性。

3)在本地就可运行的简单任务。

4)在本地就可测试的函数用法。

1.2 spark-shell模式

1. 适用场合

1) 无需打jar包

2)spark-shell不仅可以支持sql查询,还可以执行RDD/dataframe/dataset操作。

3)需在真实集群上运行的简单任务

4)需在真实集群上测试函数的用法.
 

2. 启动客户端

1) 直接启动

  1. num_executors=20
  2. let parallelism=num_executors*4
  3. spark-shell \
  4. --name ShyTestError \
  5. --master yarn \
  6. --deploy-mode client \
  7. --num-executors ${num_executors} \
  8. --executor-memory 12G \
  9. --executor-cores 2 \
  10. --driver-memory 1G \
  11. --conf spark.dynamicAllocation.enabled=false \
  12. --conf spark.executor.memoryOverhead=2G \
  13. --conf spark.sql.session.timeZone=UTC \
  14. --conf spark.speculation=true \
  15. --conf spark.speculation.interval=50000 \
  16. --conf spark.speculation.quantile=0.95 \
  17. --conf spark.speculation.multiplier=1.5 \
  18. --conf spark.sql.broadcastTimeout=-1 \
  19. --conf spark.default.parallelism=${parallelism} \
  20. --conf spark.sql.shuffle.partitions=${parallelism}

2) 脚本启动

  1. #!/usr/bin/env bash
  2. #---------------------------------------- 1 获取穿参 ---------------------------------------
  3. # 运行spark-sql, 并指定资源数
  4. # sh spark_shell_run.sh num_executors
  5. num_executors=${1}
  6. let parallelism=num_executors*4
  7. #----------------------------------------- 2 运行spark-sql ----------------------------------
  8. spark-shell \
  9. --name shy_spark_shell \
  10. --queue default \
  11. --master yarn \
  12. --deploy-mode client \
  13. --num-executors ${num_executors} \
  14. --executor-memory 12G \
  15. --executor-cores 2 \
  16. --driver-memory 1G \
  17. --conf spark.dynamicAllocation.enabled=false \
  18. --conf spark.executor.memoryOverhead=2G \
  19. --conf spark.speculation=true \
  20. --conf spark.speculation.interval=20000 \
  21. --conf spark.speculation.quantile=0.95 \
  22. --conf spark.speculation.multiplier=1.5 \
  23. --conf spark.sql.broadcastTimeout=-1 \
  24. --conf spark.sql.session.timeZone=UTC \
  25. --conf spark.default.parallelism=${parallelism} \
  26. --conf spark.sql.shuffle.partitions=${parallelism}

1.3 Spark-sql命令行

1.  适用场合

1) 无需打jar包

2) spark-sql客户端类似于hive客户端,仅支持sql查询

2. 启动客户端

 1) 直接启动

  1. num_executors=20
  2. let parallelism=num_executors*4
  3. spark-sql \
  4. -S \
  5. --name ShyTestError \
  6. --master yarn \
  7. --deploy-mode client \
  8. --num-executors ${num_executors} \
  9. --executor-memory 12G \
  10. --executor-cores 2 \
  11. --driver-memory 1G \
  12. --hiveconf hive.cli.print.header=true \
  13. --conf spark.dynamicAllocation.enabled=false \
  14. --conf spark.executor.memoryOverhead=2G \
  15. --conf spark.sql.session.timeZone=UTC \
  16. --conf spark.speculation=true \
  17. --conf spark.speculation.interval=50000 \
  18. --conf spark.speculation.quantile=0.95 \
  19. --conf spark.speculation.multiplier=1.5 \
  20. --conf spark.sql.broadcastTimeout=-1 \
  21. --conf spark.default.parallelism=${parallelism} \
  22. --conf spark.sql.shuffle.partitions=${parallelism}

2) 脚本启动

  1. #!/usr/bin/env bash
  2. #---------------------------------------- 1 获取穿参 ---------------------------------------
  3. # 运行spark-sql, 并指定资源数
  4. # sh spark_sql_run.sh num_executors
  5. num_executors=${1}
  6. let parallelism=num_executors*4
  7. #----------------------------------------- 2 运行spark-sql ----------------------------------
  8. spark-sql \
  9. -S \
  10. --name shy_spark_sql \
  11. --queue default \
  12. --master yarn \
  13. --deploy-mode client \
  14. --num-executors ${num_executors} \
  15. --executor-memory 12G \
  16. --executor-cores 2 \
  17. --driver-memory 1G \
  18. --hiveconf hive.cli.print.header=true \
  19. --conf spark.dynamicAllocation.enabled=false \
  20. --conf spark.executor.memoryOverhead=2G \
  21. --conf spark.speculation=true \
  22. --conf spark.speculation.interval=20000 \
  23. --conf spark.speculation.quantile=0.95 \
  24. --conf spark.speculation.multiplier=1.5 \
  25. --conf spark.sql.broadcastTimeout=-1 \
  26. --conf spark.sql.session.timeZone=UTC \
  27. --conf spark.default.parallelism=${parallelism} \
  28. --conf spark.sql.shuffle.partitions=${parallelism}

1.4 spark-sql -e 模式

1. 适用场景

1)无需打jar包,传入sql,查询结果打印到本地文件(结果不宜过大)

2. 启动客户端

  1. #!/usr/bin/env bash
  2. #---------------------------------------- 1 获取穿参 ---------------------------------------
  3. # 运行spark-sql, 并指定资源数
  4. # sh spark_sql_run.sh num_executors
  5. num_executors=${1}
  6. let parallelism=num_executors*4
  7. #----------------------------------------- 2 运行spark-sql ----------------------------------
  8. spark-sql \
  9. -S \
  10. --name shy_spark_sql \
  11. --queue default \
  12. --master yarn \
  13. --deploy-mode client \
  14. --num-executors ${num_executors} \
  15. --executor-memory 12G \
  16. --executor-cores 2 \
  17. --driver-memory 1G \
  18. --conf spark.dynamicAllocation.enabled=false \
  19. --conf spark.executor.memoryOverhead=2G \
  20. --conf spark.speculation=true \
  21. --conf spark.speculation.interval=20000 \
  22. --conf spark.speculation.quantile=0.95 \
  23. --conf spark.speculation.multiplier=1.5 \
  24. --conf spark.sql.broadcastTimeout=-1 \
  25. --conf spark.sql.session.timeZone=UTC \
  26. --conf spark.default.parallelism=${parallelism} \
  27. --conf spark.sql.shuffle.partitions=${parallelism} \
  28. -S -e "select dt, brand  from tab_name " > spark-sql.out

3.  相关参考

hive(spark-sql) -e -f -d以及传参数

1.5 集群client模式

该模式下,driver端必定运行在提交应用的服务器上,增加该台服务器压力。该模式下运行日志详细,常用于测试环境。

1. 适用场合

1) 必须打jar包

2) 须在集群模式测试逻辑正确性

1.6 集群cluster模式

该模式下,driver端不一定运行在提交应用的服务器上,根据集群的资源情况可能被分配到其他服务器上,常用生产环境。

1.  适用场合

1) 必须打jar包

1)    脚本正式上线的运行模式。

1.7 spark启动参数

1. spark-sql 参数

  1. -- 环境参数
  2. set spark.submit.deployMode=cluster;
  3. set spark.driver.cores=1;
  4. set spark.driver.memory=2G;
  5. set spark.driver.memoryOverhead=512;
  6. set spark.executor.instances=10;
  7. set spark.executor.cores=2;
  8. set spark.executor.memory=12G;
  9. set spark.executor.memoryOverhead=2G;
  10. set spark.default.parallelism=40;
  11. set spark.sql.shuffle.partitions=40;
  12. set spark.dynamicAllocation.enabled=false;
  13. set spark.speculation=true;
  14. set spark.speculation.interval=50000;
  15. set spark.speculation.quantile=0.95;
  16. set spark.speculation.multiplier=1.5;
  17. set spark.sql.broadcastTimeout=-1;
  18. -- 应用参数
  19. set spark.sql.storeAssignmentPolicy=LEGACY;
  20. set spark.sql.hive.convertMetastoreParquet=true;
  21. set spark.sql.session.timeZone=UTC;

2. spark-submit参数

  1. spark-submit \
  2. --queue default \
  3. --master yarn \
  4. --deploy-mode cluster \
  5. --driver-cores 2 \
  6. --driver-memory 2G \
  7. --num-executors 2 \
  8. --executor-cores 2 \
  9. --executor-memory 8G \
  10. --conf spark.driver.memoryOverhead=512 \
  11. --conf spark.executor.memoryOverhead=2G \
  12. --conf spark.dynamicAllocation.enabled=false \
  13. --conf spark.sql.hive.filesourcePartitionFileCacheSize=1048576000 \
  14. --conf spark.sql.files.ignoreCorruptFiles=true \
  15. --conf spark.sql.files.ignoreMissingFiles=true \
  16. --conf spark.speculation=true \
  17. --conf spark.speculation.interval=30000 \
  18. --conf spark.speculation.quantile=0.9 \
  19. --conf spark.speculation.multiplier=1.5 \
  20. --conf spark.executor.memoryOverhead=8G \
  21. --conf spark.sql.sources.parallelPartitionDiscovery.parallelism=240 \
  22. --conf spark.default.parallelism=160 \
  23. --conf spark.sql.shuffle.partitions=160 \
  24. --conf spark.sql.file.out.partitions=80

二、Spark-core使用

1. 上面的语句用时约为下面语句的0.25

  1. val result = df.map(x=>(x+10)*2*3*5)
  2. val result2 = df.map(_ + 10).map(_ * 2).map(_ * 3).map(_ * 5)

2. Dataframe添加值为null的一列

3. Case class 类要定义在使用方法之外。

4.  列的合并与拆分

1) 列的合并与拆分参考01

2) df.map(_.mkString(","))  所有列组装成一列,中间用逗号分隔。

  1. val arr = Array((1, 11), (2, 22), (3, 33))
  2. val df = spark.sparkContext.makeRDD(arr).toDF("id", "age")
  3. df.map(_.mkString("-")).show()
  4. +-----+
  5. |value|
  6. +-----+
  7. | 1-11|
  8. | 2-22|
  9. | 3-33|
  10. +-----+
  11. 3) array(cols: Column*)多列合并为array,cols必须为同类型
  1. val arr = Array((1, 11), (2, 22))
  2. val df = spark.sparkContext.makeRDD(arr).toDF("id", "age")
  3. df.select(array("id","age").as("arr")).show
  4. +-------+
  5. |    arr|
  6. +-------+
  7. |[1, 11]|
  8. |[2, 22]|
  9. +-------+

4) map(cols: Column*)

将多列组织为map,输入列必须为(key,value)形式,各列的key/value分别为同一类型。

val arr = Array((1, 11), (2, 22))
val df = spark.sparkContext.makeRDD(arr).toDF("id", "age")
df.select(map($"id", $"age").as("map")).show
+---------+
|      map|
+---------+
|[1 -> 11]|
|[2 -> 22]|
+---------+

5. 行列转换

行列转换参考01

行列转换参考02

 6. 随机把dataframe分成n组 

val Array(a, b) = df.randomSplit(Array(0.5, 0.5))
a.show()
b.show()

7.  数据动态分区写出到不同目录(根据某一列写出到不同目录)

df.write.mode( SaveMode.Append ).option( "compression", "gzip" ).partitionBy( "etype", "sv" ).parquet( out_path )

三、spark-sql

用途描述链接
语法参考https://www.gairuo.com/p/spark-sql-tutorial

1 spark-sql hints

Hints give users a way to suggest how Spark SQL to use specific approaches to generate its execution plan.

Hints - Spark 3.1.2 Documentation

  1. SELECT /*+ COALESCE(3) */ * FROM t;
  2. SELECT /*+ REPARTITION(3) */ * FROM t;
  3. SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
  4. SELECT /*+ BROADCAST(t1), BROADCAST(t1) */ * FROM t;

4 Spark优化

4.1 参数设置参考

参考-01

参考-02

参考-03 参数调整

参考-04 开发、资源、倾斜、shuffle

Spark性能优化指南——高级篇 - 美团技术团队 混合

4.2 Spark性能调优

链接-01

4.3 并行度与分区设置

4.4 Join优化

join通常是你在使用Spark时最昂贵的操作,需要在join之前应尽可能的先缩小你的数据。

大表(随机值(0-9)),小表(扩容10倍(0-9))---》保证随机值在扩容值范围

参考-01

优化-02

4.5 Spark-sql优化

1)Spark-sql执行过程

SparkSQL的执行和优化过程_是谁注册了我的2052的博客-CSDN博客

浅谈Spark SQL语句解析与基于规则优化(RBO) - 简书

2)优化过程参考

记一次Spark SQL的优化_软件开发随心记的博客-CSDN博客_sparksql 慢

问题定位、distribute by、cluster by、 broadcast join

Spark SQL 优化笔记 - 简书

避免使用in和not in

4.6 其他

1)控制并行度

没有shuffle操作时,自己手动指定;有shuffle操作时, 默认为设定的shuffle.partition,可以自己指定。

2)设置合适的数据类型

5 问题排查

如何确定数据是否偏移?

看文件输出大小是否均匀;

6 使用技巧

1 寻找dataframe的是否有某些功能

df. 操作查看api是否有相应的函数.

2 如何查看执行计划

参考链接

3 查看spark-sql内置函数源码

import org.apache.spark.sql.functions._

4 获取spark的配置参数

val partition_num = spark.conf.get( "spark.sql.shuffle.partitions" ).toInt

5. 查看数据的缓存信息

spark-history中无法查看到storage的信息, 程序运行中时要从yarn中的ApplicationMaster入口进入可以查看到其信息.

6. 垂直打印

spark.sql("select name, age from tbl_name").show(number:Int, truncate:Int, vertical:Boolean)

20 版本特性

1 spark3新特性

https://zhuanlan.zhihu.com/p/370197693

https://www.zhihu.com/question/402306453
 


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/653267
推荐阅读
相关标签
  

闽ICP备14008679号