当前位置:   article > 正文

Spark Streaming动态资源分配_sparkstreaming的资源

sparkstreaming的资源

同反压机制一样,Spark Streaming动态资源分配(即DRA,Dynamic Resource Allocation)也可以用来应对流处理中批次流量过载的场景。

Spark Streaming动态资源分配,允许为应用动态分配资源。当任务积压时,申请更多资源;当任务空闲时,使用最少资源。

在生产中,可将动态资源分配和背压机制一起使用,通过背压机制来细粒度确保系统稳定;通过动态资源分配机制来粗粒度根据应用负载,动态增减Executors。共同保证Spark Streaming流处理应用的稳定高效。

动态资源分配的原理

入口类是org.apache.spark.streaming.scheduler.ExecutorAllocationManagerExecutorAllocationManager中的定时器,每隔spark.streaming.dynamicAllocation.scalingInterval时间,调用一次manageAllocation方法来管理ExecutormanageAllocation方法计算规则如下:

  1. 必须完成至少一个Batch处理,即batchProcTimeCount > 0

  2. 计算Batch平均处理时间(Batch平均处理时间=Batch总处理时间/Batch总处理次数)。

  3. Batch平均处理时间大于阈值spark.streaming.dynamicAllocation.scalingUpRatio,则请求新的Executor。

  4. Batch平均处理时间小于阈值spark.streaming.dynamicAllocation.scalingDownRatio,则移除没有任务的Executor。

动态资源分配重要参数

  1. spark.dynamicAllocation.enabled: 默认false,是否启用Spark批处理动态资源分配。

  2. spark.streaming.dynamicAllocation.enabled: 默认false,是否启用Spark Streaming流处理动态资源分配。

  3. spark.streaming.dynamicAllocation.scalingInterval: 默认60秒,多久检查一次。

  4. spark.streaming.dynamicAllocation.scalingUpRatio: 默认0.9,增加Executor的阈值。

  5. spark.streaming.dynamicAllocation.scalingDownRatio: 默认0.3,减少Executor的阈值。

  6. spark.streaming.dynamicAllocation.minExecutors: 默认无,最小Executor个数

  7. spark.streaming.dynamicAllocation.maxExecutors: 默认无,最大Executor个数。

Spark Streaming动态资源分配注意事项

  1. Spark Streaming动态资源分配和Spark Core动态资源分配互斥

Spark Core动态资源分配适合于批处理,如Spark Sql Cli,可以根据Task数量动态分配Executor数量;如Spark ThriftServer On Yarn,空闲时不占用资源,只有在用户提交Sql任务时才会根据Task数动态分配Executor数。

当开启Spark Streaming动态资源分配时,需要关闭Spark Core动态资源分配。

  1. Spark Streaming动态资源分配起作用前,需要至少完成一个Batch处理

由于Spark Streaming动态资源分配需要根据Batch总处理时间和Batch总处理次数来计算Batch平均处理时间,因此需要至少完成一个Batch处理。这就需要我们保证在Spark Streaming动态资源分配起作用前,应用程序不会崩溃。

  1. Spark Streaming动态资源分配应当和Spark Streaming背压机制同时使用

启用动态资源分配


sparkCommLib=/data/apps/sparkCommLib

/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue default \
--name spark_streaming_dra \
--driver-cores 1 \
--driver-memory 1G \
--executor-memory 1G \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.streaming.dynamicAllocation.enabled=true \
--conf spark.streaming.dynamicAllocation.minExecutors=1 \
--conf spark.streaming.dynamicAllocation.maxExecutors=15 \
--jars ${sparkCommLib}/kafka_2.11-0.10.1.0.jar,${sparkCommLib}/kafka-clients-0.10.1.0.jar,${sparkCommLib}/spark-streaming-kafka-0-10_2.11-2.1.1.jar,${sparkCommLib}/fastjson-1.2.5.jar \
--class com.bigData.spark.SparkStreamingDRA \
spark-1.0-SNAPSHOT.jar
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

在Yarn上可以看到,随着Spark Streaming任务队列中Queued的Batch越来越多,Executors数量在逐渐增加。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/653318
推荐阅读
相关标签
  

闽ICP备14008679号