赞
踩
Mapreduce遵循的是的分而治之的设计思想,将一个大的任务拆分成若干个小的任务,然后并行的进行处理。在任务处理中,有两类任务,一个是MapTask,另一个是ReduceTask。
当前这个模板类是从官网直接找到的,功能是单词统计。
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
因为在Mapreduce程序运行中会大量用到网络I/O进行数据传输,所以在Mapreduce程序中所有类型必须使用序列化类型。可以自定义数据类型,查看Mapreduce的自定义数据类型。 Mapreduce自带的数据类型有如下:
<properties> <hadoop.version>2.6.0-cdh5.14.0</hadoop.version> </properties> <dependencies> <!--引入单元测试--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!-- Hadoop Client 依赖 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build>
Mapreduce的处理流程分为五大阶段,分别是Input、Map、Shuffle、reduce和Output。Mapreduce在处理流程中所有数据都是以key-value形式存在。
负责整个程序的读入,默认读取HDFS上面的文件,读取的时候会按照分片规则进行分片,可以通过job的属性值进行修改,如下示例:
/*该值默认为TextInputFormat.class,可以指定为其他值。
TextInputFormat.class
功能:从hdfs上读取数据,并且将HDFS上读取到的内容变成key-value格式
文件中一行会变成一个 keyvalue
key:行的偏移量
value:行的内容
*/
job.setInputFormatClass();
根据Input阶段中的split个数来启动对应的MapTask,每个MapTask处理一部分数据,但是处理的逻辑是相同的。split个数由分片规则决定,通过查看FileInputFormat类的源码getSplits()方法和computeSplitSize()方法可以看出一个分片大概在128M左右,可以通过调整mapreduce.input.fileinputformat.split.maxsize、mapreduce.input.fileinputformat.split.minsize这两个参数修改分片规则大小。需要构建一个类继承Mapper类实现Mapper类的map方法并提供具体的实现。每一条key-value数据会调用一次map方法。
Shuffle有分区、排序、分组的功能,Mapreduce的Shuffle默认会执行所有的功能,Shuffle的设计初衷是为了统计数据。
分区是为了多进程的并行,有几个ReduceTask就有几个分区,分区决定了当前这条数据会进入到那个ReduceTask中,被那个ReduceTask处理。默认的分区规则是根据key的hash值取余ReduceTask的个数,由默认的分区器HashPartitioner实现。Mapreduce的Shuffle分区规则也可以自定义,自定义后可以通过job.setPartitionerClass()方式设置到Mapreduce程序中。实现方式请查看Shuffle的自定义分区。
默认按照key的字典顺序进行排序。
按照key进行分组,相同的key的value会放在同一个迭代器中。
处理MapTask的结果。
将内存中的数据写入文件,每一个MapTask会将自己处理的结果放入一个环形的内存缓冲区中,缓冲区的大小默认是100M,当缓冲区数据达到80%的时候,会触发溢写,所有即将被溢写到磁盘的文件都会进行分区和排序,分区默认就是按照key的hash取余进行的,也可以自定义分区,排序的话会调用默认的排序器或者compareTo方法,排序的算法是快排,这里的排序不是整个批次数据有序,而是各个分区的数据内部有序。
溢写完成后每个分区都会有很多小文件,合并的功能就是把每个分区的所有小文件合并成一个大文件,保证每个MapTask只有一个大文件。合并的话因为是很多小文件合并成一个大文件,所以也会进行排序,这里用到的排序算法是归并排序,排序的逻辑还是相同的。
将自己的结果给reduce方法。
通过Http协议,每个RedcueTask会到每个MapTask中拉取属于自己的数据。
数据拉取到ReduceTask后是乱序的,这里也会发生排序,这里的排序算法也是归并排序(内存中只放索引),排序逻辑相同。
在这里会将相同key的value放到一个迭代器中。
合并,启动ReduceTask并把MapTask阶段处理完成的数据拉取到各个ReduceTask上,默认情况下ReduceTask只有一个。并且,在ReduceTask上面进行数据的最后处理。需要构建一个类继承Reducer类实现reuduce方法并提供具体的实现。每一条key-value数据会调用一次reduce方法。
负责将上一步Reduce的结果进行输出,几个ReduceTask就产生几个结果文件,默认输出到HDFS上变成文件,并且key-value之间以制表符分隔。可以通过job的属性值进行修改,如下示例:
/*该值默认为TextOutputFormat.class,可以指定为其他值。
TextOutputFormat.class
功能:从hdfs上读取数据,并且将HDFS上读取到的内容变成key-value格式
文件中一行会变成一个 keyvalue
key:行的偏移量
value:行的内容
*/
job.setOutputFormatClass();
继承Partitioner抽象类并实现抽象类的getPartition方法。
import org.apache.hadoop.mapreduce.Partitioner; /** * @ClassName Test * @Description: TODO * @Author qywang * @Date 2021/6/19 * @Version V1.0 **/ public class Test<K, V> extends Partitioner<K, V> { /** * @param k 一条数据的key * @param v 一条数据的value * @param numPartitions ReduceTask的个数 * @return */ @Override public int getPartition(K k, V v, int numPartitions) { return (k.hashCode() & Integer.MAX_VALUE) % numPartitions; } }
/** * @ClassName WordCountBean * @Description TODO 自定义数据类型,用于封装单词和单词长度 */ public class WordCountBean implements Writable { //定义属性 private String word; private int length ; public WordCountBean() { } public WordCountBean(String word, int length) { this.setWord(word); this.setLength(length); } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } //序列化 @Override public void write(DataOutput out) throws IOException { //输出两个对象 out.writeUTF(this.word); out.writeInt(this.length); } //反序列化:注意顺序必须与序列化的顺序保持一致,不然会导致值错乱 @Override public void readFields(DataInput in) throws IOException { //读出两个对象 this.word = in.readUTF(); this.length = in.readInt(); } //toString方法,在输出的 时候会被调用,写入文件或者打印 @Override public String toString() { return this.word+"\t"+this.length; } }
/** * @ClassName WordCountBean * @Description TODO 自定义数据类型,用于封装单词和单词长度 */ public class WordCountBean implements WritableComparable<WordCountBean> { //定义属性 private String word; private int length ; //构造 public WordCountBean() { } public void WordCountBean(String word, int length) { this.setWord(word); this.setLength(length); } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } //序列化 @Override public void write(DataOutput out) throws IOException { //输出两个对象 out.writeUTF(this.word); out.writeInt(this.length); } //反序列化:注意顺序必须与序列化的顺序保持一致,不然会导致值错乱 @Override public void readFields(DataInput in) throws IOException { //读出两个对象 this.word = in.readUTF(); this.length = in.readInt(); } //toString方法,在输出的 时候会被调用,写入文件或者打印 @Override public String toString() { return this.word+"\t"+this.length; } //用于在shuffle阶段的排序以及分组 @Override public int compareTo(WordCountBean o) { //先比较第一个值 int comp = this.getWord().compareTo(o.getWord()); if(0 == comp){ //如果第一个值,相等,以第二个值的比较结果作为最终的结果 return Integer.valueOf(this.getLength()).compareTo(Integer.valueOf(o.getLength())); } return comp; } }
这种主要观察程序是否需要进行Shuffle,如果程序要经过Shuffle,则必须实现WritableComparable接口。实现WritableComparable接口主要是在Shuffle阶段对key进行排序使用,可以实现compareTo方法自定义。
在源代码中调用排序器的逻辑,如果有自定义的排序器就用自定义的排序器,如果没有的话判断该类是否实现了compareTo方法,有的话就用compareTo方法,如果还是没有的话,则会调用默认的key的字典升序进行排序。
自定义数据类型可以调用该类型中的compareTo方法实现排序。
官方自带的数据类型默认的排序方式为对key进行升序,如果要对key降序的话,需要自定义一个排序器,然后通过job.setSortComparatorClass()方法设置自定义的排序器进去就可以。示例:
/** * @ClassName UserSort * @Description TODO 用户自定义排序器 */ public class UserSort extends WritableComparator { /** * 注册类型的转换 */ public UserSort() { //类型的转换注册,允许将类型转换为Text super(Text.class,true); } /** * 调用排序的方法 * @param a * @param b * @return */ @Override public int compare(WritableComparable a, WritableComparable b) { //需求:比较Text类型,传入WritableComparable,要进行转换 Text o1 = (Text) a; Text o2 = (Text) b; //降序 return -o1.compareTo(o2); } }
Combiner也称为Map端的聚合,默认情况下是关闭的,如果要使用的话,必须手动设置。判断程序是否可以使用Combiner,只需要看reduce方法的输入是否等于输出。如果相同就可以使用Combiner,如果不相同,就不行。使用Combiner主要是因为正常情况下Map端的并发数是要远远大于Reduce端的并发数的,将Reduce端的聚合操作先在Map端分别完成,能有效降低数据量,减轻Reduce端的负载。可以通过job.setCombinerClass()方法进行设置,Combinesr的类一般就是Reducer的类,聚合逻辑是一致的。
Mapreduce的压缩主要使用在Shuffle阶段的数据写入磁盘中,具体的压缩配置可以参考Hive教程这篇文件中Hive调优中提到的压缩。
Reduce join在Mapreduce中适用于大的数据量和大的数据量进行join,效率比较低。
/** * 定义Mapper的实现类以及Map过程中的处理逻辑 */ public static class MRJoinMapper extends Mapper<LongWritable,Text,Text, Text>{ private Text outputKey = new Text(); private Text outputValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //先判断当前的数据是哪个文件的 FileSplit inputSplit = (FileSplit) context.getInputSplit();//先获取这条数据属于哪个文件分片 String fileName = inputSplit.getPath().getName();//获取文件名称 //如果是订单数据,key为第三个字段,value是其他剩余字段 if("orders.txt".equals(fileName)){ String[] items = value.toString().split(","); this.outputKey.set(items[2]);//商品id this.outputValue.set(items[0]+"\t"+items[1]+"\t"+items[3]);//其他字段 context.write(this.outputKey,this.outputValue);//将订单数据输出 }else{ //如果是商品数据,key为第一个字段,value是第二个字段 String[] split = value.toString().split(","); this.outputKey.set(split[0]);//商品id this.outputValue.set(split[1]);//商品名称 context.write(this.outputKey,this.outputValue);//输出商品数据 } } } /** * 定义Reducer的实现类以及Reduce过程中的处理逻辑 */ public static class MRJoinReduce extends Reducer<Text,Text,Text,Text>{ private Text outputValue = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder stringBuilder = new StringBuilder(); for (Text value : values) { stringBuilder.append(value.toString()+"\t");//将所有的商品的名称和对应的所有订单进行拼接 } this.outputValue.set(stringBuilder.toString());//将商品名称及订单作为value context.write(key,this.outputValue); } }
Map join在Mapreduce中适用于小的数据量和大的数据量进行join。将小数据量数据放入分布式缓存,大数据的每个MapTask需要用到时,从分布式缓存中取,然后直接在Map端完成join,不需要进行Shuffle。
示例:
public class MapJoin extends Configured implements Tool { /** * 具体整个MapReduce job的定义:构建、配置、提交 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { /** * 构建一个job */ //创建一个job的实例 Job job = Job.getInstance(this.getConf(),"mrjob"); //设置job运行的类 job.setJarByClass(MapJoin.class); /** * 配置job */ //input:定义输入的方式,输入的路径 Path orderPath = new Path("datas/mrjoin/orders.txt"); TextInputFormat.setInputPaths(job,orderPath); //将商品的数据放入分布式缓存 Path productPath = new Path("hdfs://v6:8020/datas/mrjoin/product.txt"); //必须使用绝对路径 job.addCacheFile(productPath.toUri()); //map:定义Map阶段的类及输出类型 job.setMapperClass(MRJoinMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //shuffle:定义shuffle阶段实现的类 //reduce:定义reduce阶段的类及输出类型 // job.setReducerClass(MRJoinReduce.class); // job.setOutputKeyClass(Text.class); // job.setOutputValueClass(Text.class); job.setNumReduceTasks(0);//设置Reduce的个数,就是分区的个数 //output:定义输出的类以及输出的路径 Path outputPath = new Path("datas/output/join/mapjoin"); //如果输出存在,就删除 FileSystem hdfs = FileSystem.get(this.getConf()); if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } TextOutputFormat.setOutputPath(job,outputPath); /** * 提交job:并根据job运行的结果返回 */ return job.waitForCompletion(true) ? 0:-1; } /** * 定义Mapper的实现类以及Map过程中的处理逻辑 */ public static class MRJoinMapper extends Mapper<LongWritable,Text,Text, Text>{ private Text outputKey = new Text(); private Text outputValue = new Text(); Map<String,String> maps = new HashMap<>(); /** * Map和Reduce的类:三个方法 * 1-setup:会在map或者reduce方法之前执行 * 2-map/reduce:map逻辑或者reduce逻辑 * 3-close:最后执行的方法 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { //将分布式缓存的 数据读取进来 URI[] cacheFiles = context.getCacheFiles();//获取所有的缓存数据 //读取文件内容 BufferedReader bufferedReader = new BufferedReader(new FileReader(cacheFiles[0].getPath())); String line = null; while(StringUtils.isNotBlank(line = bufferedReader.readLine())){ //读取到每一行的内容 String pid = line.split(",")[0];//商品id String productName = line.split(",")[1];//商品名称 //将商品id和名称放入map集合 maps.put(pid,productName); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取订单数据 String[] items = value.toString().split(","); String pid = items[2]; //订单中的商品id String productName = maps.get(pid); this.outputKey.set(productName); this.outputValue.set(value.toString()); context.write(this.outputKey,this.outputValue); } }
Mapreduce不适合处理小文件,一个小文件就占一个块,对应就有一个MapTask,导致资源的浪费。一个MapTask从启动申请资源,到处理数据,最后关闭资源,如果数据小,处理数据时间非常短,可能都没有启动和关闭的时间长。在文件处理前做合并:将多个小文件合并成一个文件【Input中直接实现】,多个分片实现合并输出。自定义一个读取器。
/** * @ClassName UserRecordReader * @Description TODO 自定义一个读取器,将整个文件的内容,变成一个keyvalue * return new LineRecordReader(recordDelimiterBytes); 将每一行变成一个keyvalue返回 */ public class UserRecordReader extends RecordReader<NullWritable, BytesWritable> { //定义keyvalue,用于赋值和返回 private NullWritable key = NullWritable.get(); private BytesWritable value = new BytesWritable(); //定义全局的配置文件对象 Configuration conf = null; //定义全局的分片对象 FileSplit fileSplit = null; //设置标志变量 boolean flag = false; /** * 初始化的方法,只调用一次 * @param split * @param context * @throws IOException * @throws InterruptedException */ @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { conf = context.getConfiguration(); //从上下文中获取当前程序的配置对象 fileSplit = (FileSplit) split; //获取当前切片的信息 } /** * 读取并封装每一条keyvalue * 如果返回true:表示还有下一条,继续调用该方法读取 * 第一次执行时,必须返回true,将第一次得到的keyvalue保存,然后读取下一条 * 如果返回false:表示没有下一条,将当前结果进行返回 * 如果第一次直接返回false,表示该分片中没有任何数据 * @return * @throws IOException * @throws InterruptedException */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { if(!flag){ //从每个分片中读取数据,变成keyvalue:读取每个文件,将文件的内容放入value中 //先构建一个 文件系统对象 FileSystem hdfs = FileSystem.get(conf); //先从切片中获取该切片对应 的文件路径 Path path = fileSplit.getPath(); //打开这个文件 FSDataInputStream open = hdfs.open(path); //将输入流赋值给字节数组 byte[] bytes = new byte[(int) fileSplit.getLength()]; IOUtils.readFully(open,bytes,0, (int) fileSplit.getLength()); //将字节数组赋值给value this.value.set(bytes,0, (int) fileSplit.getLength()); //关闭资源 open.close(); hdfs.close(); //修改标志 flag = true; //返回 return true; } return false; } //返回当前读取到的key @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return key; } //返回当前读取到的value @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } //获取当前读取数据的进度的 @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { //释放资源 } }
自定义一个输入类
/** * @ClassName UserInputFormat * @Description TODO 自定义的输入:将读取到的每个小文件 = 每个分片 = 变成一个keyvalue */ public class UserInputFormat extends FileInputFormat<NullWritable, BytesWritable> { /** * 构建一个读取者对象 * @param split * @param context * @return * @throws IOException * @throws InterruptedException */ @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { //构建一个读取者对象 UserRecordReader userRecordReader = new UserRecordReader(); //调用初始化方法 userRecordReader.initialize(split,context);//将当前方法传入进来的分片和 上下文对象传递给初始化方法 return userRecordReader; } /** * 代表是否可分割:需求中不需要分割 * @param context * @param filename * @return */ @Override protected boolean isSplitable(JobContext context, Path filename) { return false;//输入的 数据不可分割 } }
/** * @ClassName UserInputMR * @Description TODO 用于实现自定义输入 */ public class UserInputMR extends Configured implements Tool { /** * 具体整个MapReduce job的定义:构建、配置、提交 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { /** * 构建一个job */ //创建一个job的实例 Job job = Job.getInstance(this.getConf(),"userinput"); //设置job运行的类 job.setJarByClass(UserInputMR.class); /** * 配置job */ //input:定义输入的方式,输入的路径 Path inputPath = new Path("datas/inputformat"); job.setInputFormatClass(UserInputFormat.class);//自定义输入,将每个文件的内容只变成一个keyvalue,调用一次map方法 UserInputFormat.setInputPaths(job,inputPath); //map:定义Map阶段的类及输出类型 job.setMapperClass(MRModelMapper.class); job.setMapOutputKeyClass(Text.class);//在map方法中获取文件名 job.setMapOutputValueClass(BytesWritable.class);//在map方法中获取这个文件所有内容 //shuffle:定义shuffle阶段实现的类 //reduce:定义reduce阶段的类及输出类型 // job.setReducerClass(MRModelReduce.class); job.setOutputKeyClass(Text.class); //文件名 job.setOutputValueClass(BytesWritable.class); //文件内容 job.setNumReduceTasks(1);//设置Reduce的个数,就是分区的个数 //output:定义输出的类以及输出的路径 Path outputPath = new Path("datas/output/inputformat"); //如果输出存在,就删除 FileSystem hdfs = FileSystem.get(this.getConf()); if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } //将结果保存为一个Sequencefile文件 job.setOutputFormatClass(SequenceFileOutputFormat.class);//该类型文件为keyvalue的二进制类型文件 SequenceFileOutputFormat.setOutputPath(job,outputPath); /** * 提交job:并根据job运行的结果返回 */ return job.waitForCompletion(true) ? 0:-1; } /** * 程序的入口 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //构建一个Conf对象,用于管理当前程序的所有配置 Configuration conf = new Configuration(); //调用当前类的run方法 int status = ToolRunner.run(conf, new UserInputMR(), args); //根据job运行的状态,来退出整个程序 System.exit(status); } /** * 定义Mapper的实现类以及Map过程中的处理逻辑 */ public static class MRModelMapper extends Mapper<NullWritable,BytesWritable,Text, BytesWritable>{ private Text outputKey = new Text(); @Override protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { //当前输入的value就是该文件的所有内容 //获取文件名封装成Text FileSplit inputSplit = (FileSplit) context.getInputSplit(); String fileName = inputSplit.getPath().getName(); this.outputKey.set(fileName); //输出这个文件的名称和所有内容 context.write(outputKey,value); } } }
自定义一个输出类。
/** * @ClassName UserOutputFormat * @Description TODO 自定义输出器,将reduce的数据,输出到两个文件 */ public class UserOutputFormat extends FileOutputFormat<Text, NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { //构建hdfs对象 Configuration conf = context.getConfiguration(); FileSystem hdfs = FileSystem.get(conf); //构建输出流 FSDataOutputStream goodContent = hdfs.create(new Path("datas/output/outputformat/good")); FSDataOutputStream badContent = hdfs.create(new Path("datas/output/outputformat/bad")); UserRecordWriter userRecordWriter = new UserRecordWriter(goodContent, badContent); //返回一个 输出器 return userRecordWriter; } }
自定义一个输出器。
/** * @ClassName UserRecordWriter * @Description TODO 真正实现将数据输出的对象 */ public class UserRecordWriter extends RecordWriter<Text, NullWritable> { //定义属性 FSDataOutputStream good = null; FSDataOutputStream bad = null; public UserRecordWriter(FSDataOutputStream goodContent,FSDataOutputStream badContent){ good = goodContent; bad = badContent; } /** * 将keyvalue数据输出的方法 * @param key * @param value * @throws IOException * @throws InterruptedException */ @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { String content = key.toString().split("\t")[9]; //将差评写入一个文件:2 if("2".equals(content)){ bad.write(key.toString().getBytes());//将整条差评输出 bad.write("\r\n".getBytes());//输出一个换行 }else { //其他评价写入另一个文件:0,1 good.write(key.toString().getBytes()); good.write("\r\n".getBytes()); } } //关闭资源的方法 @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { good.close(); bad.close(); } }
/** * @ClassName UserOutputMR * @Description TODO 将评论数据拆分到不同的文件中 */ public class UserOutputMR extends Configured implements Tool { /** * 具体整个MapReduce job的定义:构建、配置、提交 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { /** * 构建一个job */ //创建一个job的实例 Job job = Job.getInstance(this.getConf(),"mrjob"); //设置job运行的类 job.setJarByClass(UserOutputMR.class); /** * 配置job */ //input:定义输入的方式,输入的路径 Path inputPath = new Path("datas/outputformat/ordercomment.csv"); TextInputFormat.setInputPaths(job,inputPath); //map:定义Map阶段的类及输出类型 job.setMapperClass(MRModelMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //shuffle:定义shuffle阶段实现的类 //reduce:定义reduce阶段的类及输出类型 job.setReducerClass(MRModelReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(1);//设置Reduce的个数,就是分区的个数 //output:定义输出的类以及输出的路径 Path outputPath = new Path("datas/output/outputformat/normal"); //如果输出存在,就删除 FileSystem hdfs = FileSystem.get(this.getConf()); if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } //修改输出的类 job.setOutputFormatClass(UserOutputFormat.class); UserOutputFormat.setOutputPath(job,outputPath); /** * 提交job:并根据job运行的结果返回 */ return job.waitForCompletion(true) ? 0:-1; } /** * 程序的入口 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //构建一个Conf对象,用于管理当前程序的所有配置 Configuration conf = new Configuration(); //调用当前类的run方法 int status = ToolRunner.run(conf, new UserOutputMR(), args); //根据job运行的状态,来退出整个程序 System.exit(status); } /** * 定义Mapper的实现类以及Map过程中的处理逻辑 */ public static class MRModelMapper extends Mapper<LongWritable,Text,Text, NullWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将每一行的内容作为key,直接输出 context.write(value,NullWritable.get()); } } /** * 定义Reducer的实现类以及Reduce过程中的处理逻辑 */ public static class MRModelReduce extends Reducer<Text,NullWritable,Text,NullWritable>{ @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { //直接输出每行内容 context.write(key,NullWritable.get()); } } }
Mapreduce默认按照key进行分组,如果想按照其他方式进行分组,可以自定义分组,然后通过job.setGroupingComparatorClass()方法设置分组类。如下示例:
package cn.itcast.hadoop.mapreduce.group; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import java.util.Collections; import java.util.Comparator; /** * @ClassName OrderGroup * @Description TODO 自定义分组,按照key的第一个值来分组 */ public class OrderGroup extends WritableComparator { //注册类型,用于强转 public OrderGroup(){ super(OrderBean.class,true); } //实现两个OrderBean的比较 @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean o1 = (OrderBean) a; OrderBean o2 = (OrderBean) b; //只比较第一个值,订单id,如果订单id,相同就是同一组 return o1.getOrderId().compareTo(o2.getOrderId()); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。