⑵ Create a custom Mapper class to split the input data into words

package com.kgf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/***
 * The type parameters of the extended Mapper are:
 * 1st (input key):    LongWritable, the byte offset of the current line in the input file
 * 2nd (input value):  Text, the content of one line
 * 3rd (output key):   Text, a single word
 * 4th (output value): IntWritable, the count emitted for that word
 * @author kgf
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    /**
     * The map method processes the input one line at a time.
     * key:   the byte offset of the line
     * value: the content of the line
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // First convert the line to a String
        String line = value.toString();
        // Words in the input are separated by spaces
        String[] words = line.split(" ");
        // Emit each word with a count of 1 to the next stage
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
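To make the Mapper's behavior concrete, here is a small walk-through with a made-up sample line (the line is illustrative, not from the original post): for the input line "hello world hello", split(" ") yields three words, so the Mapper emits

(hello, 1), (world, 1), (hello, 1)

that is, one (word, 1) pair per occurrence. Reusing the Text and IntWritable fields simply avoids allocating new objects for every word.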

⑶ Create a custom Reducer class to aggregate the grouped data
package com.kgf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Note: the first two type parameters of Reducer must match the Mapper's output types.
 * @author kgf
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * This method aggregates the values that the shuffle phase grouped under each key.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {

        // Sum the counts for this word
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        // Emit the total count for the word
        context.write(key, new IntWritable(sum));
    }

}
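Continuing the same made-up sample line, the shuffle phase groups the Mapper output by key before reduce() is called, so the Reducer receives one call per distinct word:

reduce(hello, [1, 1]) -> writes (hello, 2)
reduce(world, [1])    -> writes (world, 1)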

⑷ Create a Driver class to submit the job
package com.kgf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1: Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2: Specify the driver class so Hadoop can locate the jar that contains it
        job.setJarByClass(WordCountDriver.class);

        // 3: Wire up the custom Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4: Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5: Set the final (reducer) output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6: Set the input and output paths, taken here from the command-line arguments
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7: Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

⑶ Upload the prepared hello.txt file to the target directory on HDFS
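A minimal sketch of this step, assuming hello.txt is in the current local directory and /user/kgf/input is a hypothetical HDFS input directory (the path is illustrative, not from the original post):

hdfs dfs -mkdir -p /user/kgf/input
hdfs dfs -put hello.txt /user/kgf/input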
⑷ Test with our jar (current directory: /opt/module/hadoop-3.1.3):
Command: hadoop jar <jar name> <fully-qualified Driver class name> <input path> <output path>
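For example, assuming the packaged jar is named wc.jar and the HDFS directories from the previous step are reused (both names are illustrative), the command would look like:

hadoop jar wc.jar com.kgf.mapreduce.WordCountDriver /user/kgf/input /user/kgf/output

Note that the output directory must not exist before the job runs; FileOutputFormat fails the job if it already does.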
Files generated after a successful run:
⑸ View the output file contents
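Assuming the hypothetical output path above, the single reducer writes its result to a file named part-r-00000 inside the output directory, which can be printed with:

hdfs dfs -cat /user/kgf/output/part-r-00000

For the made-up sample line used earlier ("hello world hello"), the expected contents would be:

hello	2
world	1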