
Hive small files causing too many map tasks

Symptom: a Hive query generates a huge number of map tasks, consuming excessive CPU resources, and tuning the relevant parameters has no effect.

Analysis:

The number of map tasks in Hive is determined by the configured input split size. Hive wraps Hadoop's InputFormat interface, which describes the format of the input data, and the implementation actually used is selected by the hive.input.format parameter. There are two main implementations:

1. HiveInputFormat

2. CombineHiveInputFormat

For CombineHiveInputFormat, split calculation goes through the flow below (the key branches are called out in the comments):

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    // Load CombineFileInputFormatShim, which extends
    // org.apache.hadoop.mapred.lib.CombineFileInputFormat
    CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
        .getCombineFileInputFormat();
    if (combine == null) {
      // If the shim is unavailable, fall back to the HiveInputFormat split logic (same below)
      return super.getSplits(job, numSplits);
    }
    Path[] paths = combine.getInputPathsShim(job);
    for (Path path : paths) {
      // Non-native tables (external storage handlers) are split the HiveInputFormat way
      if ((tableDesc != null) && tableDesc.isNonNative()) {
        return super.getSplits(job, numSplits);
      }
      Class inputFormatClass = part.getInputFileFormatClass();
      String inputFormatClassName = inputFormatClass.getName();
      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
      if (this.mrwork != null && !this.mrwork.getHadoopSupportsSplittable()) {
        if (inputFormat instanceof TextInputFormat) {
          if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
            // With hive.hadoop.supports.splittable.combineinputformat (MAPREDUCE-1597)
            // disabled, compressed TextInputFormat input also falls back to the
            // HiveInputFormat split algorithm
            return super.getSplits(job, numSplits);
          }
        }
      }
      // Symlink tables are handled the same way
      if (inputFormat instanceof SymlinkTextInputFormat) {
        return super.getSplits(job, numSplits);
      }
      CombineFilter f = null;
      boolean done = false;
      Path filterPath = path;
      // Controlled by hive.mapper.cannot.span.multiple.partitions (default false).
      // When true, one pool is created per partition (that branch is omitted here);
      // otherwise a single pool is created per table/file-format combination so that
      // its splits can later be combined together.
      if (!mrwork.isMapperCannotSpanPartns()) {
        opList = HiveFileFormatUtils.doGetWorksFromPath(
                   pathToAliases, aliasToWork, filterPath);
        f = poolMap.get(new CombinePathInputFormat(opList, inputFormatClassName));
      }
      if (!done) {
        if (f == null) {
          f = new CombineFilter(filterPath);
          combine.createPool(job, f);
        } else {
          f.addPath(filterPath);
        }
      }
    }
    if (!mrwork.isMapperCannotSpanPartns()) {
      // Only here is the combine split algorithm invoked; the shim extends
      // org.apache.hadoop.mapred.lib.CombineFileInputFormat, which wraps the
      // new-API CombineFileInputFormat
      iss = Arrays.asList(combine.getSplits(job, 1));
    }
    // Special handling for sampled (TABLESAMPLE) queries
    if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) {
      iss = sampleSplits(iss);
    }
    // Wrap the results and return
    for (InputSplitShim is : iss) {
      CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is);
      result.add(csplit);
    }
    return result.toArray(new CombineHiveInputSplit[result.size()]);
  }
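
The point of taking the combine path is that many small files are packed into a single split instead of each becoming its own map. Below is a minimal, self-contained sketch of that idea only: a simple greedy packing of files up to a maximum split size. It is not the real CombineFileInputFormat algorithm, which additionally groups blocks by node and rack, and the FileChunk / packIntoSplits names are made up for this illustration.

import java.util.ArrayList;
import java.util.List;

// Illustrative only: greedy packing of small files into combined "splits".
// The real org.apache.hadoop.mapred.lib.CombineFileInputFormat also honours
// node/rack locality and the min-size-per-node/rack settings.
public class CombineSketch {

  // Hypothetical stand-in for a file (or file block) with its length in bytes.
  static class FileChunk {
    final String path;
    final long length;
    FileChunk(String path, long length) { this.path = path; this.length = length; }
  }

  // Pack chunks into groups whose total size does not exceed maxSplitSize,
  // so N small files yield far fewer than N splits (and therefore fewer maps).
  static List<List<FileChunk>> packIntoSplits(List<FileChunk> chunks, long maxSplitSize) {
    List<List<FileChunk>> splits = new ArrayList<>();
    List<FileChunk> current = new ArrayList<>();
    long currentSize = 0;
    for (FileChunk c : chunks) {
      if (!current.isEmpty() && currentSize + c.length > maxSplitSize) {
        splits.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(c);
      currentSize += c.length;
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }
    return splits;
  }

  public static void main(String[] args) {
    // 10,000 small files of 1 MB each, combined under a 256 MB max split size:
    List<FileChunk> files = new ArrayList<>();
    for (int i = 0; i < 10_000; i++) {
      files.add(new FileChunk("part-" + i, 1L << 20));
    }
    List<List<FileChunk>> splits = packIntoSplits(files, 256L << 20);
    // Prints 40 splits (40 maps) instead of 10,000.
    System.out.println(splits.size());
  }
}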

 

 

 

Here the table's input format is TextInputFormat and the data is stored compressed; because the relevant parameter (hive.hadoop.supports.splittable.combineinputformat) was not set, the combine configuration did not take effect and split calculation fell back to HiveInputFormat:

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    // Iterate over every input directory (one per partition)
    for (Path dir : dirs) {
      PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
      // Get the partition's input format
      Class inputFormatClass = part.getInputFileFormatClass();
      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
      // Compute splits with that format's own split algorithm.
      // Note: the InputFormat here is the old API (org.apache.hadoop.mapred), not
      // org.apache.hadoop.mapreduce; a new-API format fails at query time with
      // "Input format must implement InputFormat". The two APIs also compute the
      // input split size differently: the new API uses
      // Math.max(minSize, Math.min(maxSize, blockSize)), while the old API uses
      // Math.max(minSize, Math.min(goalSize, blockSize)).
      InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
      for (InputSplit is : iss) {
        // Wrap the results and return
        result.add(new HiveInputSplit(is, inputFormatClass.getName()));
      }
    }
    return result.toArray(new HiveInputSplit[result.size()]);
}
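
To see why the HiveInputFormat fallback produces one map per small file, the two split-size formulas from the comment above can be compared directly. The sketch below only reproduces the size arithmetic (the method names are ours, not Hadoop's); the key point is that even though the computed split size is a full HDFS block or more, plain FileInputFormat never merges splits across files, so every small file still yields at least one split and therefore one map.

// Split-size arithmetic only; the method names are illustrative, not Hadoop's.
public class SplitSizeSketch {

  // Old API (org.apache.hadoop.mapred.FileInputFormat), where
  // goalSize = totalSize / numSplits:
  static long oldApiSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  // New API (org.apache.hadoop.mapreduce.lib.input.FileInputFormat):
  static long newApiSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L << 20;             // 128 MB HDFS block
    long minSize = 1;                        // default minimum split size
    long totalSize = 11_236L * (1L << 20);   // e.g. 11,236 files of ~1 MB each
    long goalSize = totalSize / 2;           // numSplits hint = 2

    System.out.println(oldApiSplitSize(goalSize, minSize, blockSize));    // 134217728 (128 MB)
    System.out.println(newApiSplitSize(blockSize, minSize, 256L << 20));  // 134217728 (128 MB)

    // Either way the split size is far larger than each 1 MB file, but splits
    // never span files in plain FileInputFormat, so every small file still
    // yields one split, i.e. one map per file: 11,236 maps.
  }
}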

Solution:

Set the hive.hadoop.supports.splittable.combineinputformat parameter.

Full list of related parameters:

set hive.merge.mapfiles=true;
set hive.hadoop.supports.splittable.combineinputformat=true;
set hive.merge.size.per.task=2147483648;
set hive.merge.smallfiles.avgsize=2147483648;
set mapreduce.input.fileinputformat.split.maxsize=2147483648;
set mapred.max.split.size=2147483648;
set mapred.min.split.size.per.node=2147483648;
set mapred.min.split.size.per.rack=2147483648;
set hive.exec.reducers.bytes.per.reducer=2147483648;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
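
These can be issued with set in the session right before the query, or made cluster-wide defaults in hive-site.xml; the 2147483648 (2 GB) values shown here are simply the ones used in this case, not required values.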

Verification:

The number of maps dropped from 11,236 to 2,363, confirming that the parameter changes took effect.

 

 

 

 

Note: when applying these settings, keep an eye on how much input data each map receives; if a single map gets too much data, it will run slowly. For example, with the 2 GB values above, roughly 200 GB of input would be handled by only about 100 maps, so the values need to be tuned to a reasonable balance for the workload, e.g. by lowering mapred.max.split.size if individual maps become too slow.

 

 
