赞
踩
MapReduce思想在生活中处处可见。我们或多或少都曾接触过这种思想。MapReduce的思想核心是分而治之,充分利用了并行处理的优势。
即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。
MapReduce任务过程是分为两个处理阶段:
Map阶段:Map阶段的主要作用是“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。
Map阶段的这些任务可以并行计算,彼此间没有依赖关系。
Reduce阶段:Reduce阶段的主要作用是“合”,即对map阶段的结果进行全局汇总。
再次理解MapReduce的思想
经过查看分析官方WordCount案例源码我们发现一个统计单词数量的MapReduce程序的代码由三个部
分组成,
Mapper类
Reducer类
运行作业的代码(Driver)
Mapper类继承了org.apache.hadoop.mapreduce.Mapper类重写了其中的map方法,Reducer类继承
了org.apache.hadoop.mapreduce.Reducer类重写了其中的reduce方法。
重写的Map方法作用:map方法其中的逻辑就是用户希望mr程序map阶段如何处理的逻辑;
重写的Reduce方法作用:reduce方法其中的逻辑是用户希望mr程序reduce阶段如何处理的逻辑
为什么进行序列化?
序列化主要是我们通过网络通信传输数据时或者把对象持久化到文件,需要把对象序列化成二进制的结
构。
观察源码时发现自定义Mapper类与自定义Reducer类都有泛型类型约束,比如自定义Mapper有四个形
参类型,但是形参类型并不是常见的java基本类型。
为什么Hadoop要选择建立自己的序列化格式而不使用java自带serializable?
序列化在分布式程序中非常重要,在Hadoop中,集群中多个节点的进程间的通信是通过RPC(远
程过程调用:Remote Procedure Call)实现;RPC将消息序列化成二进制流发送到远程节点,远
程节点再将接收到的二进制数据反序列化为原始的消息,因此RPC往往追求如下特点:
紧凑:数据更紧凑,能充分利用网络带宽资源
快速:序列化和反序列化的性能开销更低
Hadoop使用的是自己的序列化格式Writable,它比java的序列化serialization更紧凑速度更快。一
个对象使用Serializable序列化后,会携带很多额外信息比如校验信息,Header,继承体系等。
Java基本类型与Hadoop常用序列化类型
Mapper类
用户自定义一个Mapper类继承Hadoop的Mapper类
Mapper的输入数据是KV对的形式(类型可以自定义)
Map阶段的业务逻辑定义在map()方法中
Mapper的输出数据是KV对的形式(类型可以自定义)
注意:map()方法是对输入的一个KV对调用一次!!
Reducer类
用户自定义Reducer类要继承Hadoop的Reducer类
Reducer的输入数据类型对应Mapper的输出数据类型(KV对)
Reducer的业务逻辑写在reduce()方法中
Reduce()方法是对相同K的一组KV对调用执行一次
Driver阶段
创建提交YARN集群运行的Job对象,其中封装了MapReduce程序运行所需要的相关参数入输入数据路
径,输出数据路径等,也相当于是一个YARN集群的客户端,主要作用就是提交我们MapReduce程序运
行
3.4.1 需求
在给定的文本文件中统计输出每一个单词出现的总次数
输入数据:wc.txt;
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.liu</groupId> <artifactId>stage04-hdfs</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> </dependencies> <!--maven打包插件 --> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin </artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
整体思路梳理(仿照源码)
Map阶段:
package com.liu.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; //需求:单词计数 //1 继承Mapper类 //2 Mapper类的泛型参数:共4个,2对kv //2.1 第一对kv:map输入参数类型 //2.2 第二队kv:map输出参数类型 // LongWritable, Text-->文本偏移量(后面不会用到),一行文本内容 //Text, IntWritable-->单词,1 public class WordCountMapper extends Mapper<LongWritable, Text ,Text, IntWritable> { Text word = new Text(); IntWritable one = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1.获取一行 String line =value.toString(); //2.按照 空格切割 String[] words = line.split(" "); //3. 输出<单词,1> for (String w : words) { word.set(w); context.write(word,one); } } }
package com.liu.mr.wc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.FileOutputStream; import java.io.IOException; public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { /* 1. 获取配置文件对象,获取job对象实例 2. 指定程序jar的本地路径 3. 指定Mapper/Reducer类 4. 指定Mapper输出的kv数据类型 5. 指定最终输出的kv数据类型 6. 指定job处理的原始数据路径 7. 指定job输出结果路径 8. 提交作业 */ // 1. 获取配置文件对象,获取job对象实例 Configuration conf = new Configuration(); final Job job = Job.getInstance(conf, "WordCountDriver"); //2. 指定程序jar的本地路径 job.setJarByClass(WordCountDriver.class); // 3. 指定Mapper/Reducer类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 4. 指定Mapper输出的kv数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5. 指定最终输出的kv数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定读取数据的原始路径 FileInputFormat.setInputPaths(job,new Path(args[0])); //指定结果数据输出路径 // 7. 指定job输出结果路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); //指定结果数据输出路径 // 8. 提交作业 final boolean flag = job.waitForCompletion(true); //jvm退出:正常退出0,非0值则是错误退出 System.exit(flag ? 0 : 1); } }
hadoop mapreduce yarn
hdfs hadoop mapreduce
mapreduce yarn apache
apache
apache
注意本地idea运行mr任务与集群没有任何关系,没有提交任务到yarn集群,是在本地使用多线程
方式模拟的mr的运行
把程序打成jar包,改名为wc.jar;上传到Hadoop集群
准备原始数据文件,上传到HDFS的路径,不能是本地路径,因为跨节点运行无法获取数
据!!
启动Hadoop集群(Hdfs,Yarn)
使用Hadoop 命令提交任务运行
jar wc.jar com.liu.mr.wc.WordCountDriver /wcinput/ /wcoutput
基本序列化类型往往不能满足所有需求,比如在Hadoop框架内部传递一个自定义bean对象,那么该对
象就需要实现Writable序列化接口
public CustomBean() {
super();
}
@Override
public void write(DataOutput out) throws IOException {
}
@Override
public void readFields(DataInput in) throws IOException {
....
}
@Override
public int compareTo(CustomBean o) {
// 自定义排序规则
return this.num > o.getNum() ? -1 : 1;
}
package com.liu.mr.speak; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; //这个类型是map输出kv中value的类型,需要实现writable序列化接口 public class SpeakBean implements Writable { //定义属性 private Long selfDuration;//自有内容时长 private Long thirdPartDuration;//第三方内容时长 private String deviceId;//设备id private Long sumDuration;//总时长 //准备一个空参构造 public SpeakBean() { } //序列化方法:就是把内容输出到网络或者文本中 @Override public void write(DataOutput out) throws IOException { out.writeLong(selfDuration); out.writeLong(thirdPartDuration); out.writeUTF(deviceId); out.writeLong(sumDuration); } //有参构造 public SpeakBean(Long selfDuration, Long thirdPartDuration, String deviceId) { this.selfDuration = selfDuration; this.thirdPartDuration = thirdPartDuration; this.deviceId = deviceId; this.sumDuration = this.selfDuration + this.thirdPartDuration; } //反序列化方法 @Override public void readFields(DataInput in) throws IOException { this.selfDuration = in.readLong();//自由时长 this.thirdPartDuration = in.readLong();//第三方时长 this.deviceId = in.readUTF();//设备id this.sumDuration = in.readLong();//总时长 } public Long getSelfDuration() { return selfDuration; } public void setSelfDuration(Long selfDuration) { this.selfDuration = selfDuration; } public Long getThirdPartDuration() { return thirdPartDuration; } public void setThirdPartDuration(Long thirdPartDuration) { this.thirdPartDuration = thirdPartDuration; } public String getDeviceId() { return deviceId; } public void setDeviceId(String deviceId) { this.deviceId = deviceId; } public Long getSumDuration() { return sumDuration; } public void setSumDuration(Long sumDuration) { this.sumDuration = sumDuration; } //为了方便观察数据,重写toString()方法 @Override public String toString() { return selfDuration + "\t" + thirdPartDuration + "\t" + deviceId + "\t" + sumDuration; } }
package com.liu.mr.speak; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; //四个参数:分为两对kv //第一对kv:map输入参数的kv类型;k-->一行文本偏移量,v-->一行文本内容 //第二对kv:map输出参数kv类型;k-->map输出的key类型,v:map输出的value类型 public class SpeakMapper extends Mapper<LongWritable, Text,Text,SpeakBean> { /* 1 转换接收到的text数据为String 2 按照制表符进行切分;得到自有内容时长,第三方内容时长,设备id,封装为SpeakBean 3 直接输出:k-->设备id,value-->speakbean */ Text device_id = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1. 转换接收到的text数据为String final String line = value.toString(); //2.按照制表符进行切分;得到自有内容时长,第三方内容时长,设备id,封装为SpeakBean final String[] fields = line.split("\t"); //自有内容时长 String selfDuration = fields[fields.length - 3]; //第三方内容时长 String thirdPartDuration = fields[fields.length - 2]; //设备id String deviceId = fields[1]; device_id.set(deviceId); final SpeakBean speakBean = new SpeakBean(Long.parseLong(selfDuration), Long.parseLong(thirdPartDuration), deviceId); context.write(device_id,speakBean); } }
package com.liu.mr.speak; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class SpeakReducer extends Reducer<Text, SpeakBean, Text, SpeakBean> { @Override protected void reduce(Text key, Iterable<SpeakBean> values, Context context) throws IOException, InterruptedException { //定义时长累加的初始值 Long self_duration = 0L; Long third_part_duration = 0L; //reduce方法的key:map输出的某一个key //reduce方法的value:map输出的kv对中相同key的value组成的一个集合 //reduce 逻辑:遍历迭代器累加时长即可 for (SpeakBean bean : values) { final Long selfDuration = bean.getSelfDuration(); final Long thirdPartDuration = bean.getThirdPartDuration(); self_duration += selfDuration; third_part_duration += thirdPartDuration; } //输出,封装成一个bean对象输出 final SpeakBean bean = new SpeakBean(self_duration, third_part_duration, key.toString()); context.write(key, bean); } }
package com.liu.mr.speak; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class SpeakDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { final Configuration conf = new Configuration(); final Job job = Job.getInstance(conf, "speakDriver"); //设置jar包本地路径 job.setJarByClass(SpeakDriver.class); //使用的mapper和reducer job.setMapperClass(SpeakMapper.class); job.setReducerClass(SpeakReducer.class); //map的输出kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(SpeakBean.class); //设置reduce输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(SpeakBean.class); //读取的数据路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //提交任务 final boolean flag = job.waitForCompletion(true); System.exit(flag ? 0 : 1); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。