赞
踩
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现.Map和Reduce,MapReduce处理的数据类型是<key,value>键值对。
以WordCount为例:
Map: ( k1 ; v1 ) →[ ( k2 ; v2 ) ]
Shuffle过程(不需要我们写)
Reduce:( k2 ; [ v2 ] )→[ ( k3 , v3 ) ]
一个完整的mapreduce程序在分布式运行时有三类实例进程:
Client提交计算任务
启动AppMaster进程
AppMaster请求分配资源
ResourceManager回复资源列表
AppMaster要求NodeManager分配资源
NodeManager执行具体的计算任务
NodeManager将计算状态和结果汇报给AppMaster
AppMaster汇报计算结果
ResourceManager分配任务
NodeManager实际执行任务
1)在hadoop集群上上传文件至hdfs
#新建一个文件
vim wordcount.txt
#在文件中写入实例数据 按i键进入文件写入模式,写入后按esc,输入:wq!保存更改
hello,world,hadoop
hive,hello,tom
love,hadoop
hdfs
#上传到HDFS
## 在HDFS文件系统中增加目录wordcount
hdfs dfs -mkdir /wordcount/
## 将刚刚创建的文件上传至该文件夹,登录web界面(50070)可以看到成功上传
hdfs dfs -put wordcount.txt /wordcount
2)导入依赖pom.xml
注意查看自己的hadoop版本
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.1</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> </dependencies>
3)测试hadoop连接环境
可以使用BigData插件测试,参照文章https://blog.csdn.net/weixin_44155966/article/details/108820920
package com.hunan.MapReduce; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* 四个泛型解释: KEYIN:K1的类型 偏移量 VALUEIN:V1的类型 每行字符串 KEYOUT:K2的类型 单词 VALUEOUT:V2的类型 固定值1 */ //hadoop自己的类型 public class WordCountMapper extends Mapper<LongWritable,Text, Text,LongWritable> { //map方法就是将K1,V1转为K2,V2 /* 参数: key : K1 行偏移量 value : V1 每一行的文本数据 context: 表示上下文对象,将各个流程连在一起 */ /* 如何将K1,V1转换为K2,V2 K1 V1 0 hello,world,hadoop 19 hive,hello,tom ---------------------- K2 V2 hello 1 world 1 hadoop 1 hive 1 hello 1 tom 1 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Text text = new Text(); LongWritable longWritable = new LongWritable(); //1.将一行的文本数据进行拆分 String[] split = value.toString().split(","); //2.遍历数组,组装K2,V2 for (String s : split) { //3.将K2,V2写入上下文中 //context.write(new Text(s), new LongWritable(1)); text.set(s); longWritable.set(1); context.write(text,longWritable); } } }
package com.hunan.MapReduce; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /* 四个泛型解释: KEYIN:K2的类型 单词 VALUEIN:V2的类型 1 KEYOUT:K3的类型 单词 VALUEOUT:V3的类型 个数 */ public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> { //map方法就是将新K2,V2转为K3,V3 /* 参数: key: 新K2 values: 集合 新V2 context:上下文对象 */ /* 如何将新K2,V2转为K3,V3 新K2 新V2 hello <1,1> world <1> hadoop <1,1,1> --------------- K3 V3 hello 2 world 1 hadoop 3 */ @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count=0; //1.遍历集合,将集合中数字相加,得到V3 for (LongWritable value : values) { count+=value.get(); } //2.将K2和V3写入上下文中 context.write(key,new LongWritable(count)); } }
package com.hunan.MapReduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class JobMain extends Configured implements Tool { //该方法用于指定一个job任务,从提交到结果保存的整个任务 public int run(String[] args) throws Exception { //1.创建一个job任务对象 Job job = Job.getInstance(super.getConf(), "WordConut"); //2.配置job任务对象(八个步骤) //第一步:指定文件读取方式和读取路径 job.setInputFormatClass(TextInputFormat.class); //TextInputFormat.addInputPath(job,new Path("hdfs://master:9000/wordcount")); TextInputFormat.addInputPath(job,new Path("file:///C:\\wordcount")); //第二步:指定Map阶段的处理方式和数据类型 job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class);//设置Map阶段K2的类型 job.setMapOutputKeyClass(LongWritable.class);//设置Map阶段V2的类型 //第三,四,五,六步 采用默认的shuffle阶段处理 //第七步:指定ruduce阶段的处理方式和数据类型 job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class);//设置Reduce阶段K2的类型 job.setMapOutputKeyClass(LongWritable.class);//设置Reduce阶段V2的类型 //第八步:设置输出类型和输出路径 job.setOutputFormatClass(TextOutputFormat.class); //TextOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/wordcount_out")); TextOutputFormat.setOutputPath(job,new Path("file:///D:\\wordcount_output")); //等待任务结束 boolean b = job.waitForCompletion(true); return b? 0 : 1; } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); //configuration.set("mapred.job.tracker", "192.168.60.101:9000"); //启动job任务 int run = ToolRunner.run(configuration, new JobMain(), args); System.exit(run); } }
将MapReduce程序提交给Yarn集群,分发到很多的节点上并发执行
处理的数据和输出结果应该位于HDFS文件系统
提交集群的实现步骤:将程序打成JAR包(双击右侧maven-Lifecycle-package),并上传至节点,然后在集群上用hadoop命令启动。
两个参数分别为jar包名,main方法的路径
hadoop jar hadoop_learning-1.0-SNAPSHOT.jar com.hunan.MapReduce.JobMain
TextInputFormat.addInputPath(job,new Path("file:///d:\\data\\wordcount"));
TextOutputFormat.setOutputPath(job,new Path("file:///d:\\data\\wordcount_output"));
hadoop jar hadoop_learning-1.0-SNAPSHOT.jar com.hunan.MapReduce.JobMain
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。