Specifying a multi-character column delimiter in Hive (hive on spark external tables)

1. Problem: in the HDFS file the columns are separated by ##. The Hive table was created with ## as the delimiter, but the fields that come back do not match the file.

The CREATE TABLE statement (relevant clauses) is:

ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '##' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://nameservice-ha/pgw/gz'

2. Cause: when creating a table, Hive does not support more than one character as the field delimiter. With FIELDS TERMINATED BY '##' only the single character # actually takes effect, so a row such as a##b##c is split into a, an empty field, b, an empty field, c, which is why the output does not match the file.

A simple workaround is to write an MR job that rewrites every ## in the data as a single #.
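As an illustration of that workaround, here is a minimal map-only sketch. The class name, job name, and input/output paths are all hypothetical; the job just reads each line, collapses ## to #, and writes the line back out:

package com.hive;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Map-only job: rewrite the two-character delimiter "##" as "#" in every line.
public class DelimRewrite {
    public static class RewriteMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private final Text out = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context ctx)
                throws IOException, InterruptedException {
            // Collapse "##" to "#" and emit the rewritten line as the key;
            // with a NullWritable value, TextOutputFormat writes the key only.
            out.set(value.toString().replace("##", "#"));
            ctx.write(out, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "delim-rewrite");
        job.setJarByClass(DelimRewrite.class);
        job.setMapperClass(RewriteMapper.class);
        job.setNumReduceTasks(0); // map-only, no shuffle needed
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. the table's HDFS dir
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // rewritten copy
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}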


3. Solution: to let Hive handle a multi-character delimiter, define a custom InputFormat and override its next method.

The code is as follows:

package com.hive;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

// Custom InputFormat: hands each split to DefRecordReader, which collapses
// the multi-character delimiter before Hive parses the row.
public class DefTextInputFormat extends TextInputFormat implements JobConfigurable {
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new DefRecordReader((FileSplit) genericSplit, job);
    }
}

package com.hive;

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

public class DefRecordReader implements RecordReader<LongWritable, Text> {
    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    // Constructor used by DefTextInputFormat.
    public DefRecordReader(FileSplit inputSplit, Configuration job) throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        // Set up the codec factory and probe the file's compression codec.
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        // Open the file system and the split's file.
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;
        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                // Not the first split: back up one byte and skip the partial
                // first line, which belongs to the previous split.
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public DefRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }

    public DefRecordReader(InputStream in, long offset, long endOffset, Configuration job)
            throws IOException {
        this.maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.pos = offset; // was missing in the original; without it getPos() starts at 0
        this.end = endOffset;
    }

    @Override
    public void close() throws IOException {
        if (lineReader != null)
            lineReader.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    // Override next: read one line, then collapse the two-character delimiter
    // "##" into the single character "#" before handing the row to Hive.
    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            if (newSize == 0)
                return false;
            // Replace "##" with "#" in the line just read.
            value.set(value.toString().replace("##", "#"));
            pos += newSize;
            if (newSize < maxLineLength)
                return true;
        }
        return false;
    }
}

When creating the table, specify com.hive.DefTextInputFormat as the INPUTFORMAT.
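For example, a sketch of the revised clauses, keeping the same OUTPUTFORMAT and LOCATION as the original table. Note that FIELDS TERMINATED BY now uses the single character '#', since the custom reader has already collapsed '##' into '#' before Hive parses the row:

ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '#'
STORED AS INPUTFORMAT
  'com.hive.DefTextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://nameservice-ha/pgw/gz'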

Of course, the two classes must first be packaged into a jar and deployed to Hive's runtime environment; for the steps, see http://blog.csdn.net/fjssharpsword/article/details/70271671.
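A typical way to do this in a Hive session (the jar path is hypothetical):

add jar /path/to/hive-def-inputformat.jar;

This makes the classes visible to the current session; to make them available permanently, the jar can instead be placed in Hive's auxlib directory.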
