(1) What information gets submitted?
(2) How is the split information obtained?
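By default, TextInputFormat inherits the getSplits method from its parent class FileInputFormat, and that method produces the split information.
createRecordReader is then called and returns a RecordReader object that reads the records of a split. The default is LineRecordReader, which uses the byte offset of each line as the key and the content of the line as the value. The RecordReader is called repeatedly over the split until the whole split has been processed, and every record it returns is passed to one call of the Mapper class's map() function.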
// Instantiate the InputFormat configured for the job, then ask it for the splits.
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
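To make the getSplits step more concrete, here is a minimal, self-contained sketch of the sizing logic that FileInputFormat is generally understood to apply to a single splittable file. It does not use the Hadoop classes: SplitSketch and its methods are hypothetical names for illustration, splits are reduced to {start, length} pairs, and block locations, unsplittable files and empty files are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a simplified model of FileInputFormat-style splitting.
class SplitSketch {
    // The last split may be up to 10% larger than the target size
    // before an extra split is cut.
    static final double SPLIT_SLOP = 1.1;

    // Target split size: the block size, clamped between minSize and maxSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns {start, length} pairs that cover a file of fileLength bytes.
    static List<long[]> split(long fileLength, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long remaining = fileLength;
        // Cut full-size splits while clearly more than one split remains.
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {fileLength - remaining, splitSize});
            remaining -= splitSize;
        }
        // Whatever is left becomes the final split.
        if (remaining != 0) {
            splits.add(new long[] {fileLength - remaining, remaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long size = computeSplitSize(128 * mb, 1, Long.MAX_VALUE);
        for (long[] s : split(300 * mb, size)) {
            System.out.println("start=" + s[0] + " length=" + s[1]);
        }
    }
}
```

With a 128 MB target size, a 300 MB file yields splits of 128 MB, 128 MB and 44 MB, while a 130 MB file stays in a single split because it is within the 10% slop.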
Relationship between the classes:
InputFormat only defines the contract; it does not implement any of the methods.
public abstract class InputFormat<K, V> {
    // Splits the input data into InputSplits.
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    // Returns a RecordReader that can read the records of a split.
    // The default is LineRecordReader: the byte offset of each line is the map key
    // and the content of the line is the map value.
    // For SequenceFileInputFormat the RecordReader is SequenceFileRecordReader.
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException;
}
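The key/value convention of LineRecordReader, and the loop that feeds each record to map(), can be sketched without Hadoop as follows. This is only an illustration under simplifying assumptions: LineRecordSketch is a hypothetical class, the whole input is held in memory as one split, lines end with '\n', and the method names merely mirror the RecordReader methods nextKeyValue, getCurrentKey and getCurrentValue.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: an in-memory stand-in for a line-oriented record reader.
class LineRecordSketch {
    private final byte[] data;
    private int pos = 0;   // byte position of the next unread line
    private long key;      // byte offset of the current line
    private String value;  // content of the current line, without the newline

    LineRecordSketch(byte[] data) {
        this.data = data;
    }

    // Advances to the next line; returns false once the input is exhausted.
    boolean nextKeyValue() {
        if (pos >= data.length) {
            return false;
        }
        key = pos;
        int end = pos;
        while (end < data.length && data[end] != '\n') {
            end++;
        }
        value = new String(data, pos, end - pos, StandardCharsets.UTF_8);
        pos = end + 1; // skip past the newline
        return true;
    }

    long getCurrentKey() {
        return key;
    }

    String getCurrentValue() {
        return value;
    }

    // Stand-in for the framework loop: one "map call" per record.
    static List<String> run(String text) {
        LineRecordSketch reader =
                new LineRecordSketch(text.getBytes(StandardCharsets.UTF_8));
        List<String> mapped = new ArrayList<>();
        while (reader.nextKeyValue()) {
            // In a real job this is where map(key, value, context) would run.
            mapped.add(reader.getCurrentKey() + "=" + reader.getCurrentValue());
        }
        return mapped;
    }

    public static void main(String[] args) {
        System.out.println(run("hello\nworld\n"));
    }
}
```

For the input "hello\nworld\n" the keys are 0 and 6, because "hello" plus its newline occupies the first six bytes.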
Source of TextInputFormat, the default InputFormat (getSplits itself is inherited from FileInputFormat):
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        return new LineRecordReader(recordDelimiterBytes);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        final CompressionCodec codec =
                new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        if (null == codec) {
            return true;
        }
        return codec instanceof SplittableCompressionCodec;
    }
}
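The practical effect of isSplitable can be illustrated with a small standalone sketch. It assumes, as is commonly the case in Hadoop, that the codec is chosen from the file name suffix, that gzip files cannot be split, and that bzip2 files can. SplitableSketch and its lookup table are hypothetical and cover only these two suffixes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mimics the decision made by TextInputFormat.isSplitable.
class SplitableSketch {
    // Hypothetical table: file suffix -> whether that codec supports splitting.
    static final Map<String, Boolean> CODEC_SPLITTABLE = new LinkedHashMap<>();

    static {
        CODEC_SPLITTABLE.put(".gz", false);  // gzip: the whole file goes to one mapper
        CODEC_SPLITTABLE.put(".bz2", true);  // bzip2: can be read from a split boundary
    }

    static boolean isSplitable(String fileName) {
        for (Map.Entry<String, Boolean> entry : CODEC_SPLITTABLE.entrySet()) {
            if (fileName.endsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        // No codec matched: treat the file as uncompressed, which is splittable.
        return true;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"logs.txt", "logs.gz", "logs.bz2"}) {
            System.out.println(name + " splittable: " + isSplitable(name));
        }
    }
}
```

A file that is not splittable is processed as one split regardless of its size, so a large gzip input is read by a single map task.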