赞
踩
Hive中默认使用单字节分隔符来加载文本数据,例如逗号、制表符、空格等等,默认的分隔符为\001。根据不同文件的不同分隔符,我们可以通过在创建表时使用 row format delimited fields terminated by ‘单字节分隔符’ 来指定文件中的分割符,确保正确将表中的每一列与文件中的每一列实现一一对应的关系。
在实际工作中,我们遇到的数据往往不是非常规范化的数据,例如我们会遇到以下的两种情况
➢ 上图中每列的分隔符为||,为多字节分隔符
➢ 情况二:数据的字段中包含了分隔符
➢ 上图中每列的分隔符为空格,但是数据中包含了分割符,时间字段中也有空格
192.168.88.134 [08/Nov/2020:10:44:32 +0800] “GET / HTTP/1.1” 404 951
2.2 问题与需求
2.2.1 问题
基于上述的两种特殊数据,我们如果使用正常的加载数据的方式将数据加载到表中,就会出以下两种错误:
➢ 情况一:加载数据的分隔符为多字节分隔符
➢ 创建表
–如果表已存在就删除表
drop table if exists singer;
--创建表
create table singer(
id string,--歌手id
name string,--歌手名称
country string,--国家
province string,--省份
gender string,--性别
works string--作品
)
--指定列的分隔符为||
row format delimited fields terminated by '||';
➢ 加载数据
load data local inpath ‘/export/data/test01.txt’ into table singer;
➢ 查看结果
select * from singer;
➢ 问题
数据发生了错位,没有正确的加载每一列的数据
➢ 原因
Hive中默认只支持单字节分隔符,无法识别多字节分隔符
➢ 情况二:数据中包含了分隔符
➢ 创建表
--如果表存在,就删除表
drop table if exists apachelog;
--创建表
create table apachelog(
ip string, --IP地址
stime string, --时间
mothed string, --请求方式
url string, --请求地址
policy string, --请求协议
stat string, --请求状态
body string --字节大小
)
--指定列的分隔符为空格
row format delimited fields terminated by ' ';
➢ 加载数据
load data local inpath ‘/export/data/apache_web_access.log’ into table apachelog;
➢ 查看结果
select * from apachelog;
➢ 问题
时间字段被切分成了两个字段,后面所有的字段出现了错位
➢ 原因
时间数据中包含了分隔符,导致Hive认为这是两个字段,但实际业务需求中,为一个字段
基于上面两种情况的测试发现,当数据中出现了多字节分隔符或者数据中的某个字段包含了分隔符,就会导致数据加载错位的问题。基于出现的问题,我们需要通过特殊的方法来解决该问题,即使当数据中出现多字节分隔符等情况时,Hive也能正确的加载数据,实现列与数据的一一对应。
面对情况一,如果数据中的分隔符是多字节分隔符,可以使用程序提前将数据中的多字节分隔符替换为单字节分隔符,然后使用Hive加载,就可以实现正确加载对应的数据。
例如:原始数据中的分隔符为“||”
可以在ETL阶段通过一个MapReduce程序,将“||”替换为单字节的分隔符“|”,示例程序如下:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* @ClassName ChangeSplitCharMR
* @Description TODO MapReduce实现将多字节分隔符转换为单字节符
* @Create By itcast
*/
public class ChangeSplitCharMR extends Configured implements Tool {
public int run(String[] arg) throws Exception {
/**
* 构建Job
*/
Job job = Job.getInstance(this.getConf(),"changeSplit");
job.setJarByClass(ChangeSplitCharMR.class);
/**
* 配置Job
*/
//input:读取需要转换的文件
job.setInputFormatClass(TextInputFormat.class);
Path inputPath = new Path("datas/split/test01.txt");
FileInputFormat.setInputPaths(job,inputPath);
//map:调用Mapper
job.setMapperClass(ChangeSplitMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//reduce:不需要Reduce过程
job.setNumReduceTasks(0);
//output
job.setOutputFormatClass(TextOutputFormat.class);
Path outputPath = new Path("datas/output/changeSplit");
TextOutputFormat.setOutputPath(job,outputPath);
/**
* 提交Job
*/
return job.waitForCompletion(true) ? 0 : -1;
}
//程序入口
public static void main(String[] args) throws Exception {
//调用run
Configuration conf = new Configuration();
int status = ToolRunner.run(conf, new ChangeSplitCharMR(), args);
System.exit(status);
}
public static class ChangeSplitMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
//定义输出的Key
private Text outputKey = new Text();
//定义输出的Value
private NullWritable outputValue = NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取每条数据
String line = value.toString();
//将里面的||转换为|
String newLine = line.replaceAll("\\|\\|", "|");
//替换后的内容作为Key
this.outputKey.set(newLine);
//输出结果
context.write(this.outputKey,this.outputValue);
}
}
}
➢ 程序执行结果如下:
➢ 重新创建Hive表
--如果表已存在就删除表
drop table if exists singer;
--创建表
create table singer(
id string,--歌手id
name string,--歌手名称
country string,--国家
province string,--省份
gender string,--性别
works string--作品
)
--指定列的分隔符为||
row format delimited fields terminated by '|';
➢ 在Hive中重新加载数据
load data local inpath ‘/export/data/part-m-00000’ into table singer;
➢ 查看结果
在ETL阶段可以直接对数据进行分隔符的替换,通过替换分隔符将多字节分隔符更改为单字节分隔符,就可以解决数据加载的问题,但是这种方式有对应的优缺点,并不是所有的场景适用于该方法。
优点:实现方式较为简单,基于字符串替换即可
缺点:无法满足情况2的需求
面对情况一和情况二的问题,Hive中提供了一种特殊的方式来解决,Hive提供了一种特殊的Serde来加载特殊数据的问题,使用正则匹配来加载数据,匹配每一列的数据。
官网地址:https://cwiki.apache.org/confluence/display/Hive/GettingStarted#GettingStarted-ApacheWeblogData
Hive的SerDe提供了序列化和反序列化两个功能,SerDe是英文Serialize和Deserilize的组合缩写,用于实现将Hive中的对象进行序列化和将数据进行反序列化。
Serialize就是序列化,用于将Hive中使用的java object转换成能写入hdfs的字节序列,或者其他系统能识别的流文件。Hive中的insert语句用于将数据写入HDFS,所以就会调用序列化实现。Hive中的调用过程如下:
Deserilize就是反序列化,用于将字符串或者二进制数据流转换成Hive能识别的java object对
象。所有Hive中的Select语句在查询数据时,需要将HDFS中的数据解析为Hive中对象,就需要进行
反序列化。Hive可以方便的将数据加载到表中而不需要对数据进行转换,这样在处理海量数据时可
以节省大量的时间。Hive中的调用过程如下:
官网地址:https://cwiki.apache.org/confluence/display/Hive/SerDe
Hive中默认提供了多种SerDe用于解析和加载不同类型的数据文件,常用的有ORCSerde 、RegexSerde、JsonSerDe等。
RegexSerde是Hive中专门为了满足复杂数据场景所提供的正则加载和解析数据的接口,使用RegexSerde可以指定正则表达式加载数据,根据正则表达式匹配每一列数据。上述过程中遇到的情况一和情况二的问题,都可以通过RegexSerDe使用正则表达式来加载实现。
➢ 分析数据格式,构建正则表达式
➢ 原始数据格式
01||周杰伦||中国||台湾||男||七里香
➢ 正则表达式定义每一列
([0-9])\|\|(.)\|\|(.)\|\|(.)\|\|(.)\|\|(.)
➢ 正则校验
➢ 基于正则表达式,使用RegexSerde建表
–如果表已存在就删除表
drop table if exists singer;
--创建表
create table singer(
id string,--歌手id
name string,--歌手名称
country string,--国家
province string,--省份
gender string,--性别
works string--作品
)
--指定使用RegexSerde加载数据
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
--指定正则表达式
WITH SERDEPROPERTIES (
"input.regex" = "([0-9]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)"
);
➢ 加载数据
load data local inpath ‘/export/data/test01.txt’ into table singer;
➢ 查看数据结果
select * from singer;
每一列的数据都被正常的加载,没有错位
➢ 分析数据格式,构建正则表达式
➢ 原始数据格式
192.168.88.100 [08/Nov/2020:10:44:33 +0800] “GET /hpsk_sdk/index.html HTTP/1.1” 200 328
➢ 正则表达式定义每一列
([^ ]) ([^}]) ([^ ]) ([^ ]) ([^ ]) ([0-9]) ([^ ]*)
➢ 正则校验
➢ 基于正则表达式,使用RegexSerde建表
–如果表存在,就删除表
drop table if exists apachelog;
–创建表
create table apachelog(
ip string, --IP地址
stime string, --时间
mothed string, --请求方式
url string, --请求地址
policy string, --请求协议
stat string, --请求状态
body string --字节大小
)
–指定使用RegexSerde加载数据
ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.RegexSerDe’
–指定正则表达式
WITH SERDEPROPERTIES (
“input.regex” = “([^ ]) ([^}]) ([^ ]) ([^ ]) ([^ ]) ([0-9]) ([^ ]*)”
);
➢ 加载数据
load data local inpath ‘/export/data/apache_web_access.log’ into table apachelog;
➢ 查看数据结果
select ip,stime,url,stat,body from apachelog;
RegexSerde使用简单,对于各种复杂的数据场景,都可以通过正则定义匹配每行中的每个字段,基本上可以满足大多数场景的需求,工作中推荐使用该方式来实现对于复杂数据的加载。
Hive中也允许使用自定义InputFormat来解决以上问题,通过在自定义InputFormat,来自定义解析逻辑实现读取每一行的数据。
➢ 自定义InputFormat继承自TextInputFormat,读取数据时将每条数据中的”||”全部替换成“|”
➢ 自定义InputFormat
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
/**
* @ClassName UserInputFormat
* @Description TODO 用于实现自定义InputFormat,读取每行数据
* @Create By Itcast
*/
public class UserInputFormat extends TextInputFormat {
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job,
Reporter reporter) throws IOException {
reporter.setStatus(genericSplit.toString());
UserRecordReader reader = new UserRecordReader(job,(FileSplit)genericSplit);
return reader;
}
}
➢ 自定义RecordReader
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import java.io.IOException;
import java.io.InputStream;
/**
* @ClassName UserRecordReader
* @Description TODO 用于自定义读取器,在自定义InputFormat中使用,将读取到的每行数据中的||替换为|
* @Create By Itcast
*/
public class UserRecordReader implements RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(LineRecordReader.class.getName());
int maxLineLength;
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private Seekable filePosition;
private CompressionCodec codec;
private Decompressor decompressor;
public UserRecordReader(Configuration job, FileSplit split) throws IOException {
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
if (isCompressedInput()) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec)
.createInputStream(fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new LineReader(cIn, job);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream
} else {
in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new LineReader(fileIn, job);
filePosition = fileIn;
}
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
private boolean isCompressedInput() {
return (codec != null);
}
private int maxBytesToConsume(long pos) {
return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
}
private long getFilePosition() throws IOException {
long retVal;
if (isCompressedInput() && null != filePosition) {
retVal = filePosition.getPos();
} else {
retVal = pos;
}
return retVal;
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
/**
* Read a line.
*/
public synchronized boolean next(LongWritable key, Text value) throws IOException {
while (getFilePosition() <= end) {
key.set(pos);
int newSize = in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength));
String str = value.toString().replaceAll("\\|\\|", "\\|");
value.set(str);
pos += newSize;
if (newSize == 0) {
return false;
}
if (newSize < maxLineLength) {
return true;
}
LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
}
return false;
}
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
}
}
public synchronized long getPos() throws IOException {
return pos;
}
public synchronized void close() throws IOException {
try {
if (in != null) {
in.close();
}
} finally {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
}
}
public static class LineReader extends org.apache.hadoop.util.LineReader {
LineReader(InputStream in) {
super(in);
}
LineReader(InputStream in, int bufferSize) {
super(in, bufferSize);
}
public LineReader(InputStream in, Configuration conf) throws IOException {
super(in, conf);
}
}
}
➢ 将开发好的InputFormat打成jar包,放入Hive的lib目录中
➢ 在Hive中,将jar包添加到环境变量中
add jar /export/server/hive-3.1.2-bin/lib/HiveUserInputFormat.jar;
该方法可以实现临时添加,如果希望永久生效,重启Hive即可
➢ 创建表,指定自定义的InputFormat读取数据
--如果表已存在就删除表
drop table if exists singer;
--创建表
create table singer(
id string,--歌手id
name string,--歌手名称
country string,--国家
province string,--省份
gender string,--性别
works string--作品
)
--指定使用分隔符为|
row format delimited fields terminated by '|'
stored as
--指定使用自定义的类实现解析
inputformat 'bigdata.itcast.cn.hive.mr.UserInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
➢ 加载数据
load data local inpath ‘/export/data/test01.txt’ into table singer;
select * from singer;
数据正常匹配,没有出现错位。
当数据文件中出现多字节分隔符或者数据中包含了分隔符时,会导致数据加载与实际表的字段不匹配的问题,基于这个问题我们提供了三种方案:替换分隔符、正则加载及自定义InputFormat来实现,其中替换分隔符无法解决数据中存在分隔符的问题,自定义InputFormat的开发成本较高,所以整体推荐使用正则加载的方式来实现对于特殊数据的处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。