赞
踩
首先写一个WordMapper类
- package com.kgc.myhdfs001.wc;
-
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
-
- import java.io.IOException;
- /*WordMapper继承Mapper,注意继承的这个Mapper是org.apache.hadoop.mapreduce里面的
- 该类就是mr程序map阶段调用的类,也就是maptask。负责把单词一个个分割出来
- Long String是jdk自带的数据类型
- 在网络传输序列化中 hadoop认为其效率不高,所以自己封装了一套数据类型,包括自己的序列化机制(Writable)
- Long----->LongWritable
- String--->Text
- */
- public class WordMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
- @Override
- //Alt+ins重写map方法,LongWritable相当于Long,但它可序列化,key是行的起始偏移量(光标所在的偏移值)
- //Text相当于String,value是行的内容
-
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- System.out.println(key);
- //将内容转化成String,并按照空格分隔,得到个数组,数组里面存的是一个个单词
- String[] words = value.toString().split(" ");
- //遍历数组 单词出现就标记1
- for(String word:words){
- //使用context把map处理完的结果写出去
- context.write(new Text(word),new LongWritable(1));
- }
- }
- }
然后写WordReducer类
- package com.kgc.myhdfs001.wc;
-
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
-
- import java.io.IOException;
- /*
- WordReducer继承Reducer类,注意这个类是org.apache.hadoop.mapreduce.Reducer
- 该类就是mr程序reduce阶段运行的类 ,也就是reducetask。负责汇总统计单词个数
- */
- public class WordReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
- @Override
- protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
- int count=0;
- for(LongWritable lw:values){
- count+=lw.get();
- }
- context.write(key,new LongWritable(count));
- }
- }
写这些类的时候导包一定不要导错
然后写主类
- package com.kgc.myhdfs001.wc;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- public class WordJob {
- public static void main(String[] args) throws Exception{
- //使用job构建本次mr程序
- Job job = Job.getInstance(new Configuration());
- //指定本次mr程序运行的主类
- job.setJarByClass (WordJob.class);
-
- //读哪个文件
- FileInputFormat.setInputPaths(job,new Path("file:///d:/temp/read/a.txt"));
- //读完统计个数的结果放哪里(write这个文件夹不要提前创建出来,否则会报错)
- FileOutputFormat.setOutputPath(job,new Path("file:///d:/temp/write"));
-
- //指定本次mr程序的mapper
- job.setMapperClass(WordMapper.class);
- //指定本次mr程序map阶段的输出类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
- //指定本次mr程序的reducer
- job.setReducerClass(WordReducer.class);
- //指定本次mr程序reduce阶段的输出类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
-
- job.waitForCompletion(true);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。