赞
踩
统计数据文件中每个字母出现的次数,以字母-次数的形式输出,例如(a 14)。
在map阶段读取每一行以空格分隔的数据,以(字母,1)的形式输出到reduce阶段。在输出之前,底层会执行分区(shuffle)操作,把键相同的结果归入同一分区并传递到reduce阶段;reduce阶段统计每一个分区中该字母出现的次数,最终将结果输出到指定文件夹。
Mapper:
package com.worldcount.zqd;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Map stage of the word-count job.
 *
 * <p>Reads one line of input text per call, splits it on single spaces, and
 * emits a (token, 1) pair for every token found. The framework then groups
 * pairs by key before handing them to the reducer.
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Writables are reused across map() calls to avoid allocating a new
    // object for every record, which is the conventional Hadoop pattern.
    Text outputKey = new Text();
    LongWritable outputValue = new LongWritable();

    /**
     * @param key     byte offset of this line within the input split (unused)
     * @param value   one line of input text
     * @param context sink for the intermediate (token, 1) pairs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // commons-lang StringUtils.split drops empty tokens, so runs of
        // spaces do not produce empty "words".
        String[] tokens = StringUtils.split(value.toString(), " ");
        for (String token : tokens) {
            outputKey.set(token);
            outputValue.set(1); // every occurrence counts as one
            context.write(outputKey, outputValue);
        }
    }
}
Reducer:
package com.worldcount.zqd;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reduce stage of the word-count job.
 *
 * <p>For each distinct key, sums all of the 1-counts emitted by the mapper
 * and writes a single (key, total) pair to the job output.
 */
public class WCreducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // Reused output value Writable, following the standard Hadoop pattern.
    LongWritable total = new LongWritable();

    /**
     * @param key     the token being counted
     * @param values  all partial counts produced for this key
     * @param context sink for the final (token, total) pair
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        // Accumulate every partial count the framework grouped under this key.
        for (LongWritable partial : values) {
            sum += partial.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}
主程序:
package com.worldcount.zqd; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WCrunner { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WCrunner.class); job.setMapperClass(WCMapper.class); job.setReducerClass(WCreducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job, new Path("C:\\Users\\Lenovo\\Desktop\\hadoop_mr\\wc_input")); FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\Lenovo\\Desktop\\hadoop_mr\\wc_out")); job.waitForCompletion(true); } }
结果如下图:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。