赞
踩
同反压机制一样,Spark Streaming动态资源分配(即DRA,Dynamic Resource Allocation)也可以用来应对流处理中批次流量过载的场景。
Spark Streaming动态资源分配,允许为应用动态分配资源。当任务积压时,申请更多资源;当任务空闲时,使用最少资源。
在生产中,可将动态资源分配和背压机制一起使用,通过背压机制来细粒度确保系统稳定;通过动态资源分配机制来粗粒度根据应用负载,动态增减Executors。共同保证Spark Streaming流处理应用的稳定高效。
入口类是org.apache.spark.streaming.scheduler.ExecutorAllocationManager
。ExecutorAllocationManager
中的定时器,每隔spark.streaming.dynamicAllocation.scalingInterval
时间,调用一次manageAllocation
方法来管理Executor
。manageAllocation
方法计算规则如下:
必须完成至少一个Batch处理,即batchProcTimeCount > 0
。
计算Batch平均处理时间(Batch平均处理时间=Batch总处理时间/Batch总处理次数
)。
若Batch平均处理时间
大于阈值spark.streaming.dynamicAllocation.scalingUpRatio
,则请求新的Executor。
若Batch平均处理时间
小于阈值spark.streaming.dynamicAllocation.scalingDownRatio
,则移除没有任务的Executor。
spark.dynamicAllocation.enabled
: 默认false,是否启用Spark批处理动态资源分配。
spark.streaming.dynamicAllocation.enabled
: 默认false,是否启用Spark Streaming流处理动态资源分配。
spark.streaming.dynamicAllocation.scalingInterval
: 默认60秒,多久检查一次。
spark.streaming.dynamicAllocation.scalingUpRatio
: 默认0.9,增加Executor的阈值。
spark.streaming.dynamicAllocation.scalingDownRatio
: 默认0.3,减少Executor的阈值。
spark.streaming.dynamicAllocation.minExecutors
: 默认无,最小Executor个数
spark.streaming.dynamicAllocation.maxExecutors
: 默认无,最大Executor个数。
Spark Core动态资源分配适合于批处理,如Spark Sql Cli
,可以根据Task数量动态分配Executor数量;如Spark ThriftServer On Yarn
,空闲时不占用资源,只有在用户提交Sql任务时才会根据Task数动态分配Executor数。
当开启Spark Streaming动态资源分配时,需要关闭Spark Core动态资源分配。
由于Spark Streaming动态资源分配需要根据Batch总处理时间和Batch总处理次数来计算Batch平均处理时间,因此需要至少完成一个Batch处理。这就需要我们保证在Spark Streaming动态资源分配起作用前,应用程序不会崩溃。
sparkCommLib=/data/apps/sparkCommLib /usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \ --master yarn \ --deploy-mode cluster \ --queue default \ --name spark_streaming_dra \ --driver-cores 1 \ --driver-memory 1G \ --executor-memory 1G \ --conf spark.dynamicAllocation.enabled=false \ --conf spark.streaming.dynamicAllocation.enabled=true \ --conf spark.streaming.dynamicAllocation.minExecutors=1 \ --conf spark.streaming.dynamicAllocation.maxExecutors=15 \ --jars ${sparkCommLib}/kafka_2.11-0.10.1.0.jar,${sparkCommLib}/kafka-clients-0.10.1.0.jar,${sparkCommLib}/spark-streaming-kafka-0-10_2.11-2.1.1.jar,${sparkCommLib}/fastjson-1.2.5.jar \ --class com.bigData.spark.SparkStreamingDRA \ spark-1.0-SNAPSHOT.jar
在Yarn上可以看到,随着Spark Streaming任务队列中Queued的Batch越来越多,Executors数量在逐渐增加。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。