赞
踩
Hadoop集群搭建前安装准备参考:
一、Hadoop系统应用之安装准备(一)(超详细步骤指导操作,WIN10,VMware Workstation 15.5 PRO,CentOS-6.7)
一、Hadoop系统应用之安装准备(二)(超详细步骤指导操作,WIN10,VMware Workstation 15.5 PRO,CentOS-6.7)
Hadoop集群搭建过程参考:
二、Hadoop系统应用之Hadoop集群搭建(超详细步骤指导操作,WIN10,VMware Workstation 15.5 PRO,CentOS-6.7)
MapReduce是Hadoop系统核心组件之一,它是一种可用于大数据并行处理的计算模型、框架和平台,主要解决海量数据的计算,是目前分布式计算模型中应用较为广泛的一种。
使用MapReduce操作海量数据时,每个MapReduce程序被初始化为一个工作任务,每个工作任务可以分为Map和Reduce两个阶段。Map阶段负责将任务分解,即把复杂的任务分解成若干个“简单的任务”来并行处理,但前提是这些任务没有必然的依赖关系,可以单独执行任务;Reduce阶段负责将任务合并,即把Map阶段的结果进行全局汇总。
在本地运行模式使用MapReduce程序实现词频统计功能。
(需启动集群的HDFS和YARN服务)
在src/main/java下新建Package,名为com.itcast.mr.wordcount。之后分别在该包下创建三个类文件:WordCountMapper.java、WordCountReducer.java、WordCountDriver.java。
在WordCountMapper.java文件下新增如下内容,实现MapReduce的Map阶段。
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* 这里就是mapper阶段具体业务逻辑实现的方法 该方法的调用取决于读取数据的组件有没有给MR传入数据
* 如果有数据传入,每一个<k,v>对,map就会被调用一次
*/
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// 拿到传入进来的一行内容,把数据类型转换为String
String line = value.toString();
// 将这行内容按照分隔符切割
String[] words = line.split(" ");
// 遍历数组,每出现一个单词就标记一个数组1 例如:<单词,1>
for (String word : words) {
// 使用MR上下文context,把Map阶段处理的数据发送给Reduce阶段作为输入数据
context.write(new Text(word), new IntWritable(1));
//第一行 hadoop hadoop spark 发送出去的是<hadoop,1><hadoop,1><spark,1>
}
}
}
注:上述java文件内容未加入包的导入部分未进行展示,可在抛出异常时进行添加,下同。
在WordCountReducer.java文件下新增如下内容,实现MapReduce的Reduce阶段。
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* 这里是REDUCE阶段具体业务类的实现方法
* 第一行 hadoop hadoop spark 发送出去的是<hadoop,1><hadoop,1><spark,1>
* reduce接受所有来自Map阶段处理的数据之后,按照Key的字典序进行排序
* 按照key是否相同作一组去调用reduce方法
* 本方法的key就是这一组相同的kv对 共同的Key
* 把这一组的所有v作为一个迭代器传入我们的reduce方法
*
* 迭代器:<hadoop,[1,1]>
*
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> value,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
//定义一个计数器
int count = 0;
//遍历一组迭代器,把每一个数量1累加起来就构成了单词的总次数
for (IntWritable iw : value) {
count += iw.get();
}
context.write(key, new IntWritable(count));
}
}
在WordCountDriver.java文件下新增如下内容,进行程序的提交。
public class WordCountDriver {
public static void main(String[] args) throws Exception {
// 通过Job来封装本次MR的相关信息
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local");
Job wcjob = Job.getInstance(conf);
// 指定MR Job jar包运行主类
wcjob.setJarByClass(WordCountDriver.class);
// 指定本次MR所有的Mapper Reducer类
wcjob.setMapperClass(WordCountMapper.class);
wcjob.setReducerClass(WordCountReducer.class);
// 设置我们的业务逻辑 Mapper类的输出 key和 value的数据类型
wcjob.setMapOutputKeyClass(Text.class);
wcjob.setMapOutputValueClass(IntWritable.class);
// 设置我们的业务逻辑 Reducer类的输出 key和 value的数据类型
wcjob.setOutputKeyClass(Text.class);
wcjob.setOutputValueClass(IntWritable.class);
// 指定要处理的数据所在的位置
FileInputFormat.setInputPaths(wcjob, "D:/hadoop-1/mr/input");
// 指定处理完成之后的结果所保存的位置
FileOutputFormat.setOutputPath(wcjob, new Path("D:/hadoop-1/mr/output"));
// 提交程序并且监控打印程序执行情况
boolean res = wcjob.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
在WordCountDriver.java中待处理数据所在的路径下新建相关的文件夹,并新建待处理的txt文本信息,路径与内容如下:
在WordCountDriver.java文件下右键点击运行,以Java应用的形式进行运行,获得结果。
结果将保存在WordCountDriver.java中写好的保存路径下。
参考文献: 黑马程序员.Hadoop大数据技术原理与应用[M].北京:清华大学出版社,2019.
后续学习链接:
五、Hadoop系统应用之MapReduce分布式计算框架(二)(超详细步骤指导操作,WIN10,VMware Workstation 15.5 PRO,CentOS-6.7)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。