Previous articles in this series:
Hadoop Chapter 1: Environment Setup
Hadoop Chapter 2: Cluster Setup (Part 1)
Hadoop Chapter 2: Cluster Setup (Part 2)
Hadoop Chapter 2: Cluster Setup (Part 3)
Hadoop Chapter 3: Shell Commands
Hadoop Chapter 4: The Client
Hadoop Chapter 4: The Client 2.0
Hadoop Chapter 5: Word Count
Hadoop Chapter 5: Serialization
Hadoop Chapter 5: A Few Cases
Hadoop Chapter 5: A Few Cases (Part 2)
As before, we'll skip most of the theory and continue with a few more hands-on cases.
First, a word about the Combiner. It runs as the last step of each map task and offloads part of the reducer's work onto the individual mapper nodes, which cuts down the amount of data that has to be shipped over the network.
A simple example.
Suppose there are 100,000 records, 10 mapper nodes and 1 reducer node, each record produces one intermediate result, and each mapper node handles 10,000 records. If the summing is done only by the reducer, that single node has to receive all 100,000 records, which puts real pressure on the network. With a Combiner, each mapper node sums its own 10,000 records locally first, so the reducer only has to receive 10 partial sums, which dramatically reduces the network traffic.
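To make the idea concrete, here is a minimal, Hadoop-free sketch in plain Java (with made-up data) of what a combiner does on a single map task: it collapses the locally produced (word, 1) pairs into one (word, partialSum) pair per word before anything is shuffled over the network.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombineSketch {
    public static void main(String[] args) {
        // Hypothetical output of ONE map task: one (word, 1) pair per word occurrence.
        List<String> mapOutputKeys = List.of("hello", "world", "hello", "hadoop", "hello");

        // What a combiner does locally: sum the 1s per key before the shuffle.
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (String word : mapOutputKeys) {
            combined.merge(word, 1, Integer::sum);
        }

        // Only 3 pairs ({hello=3, world=1, hadoop=1}) leave this node instead of 5.
        System.out.println(combined);
    }
}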
Since we're going to use a Combiner, let's start with option 1: writing a dedicated combiner class.
Create a new package, copy the word-count code written earlier into it, and then add a new class:
WordCountCombiner.java
package com.atguigu.mapreduce.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// The combiner is written just like a reducer: it sums the counts produced
// by one map task locally before they are shuffled to the real reducer.
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);
    }
}
Then modify the driver to plug the combiner in.
package com.atguigu.mapreduce.combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Register the combiner (leave this commented out for the first run).
//        job.setCombinerClass(WordCountCombiner.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\learn\\hadoop\\wordcount\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\learn\\hadoop\\wordcount\\output"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
First run the job once without the combiner and look at the job counters.
Now uncomment the setCombinerClass line and run it again.
You can clearly see that the number of records leaving the map phase drops from 121 to 88, which eases the pressure of transferring data from the mappers to the reducer.
Option 2: staying with the word-count example, our combiner does exactly the same thing as the reducer, so we can simply pass the reducer class itself as the combiner class instead of writing a separate WordCountCombiner. This shortcut applies whenever the combiner logic is identical to the reducer logic and running the combiner does not change the final result.
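In the driver this is a one-line change (a minimal sketch; WordCountReducer is the reducer class already used in the word-count driver above):

// Option 2: reuse the existing reducer as the combiner
job.setCombinerClass(WordCountReducer.class);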
The next case is a custom OutputFormat: the job reads a log file and writes the records containing "atguigu" into one output file and everything else into another. Create a new package with the three basic files (mapper, reducer, driver) plus the two new classes we need.
The mapper needs essentially no changes: it just passes each input line through as the key.
package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Write each log line straight through as the key; no value is needed.
        context.write(value, NullWritable.get());
    }
}
Reducer
Because the map phase sends each whole line as the key, identical lines all arrive at the reducer grouped under a single key. If we wrote the key only once, we would silently drop those duplicates, so we do a simple loop over the values and write the key once per occurrence.
package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Write the key once per occurrence so duplicate lines are preserved.
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
LogOutputFormat.java
Here we need to override the getRecordWriter method (IDEA can generate the stub for you). It has to return a RecordWriter, so we also create a separate class that does the actual writing.
package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // Hand every record to our custom writer, which decides which file it goes to.
        return new LogRecordWriter(job);
    }
}
LogRecordWriter.java
package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    // Two output streams, one per target file.
    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;

    // Open the streams.
    public LogRecordWriter(TaskAttemptContext job) {
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            atguiguOut = fs.create(new Path("D:\\learn\\hadoop\\Log\\output\\atguigu.log"));
            otherOut = fs.create(new Path("D:\\learn\\hadoop\\Log\\output\\other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Route each record to the matching stream and write it to the corresponding file.
    @Override
    public void write(Text key, NullWritable nullWritable) throws IOException, InterruptedException {
        String log = key.toString();
        if (log.contains("atguigu")) {
            atguiguOut.writeBytes(log + "\n");
        } else {
            otherOut.writeBytes(log + "\n");
        }
    }

    // Close the streams.
    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    }
}
LogDriver.java
package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Register the custom OutputFormat.
        job.setOutputFormatClass(LogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\learn\\hadoop\\Log\\input"));
        // Even though we defined our own OutputFormat, it extends FileOutputFormat,
        // which still writes a _SUCCESS file, so an output directory must be set here.
        FileOutputFormat.setOutputPath(job, new Path("D:\\learn\\hadoop\\Log\\logoutput"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
The run results:
I can't fit everything into this post, so let's stop here for now.