当前位置:   article > 正文

hadoop,hive中的mv(rename)操作_hadoop rename

hadoop rename

            系统环境:hadoop2.7.2+hive1.2.1

       大约一年多之前,将hive版本从0.14升级到了1.2.1。之后发现新版本在最后一步写入数据的时候,会比以前慢很多。最后发现是由于hive新版本中,默认中间结果文件是在表空间下生成以  .hive-staging_hive_  开头的文件。以前版本默认是在/tmp/hive下。最后查出来解决办法:

       修改配置文件参数:

        <property>
             <name>hive.exec.stagingdir</name>
             <value>/tmp/hive/.hive-staging</value>
        </property>

        产生问题的原因是,在hive1.2.1版本中,如果中间结果文件目录跟目标目录在同一根目录的话,就会将中间结果数据复制到目标目录。而以前是直接将原目录(/tmp/hive)下的目录直接rename到目标目录。因此,变慢的原因是多了一个额外的数据复制工作。具体看代码:

       

// MoveTask.java
/**
 * Moves query output from sourcePath to targetPath.
 *
 * isDfsDir == true : both paths are on the same (distributed) FileSystem, so
 *                    delegate to Hive.moveFile, which picks between a
 *                    metadata-only rename and a per-file copy.
 * isDfsDir == false: targetPath is on the local filesystem; the existing
 *                    destination is deleted and data is copied down.
 *
 * @throws HiveException if the rename/copy or directory creation fails
 */
private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir)
    throws Exception {
  FileSystem fs = sourcePath.getFileSystem(conf);
  if (isDfsDir) {
    // Just do a rename on the URIs, they belong to the same FS
    String mesg = "Moving data to: " + targetPath.toString();
    String mesg_detail = " from " + sourcePath.toString();
    console.printInfo(mesg, mesg_detail);
    // if source exists, rename. Otherwise, create a empty directory
    if (fs.exists(sourcePath)) {
      Path deletePath = null;
      // If it multiple level of folder are there fs.rename is failing so first
      // create the targetpath.getParent() if it not exist
      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) {
        deletePath = createTargetPath(targetPath, fs);
      }
      // Hive.moveFile decides between rename (fast, metadata-only) and
      // per-file copy (slow path when staging dir is under the destination).
      if (!Hive.moveFile(conf, sourcePath, targetPath, fs, true, false)) {
        try {
          // Roll back any parent directories created above to facilitate
          // the rename, since the move itself failed.
          if (deletePath != null) {
            fs.delete(deletePath, true);
          }
        } catch (IOException e) {
          LOG.info("Unable to delete the path created for facilitating rename"
              + deletePath);
        }
        throw new HiveException("Unable to rename: " + sourcePath
            + " to: " + targetPath);
      }
    } else if (!fs.mkdirs(targetPath)) {
      throw new HiveException("Unable to make directory: " + targetPath);
    }
  } else {
    // This is a local file
    String mesg = "Copying data to local directory " + targetPath.toString();
    String mesg_detail = " from " + sourcePath.toString();
    console.printInfo(mesg, mesg_detail);
    // delete the existing dest directory
    LocalFileSystem dstFs = FileSystem.getLocal(conf);
    // Proceed if the destination was deleted, or never existed at all.
    if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
      console.printInfo(mesg, mesg_detail);
      // if source exists, rename. Otherwise, create a empty directory
      if (fs.exists(sourcePath)) {
        fs.copyToLocalFile(sourcePath, targetPath);
      } else {
        if (!dstFs.mkdirs(targetPath)) {
          throw new HiveException("Unable to make local directory: "
              + targetPath);
        }
      }
    } else {
      throw new AccessControlException(
          "Unable to delete the existing destination directory: "
          + targetPath);
    }
  }
}

   从moveTask中跟踪到,实际上是Hive.java中的moveFile方法

   

// Hive.java
/**
 * Moves srcf to destf, returning true on success.
 *
 * Strategy (visible in the branches below):
 *  - src and dest in different HDFS encryption zones -> copy (delete source).
 *  - src is a subdirectory of dest (destIsSubDir)    -> copy each non-hidden
 *    file individually; this is the extra data-copy the article describes.
 *  - otherwise                                       -> fs.rename, a
 *    metadata-only move.
 *  - local src (isSrcLocal)                          -> copyFromLocalFile.
 *
 * @param replace    when true (and dest is not a parent of src), dest is
 *                   deleted first so the move acts as a replace
 * @param isSrcLocal when true, srcf is on the local filesystem
 * @throws HiveException wrapping any IOException from the filesystem calls
 */
