Spark Configuration Options (spark.sql.storeAssignmentPolicy and Others)

/bin/spark-shell --master yarn --deploy-mode client
/bin/spark-shell --master yarn --deploy-mode cluster

There are two deploy modes that can be used to launch Spark applications on YARN.
In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application.
In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.

spark.yarn.am.cores	1	Number of cores to use for the YARN Application Master in client mode. In cluster mode, use spark.driver.cores instead.
spark.executor.instances	2	The number of executors for static allocation. With spark.dynamicAllocation.enabled, the initial set of executors will be at least this large.

YARN: The --num-executors option to the Spark YARN client controls how many executors it will allocate on the cluster (spark.executor.instances as configuration property),
while --executor-memory (spark.executor.memory configuration property)
and --executor-cores (spark.executor.cores configuration property) control the resources per executor.
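
The same three knobs can also be set programmatically before the session starts. A minimal Scala sketch, assuming made-up values (4 executors with 8 GiB and 2 cores each):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Illustrative values only; equivalent to --num-executors 4 --executor-memory 8g --executor-cores 2.
val conf = new SparkConf()
  .set("spark.executor.instances", "4")
  .set("spark.executor.memory", "8g")
  .set("spark.executor.cores", "2")

val spark = SparkSession.builder().config(conf).getOrCreate()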

spark.memory.offHeap.enabled	false	If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.
spark.memory.offHeap.size	0	The absolute amount of memory in bytes which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true.
spark.driver.memory	1g	Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). 
Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file.
spark.executor.memory	1g	Amount of memory to use per executor process, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g).
spark.executor.cores	1 in YARN mode, all the available cores on the worker in standalone and Mesos coarse-grained modes.	The number of cores to use on each executor. In standalone and Mesos coarse-grained modes, for more detail, see this description.

deployMode

You can control the deploy mode of a Spark application using spark-submit’s --deploy-mode command-line option or spark.submit.deployMode Spark property.
spark.submit.deployMode (default: client) can be client or cluster.

Cluster deploy mode is not applicable to the following:

  • spark-shell
  • spark-sql
  • Spark Thrift server
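
Since the interactive tools above cannot use cluster mode, cluster deploy mode is normally exercised by submitting a packaged application. A minimal sketch using org.apache.spark.launcher.SparkLauncher; the jar path and main class are placeholders, not from this article:

import org.apache.spark.launcher.SparkLauncher

// Programmatic equivalent of: spark-submit --master yarn --deploy-mode cluster ...
val handle = new SparkLauncher()
  .setAppResource("/path/to/your-app.jar")   // placeholder jar
  .setMainClass("com.example.YourApp")       // placeholder main class
  .setMaster("yarn")
  .setDeployMode("cluster")                  // i.e. spark.submit.deployMode=cluster
  .startApplication()
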
memory

spark.memory.offHeap

spark.memory.offHeap.enabled	false	If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.

spark.memory.offHeap.size

spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true

spark.memory.offHeap.size	0	The absolute amount of memory in bytes which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true.

spark.yarn.executor.memoryOverhead is the memory overhead needed by the executor's own JVM process, while spark.memory.offHeap.size is the off-heap memory used for RDD execution and data storage (by default execution and storage each take 50%, controlled by spark.memory.storageFraction).

spark.executor.memoryOverhead	executorMemory * 0.10, with minimum of 384	The amount of off-heap memory to be allocated per executor, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). This option is currently supported on YARN and Kubernetes.
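
A hedged sketch of declaring both kinds of off-heap memory together; the sizes are arbitrary examples, not recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")        // execution/storage off-heap; must be > 0 when enabled
  .set("spark.executor.memoryOverhead", "3g")    // JVM/native overhead headroom per executor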

Hadoop YARN configuration
yarn-site.xml

<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>236544</value>
</property>


This configures the memory ceiling for container allocation on each node; the value may even be declared slightly higher than the actual physical memory.

When ExecutorMemory + MemoryOverhead <= maximum-allocation-mb,
the executor's container request will succeed
(this check has nothing to do with spark.memory.offHeap.size).
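
A quick arithmetic check of that rule against the yarn-site.xml value above, with an invented executor size:

// Hypothetical sizing: spark.executor.memory=20g plus the default 10% (minimum 384 MB) overhead.
val executorMemoryMb = 20 * 1024
val memoryOverheadMb = math.max(384, (executorMemoryMb * 0.10).toInt)
val maxAllocationMb  = 236544                    // yarn.scheduler.maximum-allocation-mb above

val requestMb = executorMemoryMb + memoryOverheadMb
println(s"$requestMb MB requested, fits = ${requestMb <= maxAllocationMb}")   // 22528 MB requested, fits = true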

time
spark.network.timeout	120s	Default timeout for all network interactions. This config will be used in place of spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout or spark.rpc.lookupTimeout if they are not configured.
spark.executor.heartbeatInterval	10s	Interval between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks. spark.executor.heartbeatInterval should be significantly less than spark.network.timeout
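
A small sketch that raises the network timeout and keeps the heartbeat interval well below it; the values are examples only:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.network.timeout", "300s")             // default 120s
  .set("spark.executor.heartbeatInterval", "20s")   // must stay much smaller than spark.network.timeout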
GC

spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -Xms100g

]# ll /ssd/ssd-pcie/yarn/userlogs/application_1561253389276_0054/container_1561253389276_0054_01_000005
total 96
-rw-r--r-- 1 root root 90873 Jun 24 10:11 stderr
-rw-r--r-- 1 root root  4078 Jun 24 10:11 stdout

-XX:+PrintGC
Output format: [GC 118250K->113543K(130112K), 0.0094143 secs]
               [Full GC 121376K->10414K(130112K), 0.0650971 secs]

-XX:+PrintGCDetails
Output format: [GC [DefNew: 8614K->781K(9088K), 0.0123035 secs] 118250K->113543K(130112K), 0.0124633 secs]
               [GC [DefNew: 8614K->8614K(9088K), 0.0000665 secs][Tenured: 112761K->10414K(121024K), 0.0433488 secs] 121376K->10414K(130112K), 0.0436268 secs]
-Xms3550m: sets the JVM's initial heap size to 3550 MB. This can be set equal to -Xmx so that the JVM does not re-allocate heap memory after every garbage collection.
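
For reference, the GC flags above can also be set through SparkConf; a minimal sketch (the article's -Xms100g is left out here, and the heap ceiling itself must come from spark.executor.memory, not -Xmx):

import org.apache.spark.SparkConf

// GC output then appears in each executor's stderr under the YARN container log directory shown above.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails")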
spark.eventLog
spark.eventLog.dir	file:///tmp/spark-events	Base directory in which Spark events are logged, if spark.eventLog.enabled is true. Within this base directory, Spark creates a sub-directory for each application, and logs the events specific to the application in this directory. Users may want to set this to a unified location like an HDFS directory so history files can be read by the history server.
spark.eventLog.enabled	false	Whether to log Spark events, useful for reconstructing the Web UI after the application has finished.
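
A sketch of turning event logging on and pointing it at a shared location for the history server; the HDFS path is a placeholder:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-events")   // placeholder shared directory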
driver
spark.driver.cores	1	Number of cores to use for the driver process, only in cluster mode.
spark.driver.memory	1g	Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). 
Note: In client mode, this config must not be set through the SparkConf (i.e., in your application code) directly, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file.
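
Because of that note, in client mode the driver memory has to be fixed before the driver JVM starts, e.g. via --driver-memory, the properties file, or a launcher. A hedged SparkLauncher sketch with placeholder paths:

import org.apache.spark.launcher.SparkLauncher

val launcher = new SparkLauncher()
  .setMaster("yarn")
  .setDeployMode("client")
  .setAppResource("/path/to/your-app.jar")       // placeholder
  .setMainClass("com.example.YourApp")           // placeholder
  .setConf(SparkLauncher.DRIVER_MEMORY, "4g")    // applied before the driver JVM starts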
Overriding configuration directory
To specify a configuration directory other than the default "SPARK_HOME/conf", you can set SPARK_CONF_DIR. Spark will use the configuration files (spark-defaults.conf, spark-env.sh, log4j.properties, etc.) from this directory.
CBO
  val CBO_ENABLED =
    buildConf("spark.sql.cbo.enabled")
      .doc("Enables CBO for estimation of plan statistics when set true.")
      .version("2.2.0")
      .booleanConf
      .createWithDefault(false)
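
A sketch of switching CBO on for a session; note that it only helps once table statistics exist (the table name below is hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.cbo.enabled", "true")    // default false
  .getOrCreate()

// Collect table-level statistics so the optimizer has something to estimate with.
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")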
Errors from forced type conversions (casts) on insert can be controlled with the following configuration:
  val STORE_ASSIGNMENT_POLICY =
    buildConf("spark.sql.storeAssignmentPolicy")
      .doc("When inserting a value into a column with different data type, Spark will perform " +
        "type coercion. Currently, we support 3 policies for the type coercion rules: ANSI, " +
        "legacy and strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. " +
        "In practice, the behavior is mostly the same as PostgreSQL. " +
        "It disallows certain unreasonable type conversions such as converting " +
        "`string` to `int` or `double` to `boolean`. " +
        "With legacy policy, Spark allows the type coercion as long as it is a valid `Cast`, " +
        "which is very loose. e.g. converting `string` to `int` or `double` to `boolean` is " +
        "allowed. It is also the only behavior in Spark 2.x and it is compatible with Hive. " +
        "With strict policy, Spark doesn't allow any possible precision loss or data truncation " +
        "in type coercion, e.g. converting `double` to `int` or `decimal` to `double` is " +
        "not allowed."
      )
      .version("3.0.0")
      .stringConf
      .transform(_.toUpperCase(Locale.ROOT))
      .checkValues(StoreAssignmentPolicy.values.map(_.toString))
      .createWithDefault(StoreAssignmentPolicy.ANSI.toString)
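
For example, if an INSERT that worked on Spark 2.x now fails under the default ANSI policy, the old permissive behaviour can be restored at the cost of looser casts. A sketch with hypothetical table names:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// ANSI (the default in 3.0+) rejects e.g. string -> int on insert;
// LEGACY restores the Spark 2.x / Hive-compatible casts.
spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
spark.sql("INSERT INTO target_table SELECT * FROM staging_table")   // hypothetical tables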
spark.sql.mapKeyDedupPolicy

In Spark version 2.4 and below, you can create a map with duplicated keys via built-in functions like CreateMap, StringToMap, etc. The behavior of a map with duplicated keys is undefined; for example, map lookup respects the duplicated key that appears first, Dataset.collect only keeps the duplicated key that appears last, MapKeys returns duplicated keys, etc. In Spark 3.0, Spark throws RuntimeException when duplicated keys are found. You can set spark.sql.mapKeyDedupPolicy to LAST_WIN to deduplicate map keys with a last-wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (for example, Parquet); the behavior is undefined.

  val MAP_KEY_DEDUP_POLICY = buildConf("spark.sql.mapKeyDedupPolicy")
    .doc("The policy to deduplicate map keys in builtin function: CreateMap, MapFromArrays, " +
      "MapFromEntries, StringToMap, MapConcat and TransformKeys. When EXCEPTION, the query " +
      "fails if duplicated map keys are detected. When LAST_WIN, the map key that is inserted " +
      "at last takes precedence.")
    .version("3.0.0")
    .stringConf
    .transform(_.toUpperCase(Locale.ROOT))
    .checkValues(MapKeyDedupPolicy.values.map(_.toString))
    .createWithDefault(MapKeyDedupPolicy.EXCEPTION.toString)
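
A small sketch of the difference using a built-in map constructor with a duplicate key:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// With the default EXCEPTION policy this query throws at runtime;
// with LAST_WIN the later value for key 1 silently wins.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
spark.sql("SELECT map(1, 'a', 1, 'b') AS m").show()   // m = {1 -> b}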
spark.dynamicAllocation.executorAllocationRatio
  private[spark] val DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO =
    ConfigBuilder("spark.dynamicAllocation.executorAllocationRatio")
      .version("2.4.0")
      .doubleConf
      .createWithDefault(1.0)
  /**
   * The maximum number of executors, for the ResourceProfile id passed in, that we would need
   * under the current load to satisfy all running and pending tasks, rounded up.
   */
  private[spark] def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
    val pendingTask = listener.pendingTasksPerResourceProfile(rpId)
    val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
    val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
    val running = listener.totalRunningTasksPerResourceProfile(rpId)
    val numRunningOrPendingTasks = pendingTask + pendingSpeculative + running
    val rp = resourceProfileManager.resourceProfileFromId(rpId)
    val tasksPerExecutor = rp.maxTasksPerExecutor(conf)
    logDebug(s"max needed for rpId: $rpId numpending: $numRunningOrPendingTasks," +
      s" tasksperexecutor: $tasksPerExecutor")
    val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
      tasksPerExecutor).toInt

    val maxNeededWithSpeculationLocalityOffset =
      if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
      // If we have pending speculative tasks and only need a single executor, allocate one more
      // to satisfy the locality requirements of speculation
      maxNeeded + 1
    } else {
      maxNeeded
    }

    if (unschedulableTaskSets > 0) {
      // Request additional executors to account for task sets having tasks that are unschedulable
      // due to executors excluded for failures when the active executor count has already reached
      // the max needed which we would normally get.
      val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets * executorAllocationRatio /
        tasksPerExecutor).toInt
      math.max(maxNeededWithSpeculationLocalityOffset,
        executorMonitor.executorCountWithResourceProfile(rpId) + maxNeededForUnschedulables)
    } else {
      maxNeededWithSpeculationLocalityOffset
    }
  }

The key line is this one:

val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
  tasksPerExecutor).toInt

which determines the number of executors to request.
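
A worked example of that formula with invented numbers: 1000 running-or-pending tasks, 4 tasks per executor, and spark.dynamicAllocation.executorAllocationRatio = 0.5 gives ceil(1000 * 0.5 / 4) = 125 executors instead of 250.

// Stand-alone re-statement of the maxNeeded formula above; all inputs are invented.
val numRunningOrPendingTasks = 1000
val tasksPerExecutor = 4                 // roughly spark.executor.cores / spark.task.cpus
val executorAllocationRatio = 0.5        // spark.dynamicAllocation.executorAllocationRatio

val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
  tasksPerExecutor).toInt
println(maxNeeded)                       // 125, versus the 250 that ratio 1.0 would request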
