赞
踩
## MapReduce概述?
1、MapReduce是一个分布式计算框架
2、适用于大规模数据处理场景
3、每个job包含Map和Reduce两部分
4、MapReduce 的设计目标是方便编程人员在不熟悉分布式并行开发的情况下,将编写的程序运行在分布式系统上,从而降低了分布式开发的入门门槛。
MapReduce特点
优点
不适用领域
1、MapReduce模型概述
MapReduce是一种编程模型,其创意和灵感来源于函数式编程。MapReduce 从名称上就表现出它的核心原理,即由Map和Reduce两个阶段组成。Map 表示“映射”,由一定数量的 Map Task 组成。Reduce 表示“归约”,由一定数量的Reduce Task 组成。用户只需要写 map() 和 reduce() 函数,就可以完成分布式程序的设计。
MapReduce编程模型:
2、MapReduce编程三部曲
1.输入Input.MapReduce 输入一系列k1/v1对。
2. Map 和 Reduce 阶段。Map:(k1,v1)->list(k2,v2),Reduce:(k2,list(v2))->list(k3,v3)。其中k2/v2是中间结果对
3. 输出Output。MapReduce输出一系列k3/v3对。
1.需求分析:
编程实现如输入/输出内容。
输入:
输出:
2、WordCount代码实现
1.新建Maven工程mapreducesTest,添加pom文件依赖,代码如下。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.kgc</groupId> <artifactId>bigdata</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <hadoop.version>2.6.0</hadoop.version> <hive.version>1.1.0-cdh5.14.2</hive.version> <hbase.version>1.2.0-cdh5.14.2</hbase.version> </properties> <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dependencies> <!--hadoop--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <!--日志--> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version> </dependency> <!--MapReduce--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>${hadoop.version}</version> </dependency> <!--log4j--> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <!--测试--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <!--<scope>test</scope>--> </dependency> </dependencies> </project>
2.在 WordCount 类中编写 WordMapper 类
关键代码:
package cn.kgc.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* KEYIN, 输入的key类型 VALUEIN, 输入的value类型 KEYOUT, 输出的key类型 VALUEOUT 输出的value类型 */ public class WCMapper extends Mapper<LongWritable, Text,Text, IntWritable> { Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1、将文本转换成string String line = value.toString(); //2、将字符串切割 String[] words = line.split("\\s+");//line.split(" ").var可以直接转成需要接收的对象 //3、将每一个单词写出 for (String word : words) { k.set(word); context.write(k,v); } } }
3、在 WordCount 类中编写 WordMapper 类
关键代码:
package cn.kgc.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /* KEYIN, reduce端输入的key类型,即map端输出的key VALUEIN, reduce端输入的value类型,即map端输出的value KEYOUT, reduce端输出的key类型 VALUEOUT reduce端输出的key类型 */ public class WCReduce extends Reducer<Text, IntWritable,Text, IntWritable> { IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //reduce端接受到的类型大概是这样的 (wish, (1,1,1,1,1,1,1,1)) //遍历迭代器 int sum = 0; for (IntWritable count : values) { //对迭代器进行累加求和 sum+=count.get(); } //将key和value写出 v.set(sum); context.write(key,v); } }
4.编写编写启动词频统计功能的驱动类WordMain。
关键代码:
package cn.kgc.mr.wc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.io.Text; import java.io.IOException; public class WCDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1、创建配置文件 Configuration conf = new Configuration(); Job job = Job.getInstance(conf,"wordcount"); //2、设置jar的位置 job.setJarByClass(WCDriver.class); //3、设置map和reduce的位置 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReduce.class); //4、设置map输出的key,value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5、设置reduce输出的key,value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6、设置输出的路径 FileInputFormat.setInputPaths(job, new Path("file:///D:\\Idea\\ideaMaven\\hadoopdfs1\\data\\wcinput")); FileOutputFormat.setOutputPath(job, new Path("file:///D:\\Idea\\ideaMaven\\hadoopdfs1\\data\\wcoutput")); //7、提交程序运行 boolean result = job.waitForCompletion(true); System.out.println(result ? 0:1); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。