背景介绍:
- cdh集群、hadoop2.6.0、spark2.3.0
- hive表:text格式存储
- 数据块:128M
- 处理过程:读取hive表 -> 业务处理(无聚合操作) -> 写入hive、es
问题描述:
正常情况下,一个spark task要处理一个partition即128M的数据,因处理过程较耗时而成为任务瓶颈。
解决过程:
大的方向是进行任务拆分,增大并行度。
- 方法一:使用spark提供的repartition/coalesce
优点:RDD中定义的算子,可以直接使用
缺点:使用以上算子来增大并行度,一定会进行shuffle操作
结论:测试发现,虽然增大了业务处理的并行度,但shuffle操作的开销比较大,因此整体的耗时没有明显减少。
- 方法二:基于spark读text格式文件的分片算法,从源头减小数据块以增大并行度
初始化SparkSession时进行如下代码设置:
- .config("mapreduce.input.fileinputformat.split.minsize","67108864") // 即为想设置的分片大小:64M
- .config("mapreduce.job.maps","1000") // 确保分片足够大
-
- 复制代码
用以实现spark读取hive时,一个task处理一个64M的数据块。
优点:理论来说,并行度扩大一倍,耗时将减少一半。
结论:测试发下,耗时确实大幅度下降。
源码分析
调用链: HadoopTableReader#createHadoopRdd
HadoopRDD#getPartitions
FileInputFormat#getSplits
FileInputFormat#computeSplitSize
核心代码片段
- private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
- 0 // will splitted based on block by default.
- } else {
- math.max(hadoopConf.getInt("mapreduce.job.maps", 1),
- sparkSession.sparkContext.defaultMinPartitions)
- }
-
- 复制代码
- val rdd = new HadoopRDD(
- sparkSession.sparkContext,
- _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
- Some(initializeJobConfFunc),
- inputFormatClass,
- classOf[Writable],
- classOf[Writable],
- _minSplitsPerRDD)
-
- 复制代码
由HadoopTableReader生成HadoopRDD,参数:_minSplitsPerRDD在非local模式下可通过mapreduce.job.maps设置
-
- public InputSplit[] getSplits(JobConf job, int numSplits)
- throws IOException {
- // 多处省略
-
- long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
- long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
- FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
-
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-
-
- return splits.toArray(new FileSplit[splits.size()]);
- }
- 复制代码
-
- protected long computeSplitSize(long goalSize, long minSize,
- long blockSize) {
- return Math.max(minSize, Math.min(goalSize, blockSize));
- }
-
- 复制代码
- public static final String SPLIT_MINSIZE =
- "mapreduce.input.fileinputformat.split.minsize";
-
- 复制代码
最终数据分片的大小由Math.max(minSize, Math.min(goalSize, blockSize))
计算得到,根据源码可知:
- blockSize:hdfs实际存储的blockSize,128M不可变
- goalSize:totalSize / (numSplits == 0 ? 1 : numSplits)
- numSplits local模式下为0;其他模式可通过:mapreduce.job.maps 配置
-
- minSize:SPLIT_MINSIZE与minSplitSize的最大值
- SPLIT_MINSIZE:默认为1,可通过mapreduce.input.fileinputformat.split.minsize 配置
- minSplitSize:默认为1
- 复制代码
默认情况下, 返回结果为128M。为了让计算结果为减小,比如64M,只需要 minSize
为64M,Math.min(goalSize, blockSize)
足够小即可,即:
- 设置 numSplits 足够大比如1000(参数:mapreduce.job.maps),就能保证goalSize足够小,进而保证Math.min(goalSize, blockSize)足够小
- 设置 SPLIT_MINSIZE 为64M(参数:mapreduce.input.fileinputformat.split.minsize),根据
Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize)
,即可实现 minSize 为64M
结论总结
结合spark分片机制进行参数设置,既提高任务并行度又避免shuffle的性能损耗。