当前位置:   article > 正文

Spark读hive text表之非shuffle方式增大并行度

hive on spark处理text文件
背景介绍:
  • cdh集群、hadoop2.6.0、spark2.3.0
  • hive表:text格式存储
  • 数据块:128M
  • 处理过程:读取hive表 -> 业务处理(无聚合操作) -> 写入hive、es
问题描述:

正常情况下,一个spark task要处理一个partition即128M的数据,因处理过程较耗时而成为任务瓶颈。

解决过程:

大的方向是进行任务拆分,增大并行度。

  • 方法一:使用spark提供的repartition/coalesce

优点:RDD中定义的算子,可以直接使用
缺点:使用以上算子来增大并行度,一定会进行shuffle操作
结论:测试发现,虽然增大了业务处理的并行度,但shuffle操作的开销比较大,因此整体的耗时没有明显减少。

  • 方法二:基于spark读text格式文件的分片算法,从源头减小数据块以增大并行度

初始化SparkSession时进行如下代码设置:

  1. .config("mapreduce.input.fileinputformat.split.minsize","67108864") // 即为想设置的分片大小:64M
  2. .config("mapreduce.job.maps","1000") // 确保分片足够大
  3. 复制代码

用以实现spark读取hive时,一个task处理一个64M的数据块。
优点:理论来说,并行度扩大一倍,耗时将减少一半。
结论:测试发下,耗时确实大幅度下降。

源码分析

调用链: HadoopTableReader#createHadoopRdd

HadoopRDD#getPartitions
  FileInputFormat#getSplits
    FileInputFormat#computeSplitSize

核心代码片段

  1. private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
  2. 0 // will splitted based on block by default.
  3. } else {
  4. math.max(hadoopConf.getInt("mapreduce.job.maps", 1),
  5. sparkSession.sparkContext.defaultMinPartitions)
  6. }
  7. 复制代码
  1. val rdd = new HadoopRDD(
  2. sparkSession.sparkContext,
  3. _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
  4. Some(initializeJobConfFunc),
  5. inputFormatClass,
  6. classOf[Writable],
  7. classOf[Writable],
  8. _minSplitsPerRDD)
  9. 复制代码

由HadoopTableReader生成HadoopRDD,参数:_minSplitsPerRDD在非local模式下可通过mapreduce.job.maps设置

  1. public InputSplit[] getSplits(JobConf job, int numSplits)
  2. throws IOException {
  3. // 多处省略
  4. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  5. long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
  6. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
  7. long blockSize = file.getBlockSize();
  8. long splitSize = computeSplitSize(goalSize, minSize, blockSize);
  9. return splits.toArray(new FileSplit[splits.size()]);
  10. }
  11. 复制代码
  1. protected long computeSplitSize(long goalSize, long minSize,
  2. long blockSize) {
  3. return Math.max(minSize, Math.min(goalSize, blockSize));
  4. }
  5. 复制代码
  1. public static final String SPLIT_MINSIZE =
  2. "mapreduce.input.fileinputformat.split.minsize";
  3. 复制代码

最终数据分片的大小由Math.max(minSize, Math.min(goalSize, blockSize))计算得到,根据源码可知:

  1. blockSize:hdfs实际存储的blockSize,128M不可变
  2. goalSize:totalSize / (numSplits == 0 ? 1 : numSplits)
  3. numSplits local模式下为0;其他模式可通过:mapreduce.job.maps 配置
  4. minSize:SPLIT_MINSIZE与minSplitSize的最大值
  5. SPLIT_MINSIZE:默认为1,可通过mapreduce.input.fileinputformat.split.minsize 配置
  6. minSplitSize:默认为1
  7. 复制代码

默认情况下, 返回结果为128M。为了让计算结果为减小,比如64M,只需要 minSize为64M,Math.min(goalSize, blockSize)足够小即可,即:

  • 设置 numSplits 足够大比如1000(参数:mapreduce.job.maps),就能保证goalSize足够小,进而保证Math.min(goalSize, blockSize)足够小
  • 设置 SPLIT_MINSIZE 为64M(参数:mapreduce.input.fileinputformat.split.minsize),根据 Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize),即可实现 minSize 为64M
结论总结

结合spark分片机制进行参数设置,既提高任务并行度又避免shuffle的性能损耗。

转载于:https://juejin.im/post/5cea9b7f51882505107f62ff

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/529237
推荐阅读
相关标签
  

闽ICP备14008679号