public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
    FileSystem fs, boolean replace, boolean isSrcLocal) throws HiveException {
  boolean success = false;
  //needed for perm inheritance.
  boolean inheritPerms = HiveConf.getBoolVar(conf,
      HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
  HadoopShims shims = ShimLoader.getHadoopShims();
  HadoopShims.HdfsFileStatus destStatus = null;
  HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();
  // If source path is a subdirectory of the destination path:
  // ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300;
  // where the staging directory is a subdirectory of the destination directory
  // (1) Do not delete the dest dir before doing the move operation.
  // (2) It is assumed that subdir and dir are in same encryption zone.
  // (3) Move individual files from scr dir to dest dir.
  boolean destIsSubDir = isSubDir(srcf, destf, fs, isSrcLocal);
  try {
    if (inheritPerms || replace) {
      try{
        // Capture the parent's status so permissions can be re-applied after
        // the move when inheritPerms is on.
        destStatus = shims.getFullFileStatus(conf, fs, destf.getParent());
        //if destf is an existing directory:
        //if replace is true, delete followed by rename(mv) is equivalent to replace
        //if replace is false, rename (mv) actually move the src under dest dir
        //if destf is an existing file, rename is actually a replace, and do not need
        // to delete the file first
        if (replace && !destIsSubDir) {
          LOG.debug("The path " + destf.toString() + " is deleted");
          fs.delete(destf, true);
        }
      } catch (FileNotFoundException ignore) {
        //if dest dir does not exist, any re
        // NOTE(review): comment truncated in the original source; presumably
        // "any rename will create it" — the delete is simply skipped here.
        if (inheritPerms) {
          destStatus = shims.getFullFileStatus(conf, fs, destf.getParent());
        }
      }
    }
    if (!isSrcLocal) {
      // For NOT local src file, rename the file
      if (hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf))
          && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf))
      {
        LOG.info("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
        success = FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
            true,    // delete source
            replace, // overwrite destination
            conf);
      } else {
        if (destIsSubDir) {
          // Slow path: staging dir lives under the destination, so rename
          // would nest it inside itself — copy each non-hidden file instead.
          FileStatus[] srcs = fs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
          if (srcs.length == 0) {
            success = true; // Nothing to move.
          }
          for (FileStatus status : srcs) {
            success = FileUtils.copy(srcf.getFileSystem(conf), status.getPath(), destf.getFileSystem(conf), destf,
                true,    // delete source
                replace, // overwrite destination
                conf);
            if (!success) {
              throw new HiveException("Unable to move source " + status.getPath() + " to destination " + destf);
            }
          }
        } else {
          // Fast path: plain rename, a metadata-only operation on HDFS.
          success = fs.rename(srcf, destf);
        }
      }
    } else {
      // For local src file, copy to hdfs
      fs.copyFromLocalFile(srcf, destf);
      success = true;
    }
    LOG.info((replace ? "Replacing src:" : "Renaming src: ") + srcf.toString()
        + ", dest: " + destf.toString() + ", Status:" + success);
  } catch (IOException ioe) {
    throw new HiveException("Unable to move source " + srcf + " to destination " + destf, ioe);
  }
  if (success && inheritPerms) {
    try {
      ShimLoader.getHadoopShims().setFullFileStatus(conf, destStatus, fs, destf);
    } catch (IOException e) {
      LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e);
    }
  }
  return success;
}
          从代码中可以看出,有两种策略:如果源目录和目标目录在同一个根目录下,则会对源目录下的每个文件执行复制操作;反之,执行rename操作(只涉及元数据,不会有额外的数据移动)。

         为什么会这样呢?我做了一个关于rename的验证。

        

  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. public class TestHdfs {
  6. public static void main(String args[]) throws IOException{
  7. Configuration conf = new Configuration();
  8. FileSystem fs = FileSystem.get(conf);
  9. Path srct=new Path("/tmp/lgh/d1/.dd1");
  10. Path dst=new Path("/usr");
  11. fs.rename(srct, dst);
  12. }
  13. }
           验证说明:.dd1 目录下有数据文件,执行程序之后,数据文件都移动到了/usr/目录下。对于以 . 开头的源目录,rename方法在源目录和目标目录没有相同前缀时,会将其中的数据文件移动到目标目录中;反之,则会将整个源目录本身移动过去。

          而hive的中间结果文件命名规范,就是以.hive-staging开头的。因此,会出现上述情况。大家也可以用hadoop fs -mv命令验证。

   

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/881388
推荐阅读
相关标签
  

闽ICP备14008679号