赞
踩
MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
1)MapReduce 易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的 PC 机器上运行。
也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。
就是因为这个特点使得 MapReduce 编程变得非常流行。
2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
3)高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。
比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
4)适合 PB 级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
1)不擅长实时计算
MapReduce 无法像 MySQL 一样,在毫秒或者秒级内返回结果。
2)不擅长流式计算
流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。
这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。
3)不擅长 DAG(有向无环图)
计算多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。
在这种情况下,MapReduce 并不是不能做,而是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。
需求:统计其中每一个单词出现的总次数(查询结果:a-p一个文件,q-z一个文件)
(1)分布式的运算程序往往需要分成至少 2 个阶段。
(2)第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。
(3)第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段的所有 MapTask 并发实例的输出。
(4)MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。
总结:分析 WordCount 数据流走向深入理解 MapReduce 核心思想。
一个完整的 MapReduce 程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责 Map 阶段的整个数据处理流程。
(3)ReduceTask:负责 Reduce 阶段的整个数据处理流程。
采用反编译工具反编译源码,发现 WordCount 案例有 Map 类、Reduce 类和驱动类。
且数据的类型是 Hadoop 自身封装的序列化类型。
用户编写的程序分成三个部分:Mapper、Reducer 和 Driver。
1.Mapper阶段
2.Reducer阶段
3.Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象
1)需求
在给定的文本文件中统计输出每一个单词出现的总次数
输入数据:hello.txt
- atguigu atguigu
- ss ss
- cls cls
- banzhang
- hadoop
- jiao
- xue
期望输出数据:
- atguigu 2
- banzhang 1
- cls 2
- hadoop 1
- jiao 1
- ss 2
- xue 1
2)需求分析
按照 MapReduce 编程规范,分别编写 Mapper,Reducer,Driver。
Mapper:
Reducer
Driver
3)环境准备
(1)创建 maven 工程,MapReduceDemo
(2)在 pom.xml 文件中添加如下依赖
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>3.3.5</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.12</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.30</version>
- </dependency>
- </dependencies>
(2)在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”
- log4j.rootLogger=INFO,stdout
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
- log4j.appender.logfile=org.apache.log4j.FileAppender
- log4j.appender.logfile.File=target/spring.log
- log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
- log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
(3)创建包名:com.xiang.mapreduce.wordcount
4)编写程序
(1)编写 Mapper 类
- package com.xiang.mapreduce.wordcount;
-
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
-
- import java.io.IOException;
-
- public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- Text k = new Text();
- IntWritable v = new IntWritable(1);
-
-
- // 1 获取一行
- String line = value.toString();
- // 2 切割
- String[] words = line.split(" ");
- // 3 输出
- for (String word : words) {
- k.set(word);
- context.write(k, v);
- }
-
- }
- }
(2)编写 Reducer 类
- package com.xiang.mapreduce.wordcount;
-
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
-
- import java.io.IOException;
-
- public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
- int sum;
- IntWritable v = new IntWritable();
-
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- // 1 累加求和
- sum = 0;
- for (IntWritable count : values) {
- sum += count.get();
- }
- // 2 输出
- v.set(sum);
- context.write(key, v);
- }
- }
(3)编写 Driver 驱动类
- package com.xiang.mapreduce.wordcount;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- import java.io.IOException;
-
- public class WordCountDriver {
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- // 1 获取配置信息以及获取 job 对象
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
- // 2 关联本 Driver 程序的 jar
- job.setJarByClass(WordCountDriver.class);
- // 3 关联 Mapper 和 Reducer 的 jar
- job.setMapperClass(WordCountMapper.class);
- job.setReducerClass(WordCountReducer.class);
- // 4 设置 Mapper 输出的 kv 类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- // 5 设置最终输出 kv 类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- // 6 设置输入和输出路径
- // String input = "L:\\WorkSpaceTest\\workspceForJava\\MapReduceDemo\\test\\hello.txt";
- // String output = "L:\\WorkSpaceTest\\workspceForJava\\MapReduceDemo\\test\\out";
- String input = args[0];
- String output = args[1];
- FileInputFormat.setInputPaths(job, new Path(input));
- FileOutputFormat.setOutputPath(job, new Path(output));
- // 7 提交 job
- boolean result = job.waitForCompletion(true);
- System.exit(result ? 0 : 1);
- }
- }
5)本地测试
集群上测试
(1)用 maven 打 jar 包,需要添加的打包插件依赖
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.6.1</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
(2)将程序打成 jar 包
(3)修改不带依赖的 jar 包名称为 wc.jar,并拷贝该 jar 包到 Hadoop 集群的/opt/module/hadoop-3.3.6 路径
(4)启动 Hadoop 集群
myhadoop.sh start
(5)执行 WordCount 程序
hadoop jar wc.jar com.xiang.mapreduce.wordcount.WordCountDriver /input/word.txt /output2
执行结果
1)什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
2)为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。
然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
3)为什么不用 Java 的序列化
Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。
所以,Hadoop 自己开发了一套序列化机制(Writable)。
4)Hadoop 序列化特点:
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在 Hadoop 框架内部传递一个 bean 对象,那么该对象就需要实现序列化接口。
具体实现 bean 对象序列化步骤如下 7 步。
(1)必须实现 Writable 接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
- public FlowBean() {
- super();
- }
(3)重写序列化方法
- public void write(DataOutput out) throws IOException {
- out.writeLong(upFlow);
- out.writeLong(downFlow);
- out.writeLong(sumFlow);
- }
(4)重写反序列化方法
- public void readFields(DataInput in) throws IOException {
- upFlow = in.readLong();
- downFlow = in.readLong();
- sumFlow = in.readLong();
- }
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写 toString(),可用"\t"分开,方便后续用。
(7)如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。
详见后面排序案例。
- @Override
- public int compareTo(FlowBean o) {
- // 倒序排列,从大到小
- return this.sumFlow > o.getSumFlow() ? -1 : 1;
- }
统计每一个手机号耗费的总上行流量、总下行流量、总流量
(1)输入数据phone_data .txt
(2)输入数据格式:
- 7 13560436666 120.196.100.99 1116 954 200
- id 手机号码 网络ip 上行流量 下行流量 网络状态码
(3)期望输出数据格式
- 13560436666 1116 954 2070
- 手机号码 上行流量 下行流量 总流量
1、需求:统计每一个手机号耗费的总上行流量、下行流量、总流量
2、输入数据格式
- 7 13560436666 120.196.100.99 1116 954 200
- id 手机号码 网络ip 上行流量 下行流量 网络状态码
3、期望输出数据格式
- 13560436666 1116 954 2070
- 手机号码 上行流量 下行流量 总流量
4、Map阶段
(1)读取一行数据,切分字段
7 13560436666 120.196.100.99 1116 954 200
(2)抽取手机号、上行流量、下行流量
- 13560436666 1116 954
- 手机号码 上行流量 下行流量
(3)以手机号为key,bean对象为value输出,即context.write(手机号,bean);
(4)bean对象要想能够传输,必须实现序列化接口
5、Reduce阶段
(1)累加上行流量和下行流量得到总流量。
- 13560436666 1116 + 954 = 2070
- 手机号码 上行流量 下行流量 总流量
(1)编写流量统计的 Bean 对象
- public class FlowBean implements Writable {
- private long upFlow; //上行流量
- private long downFlow; //下行流量
- private long sumFlow; //总流量
-
- //2 提供无参构造
- public FlowBean() {
- }
-
- //3 提供三个参数的 getter 和 setter 方法
- public long getUpFlow() {
- return upFlow;
- }
-
- public void setUpFlow(long upFlow) {
- this.upFlow = upFlow;
- }
-
- public long getDownFlow() {
- return downFlow;
- }
-
- public void setDownFlow(long downFlow) {
- this.downFlow = downFlow;
- }
-
- public long getSumFlow() {
- return sumFlow;
- }
-
- public void setSumFlow(long sumFlow) {
- this.sumFlow = sumFlow;
- }
-
- public void setSumFlow() {
- this.sumFlow = this.upFlow + this.downFlow;
- }
-
- //4 实现序列化和反序列化方法,注意顺序一定要保持一致
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(upFlow);
- out.writeLong(downFlow);
- out.writeLong(sumFlow);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- upFlow = in.readLong();
- downFlow = in.readLong();
- sumFlow = in.readLong();
- }
-
- //5 重写 ToString
- @Override
- public String toString() {
- return upFlow + "\t" + downFlow + "\t" + sumFlow;
- }
- }
(2)编写 Mapper 类
- public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
- //解析文本行
- String line = value.toString();
- String[] split = line.split("\t");
- String phone = split[1];
- String up = split[split.length - 3];
- String down = split[split.length - 2];
-
- //取出手机号作为key
- Text outK = new Text();
- outK.set(phone);
-
- //取出流量作为value
- FlowBean outV = new FlowBean();
- outV.setUpFlow(Long.parseLong(up));
- outV.setDownFlow(Long.parseLong(down));
- outV.setSumFlow();
-
- //写出key,value
- context.write(outK, outV);
- }
- }
(3)编写 Reducer 类
- public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
- @Override
- protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
- long totalUp = 0;
- long totalDown = 0;
- //将相同手机号的流量相加
- for (FlowBean value : values) {
- totalDown += value.getDownFlow();
- totalUp += value.getUpFlow();
- }
-
- FlowBean outV = new FlowBean();
- outV.setUpFlow(totalUp);
- outV.setDownFlow(totalDown);
- outV.setSumFlow();
-
- context.write(key, outV);
- }
- }
(4)编写 Driver 驱动类
- public class FlowDriver {
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- //获取job对象
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
-
- //关联Driver类
- job.setJarByClass(FlowDriver.class);
-
- //关联Mapper和Reducer
- job.setMapperClass(FlowMapper.class);
- job.setReducerClass(FlowReducer.class);
-
- //关联Mapper的输出类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(FlowBean.class);
-
- //设置最终输出的类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FlowBean.class);
-
- //设置程序的输入输出路径
- FileInputFormat.setInputPaths(job, new Path("dd"));
- FileOutputFormat.setOutputPath(job, new Path("dd"));
-
- boolean b = job.waitForCompletion(true);
- System.exit(b ? 0 : 1);
- }
- }
运行结果:
输入:
结果:
1)问题引出
MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。
思考:
1G 的数据,启动 8 个 MapTask,可以提高集群的并发处理能力。
那么 1K 的数据,也启动 8 个 MapTask,会提高集群性能吗?
MapTask 并行任务是否越多越好呢?
哪些因素影响了 MapTask 并行度?
2)MapTask 并行度决定机制
数据块:Block 是 HDFS 物理上把数据分成一块一块。数据块是 HDFS 存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
数据切片是 MapReduce 程序计算输入数据的单位,一个切片会对应启动一个 MapTask。
数据切片与MapTask并行度决定机制
1、假设切片大小设置为100M
2、假设切片大小设置为128M
1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
2)每一个Split切片分配一个MapTask并行实例处理
3)默认情况下,切片大小=BlockSize
4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
Job 提交流程源码详解
1、建立连接
2、提交job
FileInputFormat 切片源码解析
input.getSplits(job)
1、切片机制
2、案例分析
(1)输入数据有两个文件:
(2)经过FileInputFormat的切片机制运算后,形成的切片信息如下:
3、切片大小的参数设置
(1)源码中计算切片大小的公式
Math.max(minSize, Math.min(maxSize,blockSize))
mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.mxasize=Long.MaxValue 默认值Long.MaxValue
因此默认情况下,切片大小=blocksize
(2)切片大小设置
maxsize(切片最大值):参数如果比blocksize小,则会让切片变小,切片大小就等于这个配置项
minsize(切片最小值):参数如果比blocksize大,则可以让切片比blocksize大,切片大小就等于这个配置项
(3)获取切片信息API
- //获取切片大文件名称
- String name = inputSplit.getPath().getName()
- //根据文件类型获取切片信息
- FileSplit inputSplit = (FileSplit)context.getInputSplit();
1)FileInputFormat 实现类
思考:在运行 MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce 是如何读取这些数据的呢?
FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat 和自定义 InputFormat 等。
2)TextInputFormat
TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。
以下是一个示例,比如,一个分片包含了如下 4 条文本记录。
- Rich learning form
- Intelligent learning engine
- Learning more convenient
- From the real demand for more close to the enterprise
每条记录表示为以下键/值对:
- (0,Rich learning form)
- (20,Intelligent learning engine)
- (49,Learning more convenient)
- (74,From the real demand for more close to the enterprise)
框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
1)应用场景:
CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。
2)虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
3)切片机制
生成切片过程包括:虚拟存储过程和切片过程二部分。
比如:
(1)虚拟存储过程:
例如 setMaxInputSplitSize 值为 4M,输入文件大小为 8.02M:
(2)切片过程:
测试举例:
有 4 个小文件大小分别为 1.7M、5.1M、3.4M 以及 6.8M 这四个小文件,
1)需求
将输入的大量小文件合并成一个切片统一处理。
准备 4 个小文件:a.txt(1KB)、b.txt(1KB)、c.txt(1KB)、d.txt(1KB)
期望:期望一个切片处理4个文件
2)实现过程
(1)不做任何处理,运行 1.8 节的 WordCount 案例程序,观察切片个数为 4。
(2)在 WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为 3。
(a)驱动类中添加代码如下:
- // 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
- job.setInputFormatClass(CombineTextInputFormat.class);
- //虚拟存储切片最大值设置 4m
- CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
(b)运行结果为 1个切片。
详细工作流程
总流程
Shuffle 过程
上面的流程是整个 MapReduce 最全工作流程,但是 Shuffle 过程只是从第 7 步开始到第16 步结束,具体 Shuffle 过程详解,如下:
(1)MapTask 收集我们的 map()方法输出的 kv 对,放到内存缓冲区中
(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
(3)多个溢出文件会被合并成大的溢出文件
(4)在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序
(5)ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据
(6)ReduceTask 会抓取到同一个分区的来自不同 MapTask 的结果文件,ReduceTask 会将这些文件再进行合并(归并排序)
(7)合并成大文件后,Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对 Group,调用用户自定义的 reduce()方法)
注意:
(1)Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。
(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb 默认 100M。
Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle。
1、问题引出
要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
2、默认Partitioner分区
- public class HashPartitioner<K, V> extends Partitioner<K, V> {
- public int getPartition(K key, V value, int numReduceTasks) {
- return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- }
- }
默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
3、自定义Partitioner步骤
(1)自定义类继承Partitioner,重写getPartition()方法
- public class CustomPartitioner extends Partitioner<Text, FlowBean> {
- @Override
- public int getPartition(Text key, FlowBean value, int numPartitions) {
- // 控制分区代码逻辑
- … …
- return partition;
- }
- }
(2)在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
4、分区总结
(1)如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
(2)如果1< ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会Exception;
(3)如果ReduceTask的数量 = 1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件 part-r-00000;
(4)分区号必须从零开始,逐一累加。
5、案例分析
例如:假设自定义分区数为5,则
(1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
(2)job.setNumReduceTasks(2);会报错
(3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件
1)要求
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
输入数据:iphone_data.txt
- 1 13160436666 120.196.100.99 1116 954 200
- 2 13260436656 120.196.100.99 1116 954 200
- 3 13360436666 120.196.100.99 1116 954 200
- 4 13460436666 120.196.100.99 1116 954 200
- 5 13560436656 120.196.100.99 1116 954 200
- 6 13660436666 120.196.100.99 1116 954 200
- 7 13760436666 120.196.100.99 1116 954 200
- 8 13860436656 120.196.100.99 1116 954 200
- 9 13960436666 120.196.100.99 1116 954 200
期望输出数据:
手机号 136、137、138、139 开头都分别放到一个独立的 4 个文件中,其他开头的放到一个文件中。
2)实现
在案例 2.3 的基础上,增加一个分区类
- public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
- @Override
- public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
- //获取手机号前三位 prePhone
- String phone = text.toString();
- String prePhone = phone.substring(0, 3);
-
- //定义一个分区号变量 partition,根据 prePhone 设置分区号
- int partition;
-
- if("136".equals(prePhone)){
- partition = 0;
- }else if("137".equals(prePhone)){
- partition = 1;
- }else if("138".equals(prePhone)){
- partition = 2;
- }else if("139".equals(prePhone)){
- partition = 3;
- }else {
- partition = 4;
- }
- //最后返回分区号 partition
- return partition;
- }
- }
3)在驱动函数中增加数据分区设置和 ReduceTask 设置
- //8 指定自定义分区器
- job.setPartitionerClass(ProvincePartitioner.class);
- //9 同时指定相应数量的 ReduceTask
- job.setNumReduceTasks(5);
4)输出结果
排序概述
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
排序分类
自定义排序 WritableComparable 原理分析
bean 对象做为 key 传输,需要实现 WritableComparable 接口重写 compareTo 方法,就可以实现排序。
- @Override
- public int compareTo(FlowBean bean) {
- int result;
- // 按照总流量大小,倒序排列
- if (this.sumFlow > bean.getSumFlow()) {
- result = -1;
- }else if (this.sumFlow < bean.getSumFlow()) {
- result = 1;
- }else {
- result = 0;
- }
- return result;
- }
1)需求
根据案例 2.3 序列化案例产生的结果再次对总流量进行倒序排序。
2)代码实现
(1)FlowBean 对象在在需求 1 基础上增加比较功能
- public class FlowBean implements WritableComparable<FlowBean> {
- ..
- @Override
- public int compareTo(FlowBean o) {
- //按照总流量比较,倒序排列
- if(this.sumFlow > o.sumFlow){
- return -1;
- }else if(this.sumFlow < o.sumFlow){
- return 1;
- }else {
- return 0;
- }
- }
- }
(2)编写 Mapper 类
- public class SortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {
- FlowBean outK = new FlowBean();
- Text outV = new Text();
-
- //1 获取一行数据
- String line = value.toString();
- //2 按照"\t",切割数据
- String[] split = line.split("\t");
- //3 封装 outK outV
- outK.setUpFlow(Long.parseLong(split[1]));
- outK.setDownFlow(Long.parseLong(split[2]));
- outK.setSumFlow();
- outV.set(split[0]);
- //4 写出 outK outV
- context.write(outK, outV);
- }
- }
(3)编写 Reducer 类
- public class SortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
- @Override
- protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
- for (Text value : values) {
- //调换 KV 位置,反向写出
- context.write(value,key);
- }
- }
- }
(4)编写 Driver 类
- //关联Mapper的输出类型
- job.setMapOutputKeyClass(FlowBean.class);
- job.setMapOutputValueClass(Text.class);
-
- //设置最终输出的类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FlowBean.class);
1)需求
要求每个省份手机号输出的文件中按照总流量内部排序
2)需求分析
基于前一个需求,增加自定义分区类,分区按照省份手机号设置。
3)案例实操
(1)增加自定义分区类
- public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {
- @Override
- public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
- //获取手机号前三位
- String phone = text.toString();
- String prePhone = phone.substring(0, 3);
- //定义一个分区号变量 partition,根据 prePhone 设置分区号
- int partition;
- if ("136".equals(prePhone)) {
- partition = 0;
- } else if ("137".equals(prePhone)) {
- partition = 1;
- } else if ("138".equals(prePhone)) {
- partition = 2;
- } else if ("139".equals(prePhone)) {
- partition = 3;
- } else {
- partition = 4;
- }
- //最后返回分区号 partition
- return partition;
- }
- }
(2)在驱动类中添加分区类
- // 设置自定义分区器
- job.setPartitionerClass(ProvincePartitioner2.class);
- // 设置对应的 ReduceTask 的个数
- job.setNumReduceTasks(5);
自定义 Combiner 实现步骤
(a)自定义一个 Combiner 继承 Reducer,重写 Reduce 方法
- public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- IntWritable outV = new IntWritable();
-
- int sum = 0;
- for (IntWritable value : values) {
- sum += value.get();
- }
- outV.set(sum);
- context.write(key,outV);
- }
- }
(b)在 Job 驱动类中设置:
job.setCombinerClass(WordCountCombiner.class);
1)需求
统计过程中对每一个 MapTask 的输出进行局部汇总,以减小网络传输量即采用Combiner 功能
期望:Combine 输入数据多,输出时经过合并,输出数据降低。
2)需求分析
增加一个WordcountCombiner类继承Reducer
对每一个MapTask的输出局部汇总
3)案例实操-方案一
(1)增加一个 WordCountCombiner 类继承 Reducer
(2)在 WordcountDriver 驱动类中指定 WordCountCombiner
代码在自定义 Combiner 实现步骤>
4)案例实操-方案二
(1)将 WordcountReducer 作为 Combiner 在 WordcountDriver 驱动类中指定
- // 指定需要使用 Combiner,以及用哪个类作为 Combiner 的逻辑
- job.setCombinerClass(WordCountReducer.class);
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了 OutputFormat接口。
几种常见的OutputFormat实现类
1.OutputFormat实现类
2.默认输出格式TextOutputFormat
3.自定义OutputFormat
3.1 应用场景:例如:输出数据到MySQL/HBase/Elasticsearch等存储框架中
3.2 自定义OutputFormat步骤
1)需求
过滤输入的 log 日志,包含 baidu 的网站输出到 baidu.log,不包含 baidu 的网站输出到other.log。
- http://www.baidu.com
- http://www.alibaba.com
- http://www.alibaba.com
- http://www.aiqiyi.com
- http://www.google.com
- http://www.qq.com
- http://www.tecent.com
2)需求分析
1、自定义一个OutputFormat类
(1)创建一个类LogRecordWriter继承RecordWriter
(a)创建两个文件的输出流:baiduOut、otherOut
(b)如果输入数据包含baidu,输出到baiduOut流; 如果不包含baidu,输出到otherOut流
2、驱动类Driver
要将自定义的输出格式组件设置到job中
3)案例实操
(1)编写 LogMapper 类
- public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
- //不做任何处理,直接写出一行log数据
- context.write(value, NullWritable.get());
- }
- }
(3)自定义一个 LogOutputFormat 类
- public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
- @Override
- public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException {
- //创建一个自定义的 RecordWriter 返回
- return new LogRecordWriter(job);
- }
- }
(4)编写 LogRecordWriter 类
- public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
- private final FSDataOutputStream baiduOut;
- private final FSDataOutputStream otherOut;
-
- public LogRecordWriter(TaskAttemptContext job) throws IOException {
- //获取文件系统对象
- FileSystem fs = FileSystem.get(job.getConfiguration());
- //用文件系统对象创建两个输出流对应不同的目录
- baiduOut = fs.create(new Path("L:\\WorkSpaceTest\\workspceForJava\\MapReduceDemo\\test\\logData\\baidu.log"));
- otherOut = fs.create(new Path("L:\\WorkSpaceTest\\workspceForJava\\MapReduceDemo\\test\\logData\\other.log"));
- }
-
- @Override
- public void write(Text key, NullWritable value) throws IOException, InterruptedException {
- String log = key.toString();
- if (log.contains("baidu")) {
- baiduOut.writeBytes(log + "\n");
- } else {
- otherOut.writeBytes(log + "\n");
- }
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- //关流
- IOUtils.closeStream(baiduOut);
- IOUtils.closeStream(otherOut);
- }
- }
(5)编写 LogDriver 类
- public class LogDriver {
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
- job.setJarByClass(LogDriver.class);
-
- job.setMapperClass(LogMapper.class);
- job.setReducerClass(LogReducer.class);
-
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(NullWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(NullWritable.class);
- //设置自定义的 outputFormat
- job.setOutputFormatClass(LogOutputFormat.class);
-
- FileInputFormat.setInputPaths(job, new Path("L:\\WorkSpaceTest\\workspceForJava\\MapReduceDemo\\test\\logData\\input"));
- // 虽自定义了outputFormat,但继承自fileOutputFormat,需要要输出一个_SUCCESS 文件
- FileOutputFormat.setOutputPath(job, new Path("L:\\WorkSpaceTest\\workspceForJava\\MapReduceDemo\\test\\logData\\output"));
- boolean b = job.waitForCompletion(true);
- System.exit(b ? 0 : 1);
- }
- }
回顾:MapTask 并行度由切片个数决定,切片个数由输入文件和切片规则决定。
思考:ReduceTask 并行度由谁决定?
ReduceTask 的并行度同样影响整个 Job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不同,ReduceTask 数量的决定是可以直接手动设置:
- // 默认值是 1,手动设置为 4
- job.setNumReduceTasks(4);
实验:测试 ReduceTask 多少合适
注意事项
1)MapTask 源码解析流程
- =================== MapTask ===================
- context.write(k, NullWritable.get()); //自定义的 map 方法的写出,进入
- output.write(key, value);
- //MapTask727 行,收集方法,进入两次
- collector.collect(key, value,partitioner.getPartition(key, value, partitions));
- HashPartitioner(); //默认分区器
- collect() //MapTask1082 行 map 端所有的 kv 全部写出后会走下面的 close 方法
- close() //MapTask732 行
- collector.flush() // 溢出刷写方法,MapTask735 行,提前打个断点,进入
- sortAndSpill() //溢写排序,MapTask1505 行,进入
- sorter.sort() QuickSort //溢写排序方法,MapTask1625 行,进入
- mergeParts(); //合并文件,MapTask1527 行,进入
- collector.close(); //MapTask739 行,收集器关闭,即将进入 ReduceTask
2)ReduceTask 源码解析流程
- =================== ReduceTask ===================
- if (isMapOrReduce()) //reduceTask324 行,提前打断点
- initialize() // reduceTask333 行,进入
- init(shuffleContext); // reduceTask375 行,走到这需要先给下面的打断点
- totalMaps = job.getNumMapTasks(); // ShuffleSchedulerImpl 第 120 行,提前打断点
- merger = createMergeManager(context); //合并方法,Shuffle 第 80 行
- // MergeManagerImpl 第 232 235 行,提前打断点
- this.inMemoryMerger = createInMemoryMerger(); //内存合并
- this.onDiskMerger = new OnDiskMerger(this); //磁盘合并
- rIter = shuffleConsumerPlugin.run();
- eventFetcher.start(); //开始抓取数据,Shuffle 第 107 行,提前打断点
- eventFetcher.shutDown(); //抓取结束,Shuffle 第 141 行,提前打断点
- copyPhase.complete(); //copy 阶段完成,Shuffle 第 151 行
- taskStatus.setPhase(TaskStatus.Phase.SORT); //开始排序阶段,Shuffle 第 152 行
- sortPhase.complete(); //排序阶段完成,即将进入 reduce 阶段 reduceTask382 行
- reduce(); //reduce 阶段调用的就是我们自定义的 reduce 方法,会被调用多次
- cleanup(context); //reduce 完成之前,会最后调用一次 Reducer 里面的 cleanup 方法
将两份不同的数据,通过MapReduce聚合成一份结果。
Map 端的主要工作:
为来自不同表或文件的 key/value 对,打标签以区别不同来源的记录。然后用连接字段作为 key,其余部分和新加的标志作为 value,最后进行输出。
Reduce 端的主要工作:
在 Reduce 端以连接字段作为 key 的分组已经完成,只需要在每一个分组当中将那些来源于不同文件的记录(在 Map 阶段已经打标志)分开,最后进行合并就 ok 了
1)需求:将商品信息表中数据根据商品 pid 合并到订单数据表中。
2)分析:
通过将关联条件作为 Map 输出的 key,将两表满足 Join 条件的数据并携带数据所来源的文件信息,发往同一个 ReduceTask,在 Reduce 中进行数据的串联。
Reduce端表合并(数据倾斜)
3)代码实现
(1)创建商品和订单合并后的 TableBean 类
- public class TableBean implements Writable {
- private String id; //订单 id
- private String pid; //产品 id
- private int amount; //产品数量
- private String pname; //产品名称
- private String flag; //判断是 order 表还是 pd 表的标志字段
- public TableBean() {
- }
- public String getId() {
- return id;
- }
- public void setId(String id) {
- this.id = id;
- }
- public String getPid() {
- return pid;
- }
- public void setPid(String pid) {
- this.pid = pid;
- }
- public int getAmount() {
- return amount;
- }
- public void setAmount(int amount) {
- this.amount = amount;
- }
- public String getPname() {
- return pname;
- }
- public void setPname(String pname) {
- this.pname = pname;
- }
- public String getFlag() {
- return flag;
- }
- public void setFlag(String flag) {
- this.flag = flag;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(id);
- out.writeUTF(pid);
- out.writeInt(amount);
- out.writeUTF(pname);
- out.writeUTF(flag);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.id = in.readUTF();
- this.pid = in.readUTF();
- this.amount = in.readInt();
- this.pname = in.readUTF();
- this.flag = in.readUTF();
- }
-
- @Override
- public String toString() {
- return id + "\t" + pname + "\t" + amount;
- }
- }
(2)编写 TableMapper 类
- public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
- private String filename;
-
- @Override
- protected void setup(Mapper.Context context) {
- //获取对应文件名称
- InputSplit split = context.getInputSplit();
- FileSplit fileSplit = (FileSplit) split;
- filename = fileSplit.getPath().getName();
- }
-
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- Text outK = new Text();
- TableBean outV = new TableBean();
-
- //获取一行
- String line = value.toString();
- //判断是哪个文件,然后针对文件进行不同的操作
- String[] split = line.split("\t");
- if (filename.contains("order")) { //订单表的处理
- //封装 outK
- outK.set(split[1]);
- //封装 outV
- outV.setId(split[0]);
- outV.setPid(split[1]);
- outV.setAmount(Integer.parseInt(split[2]));
- outV.setPname("");
- outV.setFlag("order");
- } else { //商品表的处理
- //封装 outK
- outK.set(split[0]);
- //封装 outV
- outV.setId("");
- outV.setPid(split[0]);
- outV.setAmount(0);
- outV.setPname(split[1]);
- outV.setFlag("pd");
- }
- //写出 KV
- context.write(outK, outV);
- }
- }
(3)编写 TableReducer 类
- public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
- @Override
- protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
- ArrayList<TableBean> orderBeans = new ArrayList<>();
- TableBean pdBean = new TableBean();
- for (TableBean value : values) {
- try {
- //判断数据来自哪个表
- if ("order".equals(value.getFlag())) { //订单表
- //创建一个临时 TableBean 对象接收 value
- TableBean tmpOrderBean = new TableBean();
- BeanUtils.copyProperties(tmpOrderBean, value);
- //将临时 TableBean 对象添加到集合 orderBeans
- orderBeans.add(tmpOrderBean);
- } else { //商品表
- BeanUtils.copyProperties(pdBean, value);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- //遍历集合 orderBeans,替换掉每个 orderBean 的 pid 为 pname,然后写出
- for (TableBean orderBean : orderBeans) {
- orderBean.setPname(pdBean.getPname());
- //写出修改后的 orderBean 对象
- context.write(orderBean, NullWritable.get());
- }
- }
- }
(4)编写 TableDriver 类
- public class TableDriver {
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- Job job = Job.getInstance(new Configuration());
- job.setJarByClass(TableDriver.class);
- job.setMapperClass(TableMapper.class);
- job.setReducerClass(TableReducer.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(TableBean.class);
- job.setOutputKeyClass(TableBean.class);
- job.setOutputValueClass(NullWritable.class);
- FileInputFormat.setInputPaths(job, new Path("L:\\WorkSpaceTest\\workspceForJava\\MapReduceDemo\\test\\joinData\\input"));
- FileOutputFormat.setOutputPath(job, new Path("L:\\WorkSpaceTest\\workspceForJava\\MapReduceDemo\\test\\joinData\\output"));
- boolean b = job.waitForCompletion(true);
- System.exit(b ? 0 : 1);
- }
- }
4)测试
5)总结
缺点:这种方式中,合并的操作是在 Reduce 阶段完成,Reduce 端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在 Reduce 阶段极易产生数据倾斜。
解决方案:Map 端实现数据合并。
1)使用场景
Map Join 适用于一张表十分小、一张表很大的场景。
2)优点
思考:在 Reduce 端处理过多的表,非常容易产生数据倾斜。怎么办?
在 Map 端缓存多张表,提前处理业务逻辑,这样增加 Map 端业务,减少 Reduce 端数据的压力,尽可能的减少数据倾斜。
3)具体办法:采用 DistributedCache
(1)在 Mapper 的 setup 阶段,将文件读取到缓存集合中。
(2)在 Driver 驱动类中加载缓存。
- //缓存普通文件到 Task 运行节点。
- job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
- //如果是集群运行,需要设置 HDFS 路径
- job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
1)需求
2)分析
MapJoin 适用于关联表中有小表的情形。
3)代码实现
(1)先在 MapJoinDriver 驱动类中添加缓存文件
- public class MapJoinDriver {
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
- // 1获取job信息
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
- // 2设置加载jar包路径
- job.setJarByClass(MapJoinDriver.class);
- // 3关联mapper
- job.setMapperClass(MapJoinMapper.class);
- // 4设置Map输出KV类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(NullWritable.class);
- // 5设置最终输出KV类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(NullWritable.class);
-
- // 加载缓存数据
- job.addCacheFile(new URI("file:///L:/WorkSpaceTest/workspceForJava/MapReduceDemo/test/mapJoinData/input/tableCache/pd.txt"));
- // Map 端 Join 的逻辑不需要 Reduce 阶段,设置 reduceTask 数量为 0
- job.setNumReduceTasks(0);
-
- FileInputFormat.setInputPaths(job, new Path("L:\\WorkSpaceTest\\workspceForJava\\MapReduceDemo\\test\\mapJoinData\\input"));
- FileOutputFormat.setOutputPath(job, new Path("L:\\WorkSpaceTest\\workspceForJava\\MapReduceDemo\\test\\mapJoinData\\output"));
- boolean b = job.waitForCompletion(true);
- System.exit(b ? 0 : 1);
- }
- }
(2)在 MapJoinMapper 类中的 setup 方法中读取缓存文件
- public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
- private final Map<String, String> pdMap = new HashMap<>();
-
- //任务开始前将 pd 数据缓存进 pdMap
- @Override
- protected void setup(Context context) throws IOException {
- //通过缓存文件得到小表数据 pd.txt
- URI[] cacheFiles = context.getCacheFiles();
- Path path = new Path(cacheFiles[0]);
- //获取文件系统对象,并开流
- FileSystem fs = FileSystem.get(context.getConfiguration());
- FSDataInputStream fis = fs.open(path);
- //通过包装流转换为 reader,方便按行读取
- BufferedReader reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));
- //逐行读取,按行处理
- String line;
- while (StringUtils.isNotEmpty(line = reader.readLine())) {
- //切割一行
- //01 小米
- String[] split = line.split("\t");
- pdMap.put(split[0], split[1]);
- }
- //关流
- IOUtils.closeStream(reader);
- }
-
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- Text text = new Text();
-
- //读取大表数据
- //1001 01 1
- String[] fields = value.toString().split("\t");
- //通过大表每行数据的 pid,去 pdMap 里面取出 pname
- String pname = pdMap.get(fields[1]);
- //将大表每行数据的 pid 替换为 pname
- text.set(fields[0] + "\t" + pname + "\t" + fields[2]);
- //写出
- context.write(text, NullWritable.get());
- }
- }
4)运行结果
“ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。
ETL 一词较常用在数据仓库,但其对象并不限于数据仓库在运行核心业务 MapReduce 程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。
清理的过程往往只需要运行 Mapper 程序,不需要运行 Reduce 程序。
1)需求
去除日志中字段个数小于等于 11 的日志。
2)需求分析
需要在 Map 阶段对输入的数据根据规则进行过滤清洗。
3)实现代码
(1)编写 WebLogMapper 类
- public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- // 1获取1行数据
- String line = value.toString();
- // 2解析日志
- boolean result = parseLog(line);
- // 3日志不合法退出
- if (!result) {
- return;
- }
- // 4日志合法就直接写出
- context.write(value, NullWritable.get());
- }
-
- // 2 封装解析日志的方法
- private boolean parseLog(String line) {
- // 1 截取
- String[] fields = line.split(" ");
- // 2 日志长度大于 11 的为合法
- return fields.length > 11;
- }
- }
(2)编写 WebLogDriver 类
- public class WebLogDriver {
- public static void main(String[] args) throws Exception {
- // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
- args = new String[]{"D:/input/inputlog", "D:/output1"};
- // 1 获取 job 信息
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
- // 2 加载 jar 包
- job.setJarByClass(LogDriver.class);
- // 3 关联 map
- job.setMapperClass(WebLogMapper.class);
- // 4 设置最终输出类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(NullWritable.class);
- // 设置 reducetask 个数为 0
- job.setNumReduceTasks(0);
- // 5 设置输入和输出路径
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- // 6 提交
- boolean b = job.waitForCompletion(true);
- System.exit(b ? 0 : 1);
- }
- }
1)输入数据接口:InputFormat
(1)默认使用的实现类是:TextInputFormat
(2)TextInputFormat 的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为 value 返回。
(3)CombineTextInputFormat 可以把多个小文件合并成一个切片处理,提高处理效率。
2)逻辑处理接口:Mapper
用户根据业务需求实现其中三个方法:map() setup() cleanup ()
3)Partitioner 分区
(1)有默认实现 HashPartitioner,逻辑是根据 key 的哈希值和 numReduces 来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果业务上有特别的需求,可以自定义分区。
4)Comparable 排序
(1)当我们用自定义的对象作为 key 来输出时,就必须要实现 WritableComparable 接口,重写其中的 compareTo()方法。
(2)部分排序:对最终输出的每一个文件进行内部排序。
(3)全排序:对所有数据进行排序,通常只有一个 Reduce。
(4)二次排序:排序的条件有两个
5)Combiner 合并
Combiner 合并可以提高程序执行效率,减少 IO 传输。但是使用时必须不能影响原有的业务处理结果。
6)逻辑处理接口:
Reducer用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()
7)输出数据接口:OutputFormat
(1)默认实现类是 TextOutputFormat,功能逻辑是:将每一个 KV 对,向目标文本文件输出一行。
(2)用户还可以自定义 OutputFormat。
1)压缩的好处和坏处
压缩的优点:以减少磁盘 IO、减少磁盘存储空间。
压缩的缺点:增加 CPU 开销。
2)压缩原则
(1)运算密集型的 Job,少用压缩
(2)IO 密集型的 Job,多用压缩
1)压缩算法对比介绍
压缩格式 | 是否Hadoop 自带 | 算法 | 文件扩展名 | 是否可切片 | 压缩后,原程序是否要修改 |
DEFLATE | 是 | DEFLATE | .deflate | 否 | 和文本处理一样,不需要修改 |
Gzip | 是 | DEFLATE | .gz | 否 | 和文本处理一样,不需要修改 |
bzip2 | 是 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 |
LZO | 否,需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
Snappy | 是 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
2)压缩性能的比较
压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片
优点:压缩率比较高;
缺点:不支持 Split;压缩/解压速度一般;
优点:压缩率高;支持 Split;
缺点:压缩/解压速度慢。
优点:压缩/解压速度比较快;支持 Split;
缺点:压缩率一般;想支持切片需要额外创建索引
优点:压缩和解压缩速度快;
缺点:不支持 Split;压缩率一般;
压缩可以在 MapReduce 作用的任意阶段启用。
1)为了支持多种压缩/解压缩算法,Hadoop 引入了编码/解码器
压缩格式 | 对应的编码/解码器 |
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
2)要在 Hadoop 中启用压缩,可以配置如下参数
参数 | 默认值 | 阶段 | 建议 |
io.compression.codecs (在 core-site.xml 中配置) | 无, 这个需要在命令行输入hadoop checknative 查看 | 输入压缩 | Hadoop 使用文件扩展名判断是否支持某种编解码器 |
mapreduce.map.output.compress (在 mapred-site.xml 中配置) | false | mapper 输出 | 这个参数设为 true 启用压缩 |
mapreduce.map.output.compress.codec (在 mapredsite.xml 中配置) | org.apache.hadoop.io.compress.DefaultCodec | mapper 输出 | 企业多使用 LZO 或Snappy 编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress (在mapred-site.xml 中配置) | false | reducer 输出 | 这个参数设为 true 启用压缩 |
mapreduce.output.fileoutputformat.compress.codec (在mapred-site.xml 中配置) | org.apache.hadoop.io.compress.DefaultCodec | reducer 输出 | 使用标准工具或者编解码器,如 gzip 和bzip2 |
即使 MapReduce 的输入输出文件都是未压缩的文件,仍然可以对 Map 任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到 Reduce 节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可,来看下代码怎么设置。
基于 WordCount 案例处理。
1)Hadoop 源码支持的压缩格式有:BZip2Codec、DefaultCodec
- public class WordCountDriver {
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- Configuration conf = new Configuration();
- // 开启 map 端输出压缩
- conf.setBoolean("mapreduce.map.output.compress", true);
- // 设置 map 端输出压缩方式
- conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
- Job job = Job.getInstance(conf);
- job.setJarByClass(WordCountDriver.class);
- job.setMapperClass(WordCountMapper.class);
- job.setReducerClass(WordCountReducer.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- boolean result = job.waitForCompletion(true);
- System.exit(result ? 0 : 1);
- }
- }
2)Mapper 保持不变
3)Reducer 保持不变
基于 WordCount 案例处理。
1)修改驱动
- public class WordCountReduceCompressDriver {
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
- job.setJarByClass(WordCountDriver.class);
- job.setMapperClass(WordCountMapper.class);
- job.setReducerClass(WordCountReducer.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- // 设置 reduce 端输出压缩开启
- FileOutputFormat.setCompressOutput(job, true);
- // 设置压缩的方式
- FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
- // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
- // FileOutputFormat.setOutputCompressorClass(job,DefaultCodec.class);
-
- boolean result = job.waitForCompletion(true);
- System.exit(result ? 0 : 1);
- }
- }
2)Mapper 和 Reducer 保持不变
1)导包容易出错。尤其 Text 和 CombineTextInputFormat。
2)Mapper 中第一个输入的参数必须是 LongWritable 或者 NullWritable,不可以是 IntWritable。否则报的错误是类型转换异常。
3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656 (4),说明 Partition和 ReduceTask 个数没对上,调整 ReduceTask 个数。
4)如果分区数不是 1,但是 reducetask 为 1,是否执行分区过程。答案是:不执行分区过程。
因为在 MapTask 的源码中,执行分区的前提是先判断 ReduceNum 个数是否大于 1。不大于1 肯定不执行。
5)报类型转换异常。通常都是在驱动函数中设置 Map 输出和最终输出时编写错误。Map 输出的 key 如果没有排序,也会报类型转换异常。
6)集群中运行 wc.jar 时出现了无法获得输入文件。原因:WordCount 案例的输入文件不能放用 HDFS 集群的根目录。
7)自定义 Outputformat 时,注意在 RecordWirter 中的 close 方法必须关闭流资源。否则输出的文件内容中数据为空。
- @Override
- public void close(TaskAttemptContext context) throws IOException,
- InterruptedException {
- if (fos != null) {
- fos.close();
- }
- if (otherfos != null) {
- otherfos.close();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。