赞
踩
Job(作业) : 一个MR程序称为一个Job。
MRAppMaster(MR任务的主节点): 一个Job在运行时,会先启动一个进程,这个进程为 MRAppMaster。它负责Job中执行状态的监控,容错,和RM申请资源,提交Task等!
Task(任务): Task是一个进程!负责某项计算!
Map(Map阶段):
Map是MapReduce程序运行的第一个阶段!Map阶段的目的是将输入的数据,进行切分。将一个大数据,切分为若干小部分!
切分后,每个部分称为1片(split),每片数据会交给一个Task(进程)进行计算!Task负责Map阶段程序的计算,称为MapTask!
在一个MR程序的Map阶段,会启动N(取决于切片数)个MapTask。每个MapTask是并行运行!
Reduce(Reduce阶段):
Reduce是MapReduce程序运行的第二个阶段(最后一个阶段)!
Reduce阶段的目的是将Map阶段,每个MapTask计算后的结果进行合并汇总!得到最终结果!
Reduce阶段是可选的,即可以不要。
负责Reduce阶段程序的计算的Task称为ReduceTask。 一个Job可以通过设置,启动N个ReduceTask,这些ReduceTask也是并行运行!且每个ReduceTask最终都会产生一个结果!
MR程序必须指定一个输入目录,一个输出目录!InputFormat代表输入目录中文件的格式!
如果是普通文件,可以使用FileInputFormat.
如果是SequeceFile(hadoop提供的一种文件格式),可以使用SequnceFileInputFormat.
如果处理的数据在数据库中,需要使用DBInputFormat
OutPutFormat代表MR处理后的结果,要以什么样的文件格式写出!
将结果写出到一个普通文件中,可以使用FileOutputFormat!
将结果写出到数据库中,可以使用DBOutPutFormat!
将结果写出到SequeceFile中,可以使用SequnceFileOutputFormat
RecordWriter将处理的结果以什么样的格式,写出到输出文件中!
在Mapper将数据写出时,为每组key-value打上标记,进行分区!目的: 一个ReduceTask只会处理一个分区的数据!
需求: 统计某目录中每个文件的单词数量,a-p开头的单词放入到一个结果文件中,q-z开头的单词放入到一个结果文件中。
总结
Map阶段(MapTask): 切片(Split)-----读取数据(Read)-------交给Mapper处理(Map)------分区和排序(sort)
Reduce阶段(ReduceTask): 拷贝数据(copy)------排序(sort)-----合并(reduce)-----写出(write)
由于默认的InputFormat接口使用的是TextInputFormat类,所以我们只需要查看该类的getSplits方法,该方法位于TextInputFormat的父类FileInputFormat中。
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- StopWatch sw = (new StopWatch()).start();
- // minSize从mapreduce.input.fileinputformat.split.minsize和1之间对比,取最大值
- long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
- // 读取mapreduce.input.fileinputformat.split.maxsize,如果没有设置使用Long.MaxValue作为默认值
- long maxSize = getMaxSplitSize(job);
- // 开始切片
- List<InputSplit> splits = new ArrayList();
- // 获取当前job输入目录中所有文件的状态(元数据)
- List<FileStatus> files = this.listStatus(job);
- boolean ignoreDirs = !getInputDirRecursive(job) && job.getConfiguration().getBoolean("mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs", false);
- Iterator var10 = files.iterator();
- while(true) {
- while(true) {
- while(true) {
- FileStatus file;
- // 如果输入目录中的文件已全部切片,没有可执行文件了,就结束循环
- do {
- if (!var10.hasNext()) {
- job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
- sw.stop();
- return splits;
- }
- file = (FileStatus)var10.next();
- } while(ignoreDirs && file.isDirectory());
-
- // 获取文件路径,开始切片逻辑
- Path path = file.getPath();
- long length = file.getLen();
- if (length != 0L) {
- // 获取文件的块信息
- BlockLocation[] blkLocations;
- if (file instanceof LocatedFileStatus) {
- blkLocations = ((LocatedFileStatus)file).getBlockLocations();
- } else {
- FileSystem fs = path.getFileSystem(job.getConfiguration());
- blkLocations = fs.getFileBlockLocations(file, 0L, length);
- }
- // 判断指定文件是否可切,如果可切,就进行切片
- if (this.isSplitable(job, path)) {
- long blockSize = file.getBlockSize();
- // 计算片大小
- long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
- // 声明待切部分数据的剩余大小
- long bytesRemaining;
- int blkIndex;
- // 如果 待切部分 / 片大小 > 1.1,先切去一片,再判断
- for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
- // 获取开始切片的offset是哪一个block
- blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
- // 执行一次切片,并放入切片集合中
- splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
- }
- // 将剩余不能继续切的部分,作为一个片
- if (bytesRemaining != 0L) {
- blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
- }
- } else {
- // 文件不可切,整个文件作为1片!
- splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
- }
- } else {
- // 文件是个空文件,创建一个切片对象,这个切片从当前文件的 0 offset起,向后读取0个字节
- splits.add(this.makeSplit(path, 0L, length, new String[0]));
- }
- }
- }
- }
- }
总结:
①获取当前输入目录中所有的文件
②以文件为单位切片,如果文件为空文件,默认创建一个空的切片
③如果文件不为空,尝试判断文件是否可切(不是压缩文件,都可切)
④如果文件不可切,整个文件作为1片
⑤如果文件可切,先获取片大小(默认等于块大小),循环判断 待切部分/ 片大小 > 1.1倍,如果大于先切去一片,再判断…
⑥剩余部分整个作为1片
- protected boolean isSplitable(JobContext context, Path file) {
- // 根据文件的后缀名获取文件使用的相关的压缩格式
- CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file);
- // 如果文件不是一个压缩类型的文件,默认都可以切片,否则判断是否是一个可以切片的压缩格式,默认只有Bzip2压缩格式可切片(SplittableCompressionCodec只有一个BZip2Codec子类)
- return null == codec ? true : codec instanceof SplittableCompressionCodec;
- }
- /**
- blockSize: 块大小
- minSize: minSize从mapreduce.input.fileinputformat.split.minsize和1之间对比,取最大值
- maxSize: 读取mapreduce.input.fileinputformat.split.maxsize,如果没有设置使用Long.MaxValue作为默认值
- */
- protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
- return Math.max(minSize, Math.min(maxSize, blockSize));
- }
可看出默认的片大小就是文件的块大小。文件的块大小默认为128M,所以默认每片就是128M!
调节片大小 > 块大小:配置 mapreduce.input.fileinputformat.split.minsize > 128M
调节片大小 < 块大小:配置 mapreduce.input.fileinputformat.split.maxsize < 128M
理论上来说:如果文件的数据量是一定的话,片越大,切片数量少,启动的MapTask少,Map阶段运算慢!片越小,切片数量多,启动的MapTask多,Map阶段运算快!
片(InputSplit):在计算MR程序时,才会切片。片在运行程序时,临时将文件从逻辑上划分为若干部分!使用的输入格式不同,切片的方式不同,切片的数量也不同!每片的数据最终也是以块的形式存储在HDFS!
块(Block): 在向HDFS写文件时,文件中的内容以块为单位存储!块是实际的物理存在!
MapTask在读取切片的内容时,需要根据切片的metainfo,获取到当前切片属于文件的哪部分! 再根据此信息去寻找对应的块,读取数据!
建议: 片大小最好等于块大小!将片大小设置和块大小一致,可以最大限度减少因为切片带来的磁盘IO和网络IO!。
原因: MR计算框架速度慢的原因在于在执行MR时,会发生频繁的磁盘IO和网络IO!
可以看出当切片大小不等于块大小时,MapTask2和MapTask3需要从其他DataNode上去拷贝所需数据至自己机器上参与计算,这无疑增加了网络IO
TextInputFormat是默认的InputFormat。每条记录是一行输入。键是LongWritable类型,存储该行在整个文件中的起始字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符)。
例如文本内容:
解析出来的map:
每一行均为一条记录,被分隔符分割为key,value。可以通过在设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");来设定分隔符。默认分隔符是tab(\t)。
例如文本内容,以——>进行分割:
解析出来的map:
此时的键是每行排在制表符之前的Text序列。
如果使用NlineInputFormat,代表每个map进程处理的InputSplit不再按Block块去划分,而是按NlineInputFormat指定的行数N来划分。即切片数 = 输入文件的总行数/N,如果不整除,切片数=商+1。
举例文本内容:
假设N = 2, 则每个输入分片包含两行。开启2个MapTask。
mapper1:
mapper2:
这里的键和值与TextInputFormat生成的一样。
框架默认的TextInputformat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
应用场景
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
RecoedReader : LineRecordReader(将一行封装为一个key-value) key(LongWritable): 行的偏移量, value(Text): 行的内容
虚拟存储切片最大值设置
如下,可以根据实际的小文件大小情况来设置具体的值:
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
也可以直接设置参数:
mapreduce.input.fileinputformat.split.maxsize
切片机制
流程:
一. 以文件为单位,划分part
①将输入目录下所有文件按照文件名称字典顺序一次读入,记录文件大小
②将每个文件划分为若干part
③判断: 文件的待切部分的大小 <= maxSize,整个待切部分作为1part
④maxsize < 文件的待切部分的大小 <= 2* maxSize,将整个待切部分均分为2part
⑤文件的待切部分的大小 > 2* maxSize,先切去maxSize大小,作为1部分,剩余待切部分继续判断!
举例: maxSize=2048
二. 将之前切分的若干part进行累加,累加后一旦累加的大小超过 maxSize,这些作为1片!如上图所示
- // 源码在MapTask类中的sortAndSpill方法
- // this.partitions表示ReduceTask的数量(可见init方法中对该参数的赋值)
- for(int i = 0; i < this.partitions; ++i) {
- // 按照分区将数据一条一条写出到指定溢写文件
- writer.append(key, value);
- }
③ 最终效果:临时文件中的数据是按照分区编号由小到大,并且每个分区的数据都是按照key有序的存储说明:
系统执行排序的过程(即将Mapper输出作为输入传给Reducer)称为Shuffle
怎么决定要多少个分区?是否需要自定义分区?
1. 分区的数量决定决定了ReduceTask的数量,所以间接决定了要生成文件的个数。简而言之,如果你最终需要将一个文件的内容按照某种规则分成N个文件,那么分区的数量就是N
2. 如果没有手动设定分区数量,那么默认采用Hash分区器,即根据key值按照hash算法算出一个分区号,分区号相同的被分到一个区(算法如下)。由此可见,默认的分区器我们是不可控制哪些数量被分到一个分区。所以如果我们有明确需求:需要将具有相同特征的数据分到一组处理,那么我们就需要使用自定义分区
- public int getPartition(K2 key, V2 value, int numReduceTasks) {
- return (key.hashCode() & 2147483647) % numReduceTasks;
- }
案例:
给定一个文件,该文件中每一行都有一个随机的手机号,此时我们需要将手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
- /*
- * KEY, VALUE: Mapper输出的Key-value类型
- * FlowBean:自定义Mapper输出类型
- */
- public class MyPartitioner extends Partitioner<Text, FlowBean>{
-
- // 计算分区 numPartitions为总的分区数,reduceTask的数量
- // 分区号必须为int型的值,且必须符合 0<= partitionNum < numPartitions
- @Override
- public int getPartition(Text key, FlowBean value, int numPartitions) {
- String suffix = key.toString().substring(0, 3);
- int partitionNum=0;
-
- switch (suffix) {
- case "136":
- partitionNum=numPartitions-1;
- break;
- case "137":
- partitionNum=numPartitions-2;
- break;
- case "138":
- partitionNum=numPartitions-3;
- break;
- case "139":
- partitionNum=numPartitions-4;
- break;
- default:
- break;
- }
- return partitionNum;
- }
- }
- public class FlowBeanDriver{
- public static void main(String[] args) {
- //作为整个Job的配置
- Configuration conf = new Configuration();
- // ①创建Job
- Job job = Job.getInstance(conf);
- // TODO 省略一些配置
- // 设置ReduceTask的数量为5
- job.setNumReduceTasks(5);
- // 设置使用自定义的分区器
- job.setPartitionerClass(MyPartitioner.class);
- }
- }
说明:自定义分区器返回的分区号并不能随意指定:0 <= 分区号 < numPartitions( 指定的ReduceTask的数量)
在MapTask流程中有排序的步骤,排序是根据key来排序的,那么按照什么进行排序呢?这就需要使用比较器来定义。
- public RawComparator getOutputKeyComparator() {
- // 获取自定义的比较器
- Class<? extends RawComparator> theClass = this.getClass("mapreduce.job.output.key.comparator.class", (Class)null, RawComparator.class);
- return (RawComparator)(theClass != null ? (RawComparator)ReflectionUtils.newInstance(theClass, this) : WritableComparator.get(this.getMapOutputKeyClass().asSubclass(WritableComparable.class), this));
- }
从上述代码可以看出
那么如何自定义比较器,有以下几种方式:
- public class MyDescComparator extends WritableComparator{
- // 实现Long的倒序排序
- // 如果key是LongWritable类型,降序也可以直接使用LongWritable中的DecreasingComparator
- @Override
- public int compare(byte[] b1, int s1, int l1,
- byte[] b2, int s2, int l2) {
- long thisValue = readLong(b1, s1);
- long thatValue = readLong(b2, s2);
- return (thisValue<thatValue ? 1 : (thisValue==thatValue ? 0 : -1));
- }
- }
- // 创建Job
- Job job = Job.getInstance(new Configuration());
- // 设置使用自定义的比较器
- job.setSortComparatorClass(DecreasingComparator.class);
- // 运行Job
- job.waitForCompletion(true);
- public class MyRawComparator implements RawComparator<FlowBean>{
- private FlowBean key1=new FlowBean();
- private FlowBean key2=new FlowBean();
- private DataInputBuffer buffer=new DataInputBuffer();
-
- // 负责从缓冲区中解析出要比较的两个key对象,调用 compare(Object o1, Object o2)对两个key进行对比
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- try {
- buffer.reset(b1, s1, l1); // parse key1
- key1.readFields(buffer);
-
- buffer.reset(b2, s2, l2); // parse key2
- key2.readFields(buffer);
-
- buffer.reset(null, 0, 0); // clean up reference
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return compare(key1, key2);
- }
-
- // Comparable的compare(),实现最终的比较
- @Override
- public int compare(FlowBean o1, FlowBean o2) {
- return -o1.getSumFlow().compareTo(o2.getSumFlow());
- }
- }
- // 创建Job
- Job job = Job.getInstance(new Configuration());
- // 设置使用自定义的比较器
- job.setSortComparatorClass(DecreasingComparator.class);
- // 运行Job
- job.waitForCompletion(true);
- @Data
- public class FlowBean implements WritableComparable<FlowBean>{
- private long upFlow;
- private long downFlow;
- private Long sumFlow;
-
- // 。。。省略其他代码
-
- // 系统封装的比较器在对比key时,调用key的compareTo进行比较
- @Override
- public int compareTo(FlowBean o) {
- return -this.sumFlow.compareTo(o.getSumFlow());
- }
- }
Combiner实际上本质是一个Reducer类,且只有在设置了之后,才会运行!
Combiner和Reducer的区别
1. Reducer是在reduce阶段调用,而Combiner是在shuffle阶段(既可以在MapTask的shuffle,也可以在ReduceTask的shuffle)调用!
2. 本质都是Reducer类,作用都是对有相同key的key-value进行合并!
Combiner意义
在shuffle阶段对相同key的key-value进行提前合并,从而可以减少磁盘IO和网络IO!
使用条件
使用Combiner必须保证不能影响处理逻辑和结果!经验证,Combiner只能用在加、减操作的场景,不能用在乘、除操作的场景,用在乘、除操作的场景会导致最终计算的结果与不使用Combiner时不一致
调用时机
Combiner既有可能在MapTask端调用:
①每次溢写前会调用Combiner对溢写的数据进行局部合并
②在merge(多个溢写出的临时文件合并为一个文件)时,如果溢写的片段数>=3,(即溢写出的临时文件>=3)如果设置了Combiner,Combiner会再次对数据进行Combine!
Combiner也有可能在ReduceTask端调用:
③shuffle线程拷贝多个MapTask同一分区的数据,拷贝后执行merge和sort,
如果数据量过大,需要将部分数据先合并排序后,溢写到磁盘!
如果设置了Combiner,Combiner会再次运行!
ReduceTask的数量取决于分区的数量,因为每个ReduceTask只会处理所有MapTask输出的相同分区的数据,具体流程如下
直接使用一个案例来解释:
有一个文件内容如下所示,orderid:订单id,pid:商品id,account: 商品金额
需求:统计同一笔订单中,金额最大的商品记录输出
分析得出: 在同一笔订单中,对每条记录的金额进行降序排序,最大的排前边,那么此时就需要进行二次排序,即先按照订单号进行排序(升降都可以)再按照金额进行降序排序。而为了将同一个订单的记录分到一组,就需要使用分组比较器。而MapReduce只能针对key进行排序,所以我们又需要封装一个bean作为key,该bean中至少包含orderid和account两个字段
MapTask阶段:
OrderBean
- @Data
- public class OrderBean implements WritableComparable<OrderBean>{
-
- private String orderId;
- private String pId;
- private Double acount;
- // 序列化
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(orderId);
- out.writeUTF(pId);
- out.writeDouble(acount);
- }
- // 反序列化
- @Override
- public void readFields(DataInput in) throws IOException {
- orderId=in.readUTF();
- pId=in.readUTF();
- acount=in.readDouble();
- }
-
- // 二次排序,先按照orderid排序(升降序都可以),再按照acount(降序)排序
- @Override
- public int compareTo(OrderBean o) {
-
- //先按照orderid排序升序排序
- int result=this.orderId.compareTo(o.getOrderId());
- if (result==0) {
- //再按照acount(降序)排序,降序前面加个负号即可
- result=-this.acount.compareTo(o.getAcount());
- }
- return result;
- }
- }
OrderMapper
- /*
- * LongWritable, Text: 默认使用TextInputFormat,所以LongWritable表示行号,Text表示那一行的内容
- */
- public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
-
- private OrderBean out_key=new OrderBean();
- private NullWritable out_value=NullWritable.get();
-
- @Override
- protected void map(LongWritable key, Text value,
- Context context)
- throws IOException, InterruptedException {
- String[] words = value.toString().split("\t");
- out_key.setOrderId(words[0]);
- out_key.setPId(words[1]);
- out_key.setAcount(Double.parseDouble(words[2]));
- context.write(out_key, out_value);
- }
- }
经过MapTask的shuffle阶段后,传入Reduce的数据应是如下所示,即先按orderid升序排序,再按照金额降序排序:
ReduceTask阶段
在此阶段会获取分组比较器,如果没设置默认使用MapTask排序时key的比较器!而在这个案例中默认的比较器比较策略不符合要求,因为它认为orderId一样且acount一样的记录才是一组。那我们就需要自定义分组比较器,只按照orderId进行对比,只要OrderId一样,认为key相等,这样可以将orderId相同的分到一个组!
实现自定义GroupingComparator
实现自定义的分组比较器和之前实现自定义比较器一样,都是两种方式,继承WritableCompartor 或 实现RawComparator
- //1. 继承WritableCompartor
- public class MyGroupingComparator2 extends WritableComparator{
-
- // 必须调用一次父类构造器,且s第三个参数要传true,否则会报空指针异常,第二个conf参数不传的话会默认构造
- public MyGroupingComparator2() {
- super(OrderBean.class,null,true);
- }
-
- public int compare(WritableComparable a, WritableComparable b) {
- OrderBean o1=(OrderBean) a;
- OrderBean o2=(OrderBean) b;
- return o1.getOrderId().compareTo(o2.getOrderId());
- }
- }
-
- // 或2. 实现RawComparator
- public class MyGroupingComparator implements RawComparator<OrderBean>{
-
- private OrderBean key1=new OrderBean();
- private OrderBean key2=new OrderBean();
- private DataInputBuffer buffer=new DataInputBuffer();
-
- @Override
- public int compare(OrderBean o1, OrderBean o2) {
- return o1.getOrderId().compareTo(o2.getOrderId());
- }
-
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- try {
- buffer.reset(b1, s1, l1); // parse key1
- key1.readFields(buffer);
- buffer.reset(b2, s2, l2); // parse key2
- key2.readFields(buffer);
- buffer.reset(null, 0, 0); // clean up reference
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return compare(key1, key2);
- }
- }
Reducer
- public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
- @Override
- protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
- // 由于从mapper传入reducer的数据已经按照商品价格降序排了序
- // 所以下面这行代码是获取最大的商品价格
- Double maxAcount = key.getAcount();
-
- // 这里遍历values有一个注意点:
- // 每次调用迭代器迭代下个记录时,使用反序列化器从文件中或内存中读取下一个key-value数据的值,key-value对象始终是同一个对象,即指针不变,只是在循环过程中会重新给key-value赋值。
- // 按照此案例,这一组数据如下所示,value为NULLWritable
- // 10000001 Pdt_02 222.8
- // 10000001 Pdt_01 222.8
- // 10000001 Pdt_05 25.8
- // 那么第一次遍历,key值为10000001 Pdt_02 222.8,第二次遍历key变为10000001 Pdt_01 222.8,第三次变为10000001 Pdt_05 25.8
- for (NullWritable nullWritable : values) {
- // 由于每次遍历,key会被重新赋值,所以只需要和获取的maxAcount进行比较,相同就写出。
- if (!key.getAcount().equals(maxAcount)) {
- break;
- }
- //复合条件的记录
- context.write(key, nullWritable);
- }
- }
- }
启动类
- public class OrderBeanDriver {
-
- public static void main(String[] args) throws Exception {
- //作为整个Job的配置
- Configuration conf = new Configuration();
- // ①创建Job
- Job job = Job.getInstance(conf);
-
- // ②设置Job
- // 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
- job.setMapperClass(OrderMapper.class);
- job.setReducerClass(OrderReducer.class);
-
- // Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
- // 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
- job.setOutputKeyClass(OrderBean.class);
- job.setOutputValueClass(NullWritable.class);
-
- // 设置输入目录和输出目录
- FileInputFormat.setInputPaths(job, new Path("E:\\mrinput\\groupcomparator"));
- FileOutputFormat.setOutputPath(job, new Path("e:/mroutput/groupcomparator"));
-
- // 设置自定义的分组比较器
- job.setGroupingComparatorClass(MyGroupingComparator2.class);
- // ③运行Job
- job.waitForCompletion(true);
- }
- }
OutputFormat是一个MapReduce输出数据的基类
默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。
将SequenceFileOutputFormat输出作为后续 MapReduce任务的输入
步骤:
案例说明:
需求:过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log,log日志内容如下:
Mapper
- /*
- * 1.什么时候需要Reduce
- * ①合并
- * ②需要对数据排序
- * 2. 没有Reduce阶段,key-value不需要实现序列化
- */
- public class CustomOFMapper extends Mapper<LongWritable, Text, String, NullWritable>{
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
-
- String content = value.toString();
-
- context.write(content+"\r\n", NullWritable.get());
- }
- }
OutputFormat
- public class MyOutPutFormat extends FileOutputFormat<String, NullWritable>{
- @Override
- public RecordWriter<String, NullWritable> getRecordWriter(TaskAttemptContext job)
- throws IOException, InterruptedException {
- return new MyRecordWriter(job);
- }
- }
RecordWriter
- public class MyRecordWriter extends RecordWriter<String, NullWritable> {
-
- private Path atguiguPath=new Path("e:/atguigu.log");
- private Path otherPath=new Path("e:/other.log");
-
- private FSDataOutputStream atguguOS ;
- private FSDataOutputStream otherOS ;
-
- private FileSystem fs;
-
- private TaskAttemptContext context;
-
- public MyRecordWriter(TaskAttemptContext job) throws IOException {
- context=job;
- Configuration conf = job.getConfiguration();
- fs=FileSystem.get(conf);
- atguguOS = fs.create(atguiguPath);
- otherOS = fs.create(otherPath);
- }
-
- // 负责将key-value写出到文件
- @Override
- public void write(String key, NullWritable value) throws IOException, InterruptedException {
- if (key.contains("atguigu")) {
- atguguOS.write(key.getBytes());
- context.getCounter("MyCounter", "atguiguCounter").increment(1);
- // 统计输出的含有atguigu 的key-value个数
- }else {
- otherOS.write(key.getBytes());
- context.getCounter("MyCounter", "otherCounter").increment(1);
- }
- }
-
- // 关闭操作
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- if (atguguOS != null) {
- IOUtils.closeStream(atguguOS);
- }
- if (otherOS != null) {
- IOUtils.closeStream(otherOS);
- }
- if (fs != null) {
- fs.close();
- }
- }
- }
启动类
- Job job = Job.getInstance(new Configuration());
- job.setMapperClass(CustomOFMapper.class);
- // 设置输入和输出格式
- job.setOutputFormatClass(MyOutPutFormat.class);
- // 取消reduce阶段
- job.setNumReduceTasks(0);
- // ...
计数器可以记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量
计数器API
1. 采用枚举的方式统计计数
context.getCounter(枚举).increment(1);
2.采用计数器组、计数器名称的方式统计,组名和计数器名称随便起,但最好有意义
context.getCounter("counterGroup", "counter").increment(1);
案例可查看自定义OutputFormat中的RecordWriter
Reduce Join类似sql中的Join,将两个表的数据通过某个字段关联起来,返回两个表的数据。
假设有order.txt文件,字段有orderId,pid,amount,pd.txt文件:pid,pname,输出:orderId,pid,amount,pname
可见我们只需要将AB文件的pid进行关联即可。那么实现如下:
1. 在Map阶段,封装数据。 自定义的Bean需要能够封装两个切片中的所有的数据。而两种不同的数据,经过同一个Mapper的map()处理,因此需要在map()中,判断切片数据的来源,且将来源存入Bean中,以便在reduce阶段根据来源执行不同的封装策略
2. 相同pid的数据,需要分到同一个区,即以pid为条件分区,pid相同的分到一个区
3. 在reduce输出时,只需要将来自于order.txt中的数据,将pid替换为pname,而不需要输出所有的key-value(在Map阶段对数据打标记,标记哪些key-value属于order.txt,哪些属于pd.txt)
ReduceJoin需要在Reduce阶段实现Join功能,一旦数据量过大,效率低,因为需要分区和排序才能进行合并,那么可以使用MapJoin解决ReduceJoin低效的问题!即每个MapTask在map()中完成Join!
做法:
只需要将要Join的数据order.txt作为切片,让MapTask读取pd.txt(不以切片形式读入),而直接在MapTask中使用HDFS下载此文件,下载后,使用输入流手动读取其中的数据!在map()之前通常是将大文件以切片形式读取,小文件手动读取!
JoinBean
- public class JoinBean implements Writable{
- private String orderId;
- private String pid;
- private String pname;
- private String amount;
- // TODO 。。。
- }
MapJoinMapper
- /*
- * 1. 在Hadoop中,hadoop为MR提供了分布式缓存(job.addCacheFile(new URI("file://e:/cache/xx.txt"));)
- * 用来缓存一些Job运行期间的需要的文件(普通文件,jar,归档文件(har))
- * 分布式缓存会假设当前的文件已经上传到了HDFS,并且在集群的任意一台机器都可以访问到这个URI所代表的文件
- * 分布式缓存会在每个节点的task运行之前,提前将文件发送到节点
- * 分布式缓存的高效是由于每个Job只会复制一次文件,且可以自动在从节点对归档文件解归档
- */
- public class MapJoinMapper extends Mapper<LongWritable, Text, JoinBean, NullWritable>{
-
- private JoinBean out_key=new JoinBean();
- private Map<String, String> pdDatas=new HashMap<String, String>();
- //在map之前手动读取pd.txt中的内容
-
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
-
- //从分布式缓存中读取数据
- URI[] files = context.getCacheFiles();
- for (URI uri : files) {
- BufferedReader reader = new BufferedReader(new FileReader(new File(uri)));
- String line="";
- //循环读取pd.txt中的每一行
- while(StringUtils.isNotBlank(line=reader.readLine())) {
- String[] words = line.split("\t");
- pdDatas.put(words[0], words[1]);
- }
- reader.close();
- }
- }
-
- //对切片中order.txt的数据进行join,输出
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- String[] words = value.toString().split("\t");
- out_key.setOrderId(words[0]);
- out_key.setPname(pdDatas.get(words[1]));
- out_key.setAmount(words[2]);
- context.write(out_key, NullWritable.get());
-
- }
- }
启动类
- public class MapJoinDriver {
- public static void main(String[] args) throws Exception {
- // ①创建Job
- Job job = Job.getInstance(new Configuration());
- job.setJarByClass(MapJoinDriver.class);
- // 为Job创建一个名字
- job.setJobName("xxx");
- // ②设置Job
- // 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
- job.setMapperClass(MapJoinMapper.class);
- // 设置输入目录和输出目录
- FileInputFormat.setInputPaths(job, inputPath);
- FileOutputFormat.setOutputPath(job, outputPath);
- // 设置分布式缓存
- job.addCacheFile(new URI("file:///e:/pd.txt"));
- //取消reduce阶段
- job.setNumReduceTasks(0);
- // ③运行Job
- job.waitForCompletion(true);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。