当前位置:   article > 正文

sparkConf常见参数设置_spark.conf.set

spark.conf.set
def getSparkConf():SparkConf = {

    val sparkConf: SparkConf = new SparkConf()
      .set("spark.driver.cores","4")  //设置driver的CPU核数
      .set("spark.driver.maxResultSize","2g") //设置driver端结果存放的最大容量,这里设置成为2G,超过2G的数据,job就直接放弃,不运行了
      .set("spark.driver.memory","4g")  //driver给的内存大小
      .set("spark.executor.memory","8g")// 每个executor的内存
      .set("spark.submit.deployMode","cluster")  //spark 任务提交模式,线上使用cluster模式,开发使用client模式
      .set("spark.worker.timeout" ,"500") //基于standAlone模式下提交任务,worker的连接超时时间
      .set("spark.cores.max" , "10")  //基于standAlone和mesos模式下部署,最大的CPU和数量
      .set("spark.rpc.askTimeout" , "600s")  //spark任务通过rpc拉取数据的超时时间
      .set("spark.locality.wait" , "5s") //每个task获取本地数据的等待时间,默认3s钟,如果没获取到,依次获取本进程,本机,本机架数据
      .set("spark.task.maxFailures" , "5")  //允许最大失败任务数,根据自身容错情况来定
      .set("spark.serializer" ,"org.apache.spark.serializer.KryoSerializer")  //配置序列化方式
      .set("spark.streaming.kafka.maxRatePerPartition" , "5000")  //使用directStream方式消费kafka当中的数据,获取每个分区数据最大速率
      .set("spark.streaming.backpressure.enabled" , "true")  //开启sparkStreaming背压机制,接收数据的速度与消费数据的速度实现平衡
    //  .set("spark.streaming.backpressure.pid.minRate","10")
      .set("spark.driver.host", "localhost")  //配置driver地址
      //shuffle相关参数调优开始
      .set("spark.reducer.maxSizeInFlight","96m")  //reduceTask拉取map端输出的最大数据量,调整太大有OOM的风险
      .set("spark.shuffle.compress","true")  //开启shuffle数据压缩
      .set("spark.default.parallelism","10")  //设置任务的并行度
      .set("spark.files.fetchTimeout","120s")  //设置文件获取的超时时间
      //网络相关参数
      .set("spark.rpc.message.maxSize","256")  //RPC拉取数据的最大数据量,单位M
      .set("spark.network.timeout","120s")  //网络超时时间设置
      .set("spark.scheduler.mode","FAIR")  //spark 任务调度模式  使用 fair公平调度
      //spark任务资源动态划分  https://spark.apache.org/docs/2.3.0/job-scheduling.html#configuration-and-setup
      .set("spark.dynamicAllocation.enabled","true")
      .set("spark.shuffle.service.enabled","true")
      .set("spark.dynamicAllocation.executorIdleTimeout","120s")  //executor空闲时间超过这个值,该executor就会被回收
      .set("spark.dynamicAllocation.minExecutors","0")  //最少的executor个数
      .set("spark.dynamicAllocation.maxExecutors","32")  //最大的executor个数  根据自己实际情况调整
      .set("spark.dynamicAllocation.initialExecutors","4")//初始executor个数
      .set("spark.dynamicAllocation.schedulerBacklogTimeout","5s")  //pending 状态的task时间,过了这个时间继续pending ,申请新的executor
      .setMaster("local[1]")
      .setAppName("Stream")
    sparkConf.set("spark.speculation", "true")   //开启推测执行
    sparkConf.set("spark.speculation.interval", "100s")  // 每隔多久检测一次是否需要进行推测执行任务
    sparkConf.set("spark.speculation.quantile","0.9")  //完成任务的百分比,然后才能启动推测执行
    sparkConf.set("spark.streaming.backpressure.initialRate" , "500")  // //开启sparkStreaming的背压机制,然后第一批次获取数据的最大速率

    sparkConf.registerKryoClasses(
      Array(
        classOf[OrderInfo],
        classOf[Opt_alliance_business],
        classOf[DriverInfo],
        classOf[RegisterUsers]
      )
    )
    sparkConf
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/609806
推荐阅读
相关标签
  

闽ICP备14008679号