当前位置:   article > 正文

spark-submit 命令使用详解_spark-submit --master

spark-submit --master

spark-submit 用户打包 Spark 应用程序并部署到 Spark 支持的集群管理气上,命令语法如下:

spark-submit [options] <python file> [app arguments]

app arguments 是传递给应用程序的参数,常用的命令行参数如下所示:

  • –master: 设置主节点 URL 的参数。支持:

  • local: 本地机器。

  • spark://host:port:远程 Spark 单机集群。

  • yarn:yarn 集群

  • –deploy-mode:允许选择是否在本地(使用 client 选项)启动 Spark 驱动程序,或者在集群内(使用 cluster 选项)的其中一台工作机器上启动。默认值是 client。

  • –name:应用程序名称,也可在程序内设置。

  • –py-files:.py, .egg 或者 .zip 文件的逗号分隔列表,包括 Python 应用程序。这些文件将分发给每个执行节点。

  • –files:逗号分隔的文件列表,这些文件将分发给每个执行节点。

  • –conf:动态地改变应用程序的配置。

  • –driver-memory:指定应用程序在驱动节点上分配多少内存的参数,类似与 10000M, 2G。默认值是 1024M。

  • –executor-memory:指定每个执行节点上为应用程序分配的内存,默认 1G。

  • –num-executors:指定执行器节点数。

  • –help:展示帮助信息和退出。

以下均是在 yarn 集群提交的任务。

1、默认设置: 会将所有日志和系统输出结果输出到 spark-submit 的 client 上

  1. spark-submit --master yarn code1.py
  2. 1

code1.py

  1. from pyspark.sql import SparkSession
  2. spark = SparkSession.builder.appName('Test_Code1').enableHiveSupport().getOrCreate()
  3. spark.sql("select count(*) from default.test_table").show()1234

2、设置 Executor 的日志级别,Executor 执行的细节(WARN 以下级别的日志)不会输出到 client 中

  1. spark-submit --master yarn code2.py
  2. 1

code2.py

  1. from pyspark.sql import SparkSession
  2. spark = SparkSession.builder.appName('Test_Code1').enableHiveSupport().getOrCreate()
  3. sc = spark.sparkContext
  4. sc.setLogLevel("WARN")
  5. spark.sql("select count(*) from default.test_table").show()1234567

3、使用 cluster 模式

  1. spark-submit --master yarn --deploy-mode cluster code1.py
  2. 1

–deploy-mode 可选 cluster 或 client,cluster 模式下,在 spark-submit 的 client 服务器上不会输出日志和系统输出,仅输出如下语句。只能在 Hadoop 集群上才能看到执行细节和输出

  1. 2019-09-06 00:00:00 INFO Client:54 - Application report for application_1556516318747_25363 (state: RUNNING)
  2. 1

4、自定义依赖的模块或读取文件

  1. spark-submit --master yarn --files file1.txt --py-files code4.py code3.py
  2. 1

code3.py

  1. from code4 import code4func
  2. from pyspark.sql import SparkSession
  3. spark = SparkSession.builder.appName('Test_Code1').enableHiveSupport().getOrCreate()
  4. sc = spark.sparkContext
  5. sc.setLogLevel("WARN")
  6. table = code4func()withopen("file1.txt",'rt')as rf:
  7. db = rf.readline().strip()
  8. spark.sql("select count(*) from {}.{}".format(db, table)).show()123456789101112

code4.py

defcode4func():return"test_table"12

file1.txt

  1. default
  2. 1

自定义的 package 可以打包成 egg 文件上传(该部分代码参考 《PySpark 实战》P:178)。例如有一个自定义创建的 package:

  1. additionalCode/
  2. ├── setup.py
  3. └── utilities
  4. ├── __init__.py
  5. ├── base.py
  6. ├── converters
  7. │ ├── __init__.py
  8. │ ├── base.py
  9. │ └── distance.py
  10. └── geoCalc.py
  11. 12345678910

创建一个 egg 文件:

  1. python setup.py bdist_egg
  2. 1

生成了 dist 文件夹下的 PySparkUtilities-0.1.dev0-py3.6.egg 文件

提交作业:

  1. spark-submit --master yarn --py-files additionalCode/dist/PySparkUtilities-0.1.dev0-py3.6.egg calculatingGeoDistance.py
  2. 1

5、配置集群资源

当执行的 job 需要更多资源时,可以自定义配置使用的资源。

  1. spark-submit --master yarn --driver-memory 15g \
  2. --num-executors 10 --executor-cores 4 --executor-memory 15G \
  3. --conf "spark.executor.memoryOverhead=15G" \
  4. code1.py
  5. 1234

或在程序内设置

  1. spark-submit code5.py
  2. 1

code5.py

  1. import pyspark
  2. from pyspark.sql import SparkSession
  3. conf1 = pyspark.SparkConf().setAll([('spark.executor.memory','15g'),('spark.executor.memoryOverhead','16g'),('spark.executor.cores','4'),('spark.num.executors','10'),('spark.driver.memory','16g')])
  4. spark = SparkSession.builder.appName('Test_Code1').enableHiveSupport().config(conf=conf1).getOrCreate()
  5. spark.sql("select count(*) from default.test_table").show()12345678910111213

6、使用 Python 虚拟环境

当使用 cluster 或应用某些第三方包的时候,在 Executor 中会出现 ImportError 的错误,导致 job 执行失败,如下提交方式会报错:

  1. spark-submit --master yarn --deploy-mode cluster code6.py
  2. 1

报错信息:

  1. Traceback (most recent call last):
  2. File "code6.py", line 2, in <module>
  3. import numpy as np
  4. ImportError: No module named numpy
  5. 1234

这是由于节点中的 python 环境没有安装相应的依赖包,此时需要创建一个 python 虚拟环境并安装所有的依赖包。

创建虚拟环境 python-env,打包为 venv.zip:

  1. virtualenv python-env
  2. 1

venv.zip 部分目录结构如下所示:

  1. venv.zip
  2. └──python-env/
  3. ├── bin
  4. │ └── python
  5. ├── include
  6. ├── lib
  7. └── lib64
  8. 1234567

spark-submit 命令:

  1. spark-submit --master yarn --deploy-mode cluster \
  2. --archives ./venv.zip#env \
  3. --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=env/python-env/bin/python \
  4. code6.py
  5. 1234

code6.py

  1. from pyspark.sql import SparkSession
  2. import numpy as np
  3. spark = SparkSession.builder.appName('Test_Code1').enableHiveSupport().getOrCreate()
  4. sc = spark.sparkContext
  5. sc.setLogLevel("WARN")
  6. arr = np.array([1,2,3])print(arr)
  7. spark.sql("select count(*) from default.test_table").show()123456789101112
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/905794
推荐阅读
相关标签
  

闽ICP备14008679号