当前位置:   article > 正文

hadoop各组件工作流程分析

hadoop各组件工作流程分析

HDFS YARN MapReduce三者关系

关系图

在这里插入图片描述

文字描述

  1. 客户端提交文件给Hadoop -> Yarn -> ResourceManager
  2. ResourceManager启动App Master
  3. App Master 申请资源给Container
  4. 在Container中跑MapTask(数据分成了几片就有几个Map Task)
  5. Map Task 中的数据从DataNode中来
  6. Map Task 跑完把数据结果传给Reducer Task
  7. Reducer Task跑完把结果存入HDFS

HDFS写数据流程

流程图

HDFS写数据流程

过程描述

  1. 客户端创建一个 分布式文件系统对象 通过它向NameNode请求上传文件D:\io\input\a.txt
  2. NameNode检查目录树是否可以上传(权限、目录是否已经存在等),响应可以上传
  3. 客户端向NameNode请求上传Block(0-128M)
  4. NameNode返回dn1 dn2 dn3节点 (副本节点选择:本地节点 其他机架一个节点 其他机架另一个节点 选距离最近的 拓扑)
  5. 客户端和DataNode1建立传输通道,然后DataNode1和DataNode2建立传输通道,DataNode2和DataNode3建立传输通道,应答成功就可以开始传输数据了
  6. 从本地读取数据后开始传输,一个包一个包的传。Packet 64k 含 chunk512byte + chunksum4byte
  7. DataNode节点会一个一个的把Packet写在磁盘上(落盘)
  8. 传输完成后若还有块er没传完,回到第三步直到所以块都传输完成

HDFS读数据流程

流程图

读数据流程图

过程描述

  1. 客户端创建文件系统对象 Distributed FileSystem 向NameNode发送读数据请求
  2. NameNode看客户端有没有读权限并看看自己有没有该数据,返回目标文件的 元数据
  3. 客户端创建流 读数据流
  4. 向目标DataNode节点发送读数据请求(块信息等)
  5. 目标节点向客户端传输数据
  6. 传输完成若还有块等待读取数据就重复步骤4直到数据都读完
  7. 将文件改名后关闭流

切片源码

/*
 getFormatMinSplitSize() : 返回1
 
 getMinSplitSize(job) :如果配置了mapreduce.input.fileinputformat.split.minsize参数返回参数设置的值
	否则返回默认值1
*/
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

/*
如果设置了mapreduce.input.fileinputformat.split.maxsize参数那么返回该参数设置的值
	否则返回Long.MAX_VALUE
*/
long maxSize = getMaxSplitSize(job);


//该集合用来存放切片信息(一个切片就是一个对象)
//放入到该集合的对象是FileSplit 但是泛型是InputSplit 说明FileSplit和InputSplit有继承关系
//FileSplit是InputSplit的子类
List<InputSplit> splits = new ArrayList<InputSplit>();

//获取输入目录中所有的文件或目录状态
List<FileStatus> files = listStatus(job);

//获取文件的路径
Path path = file.getPath();

//获取文件的大小
long length = file.getLen();

//块大小
long blockSize = file.getBlockSize();

/*
  切片大小
  
  默认 : 片大小 = 块大小
  需求:
	  片大小 > 块大小 :修改minSize大小
	  片大小 < 块大小 :修改maxSize大小
  
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
*/          
long splitSize = computeSplitSize(blockSize, minSize, maxSize);

//剩余文件大小
long bytesRemaining = length;

/*
 开始切片 : 
	((double) bytesRemaining)/splitSize > SPLIT_SLOP : 剩余文件大小 / 切片大小 > 1.1  (为了防止最后1片太小)
*/
 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
			
			/*
				makeSplit : 创建FileSplit对象
				参数length-bytesRemaining :片的起始位置
				参数splitSize :切片大小
				
				将FileSplit对象放入到splits集合中
			*/
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
			//重新计算剩余文件大小 :将片大小从剩余文件大小减掉
            bytesRemaining -= splitSize;
}

//将剩余文件大小整体切一片
if (bytesRemaining != 0) {
	int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
	splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
			   blkLocations[blkIndex].getHosts(),
			   blkLocations[blkIndex].getCachedHosts()));
}


 not splitable
//如果文件不可切整个文件是1片
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
		  blkLocations[0].getCachedHosts()));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84

InputFormat

InputFormat : 抽象类
	作用 :①切片 ②读取数据
	
	//切片
	public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;

   //创建RecordReader对象 该对象用来读取数据
   public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                InterruptedException;InputFormat的继承树
	|-----InputFormat 抽象类
		|------FileInputFormat 抽象类
			|-------TextInputFormat 默认使用的InputFormat的类
			
			
三 FileInputFormat 抽象类
	1.FileInputFormat是抽象类 继承了 InputFormat 
	2.FileInputFormat实现了父类的getSplits方法
	
四 TextInputFormat 默认使用的InputFormat的类
	1.TextInputFormat是默认使用的InputFormat的类 继承了FileInputFormat
	2.TextInputFormat实现了createRecordReader方法
	3.createRecordReader方法返回了LineRecordReaderLineRecordReaderRecordReader的子类
	4.LineRecordReader是真正用来读取文件中的数据的那个对象的所属类
	
	public RecordReader<LongWritable, Text> 
		createRecordReader(InputSplit split,
						   TaskAttemptContext context) {
		return new LineRecordReader(recordDelimiterBytes);
   }	
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

conbineTextInputFormat切片机制
将大量的小文件合并成一个大的Map Task的过程
虚拟存储过程 切片过程

MapReduce工作流程

概念图

maperduce详细工作流程

过程描述

  1. 准备待处理数据a.txt 200M 块大小是128M 切两片(若想改变切片大小就设置conf参数,具体修改见切片源码)
  2. InputFormat -> FileInputFormat -> TextInputFormat -> getSplit 对数据进行切片处理,上传jar包,提交job信息给YarnRunner 创建App Master
  3. App Master 申请资源,生成两个Map Task
  4. Map Task 用 InputFormat -> RecordReader -> LineRecondReader 读数据
  5. 然后使用Mapper的map方法进行数据处理
  6. 接下来就是shuffle过程,在下一章节详细讲解

shuffle机制

shuffle流程图解

黑夜

文字描述

  1. map方法写出的数据流向了缓冲区
    • 该缓冲区在数据量达到80%时会进行排序,排序完成的数据会形成多个分区直接写入磁盘,每轮数据都会按该分区算法进行分区,方便下一步归并
    • 所有数据都处理完成后就会进行归并排序,相同分区的数据进行归并
    • 归并完成后按分区进行combiner合并压缩即:将1万条aaa 1 -> aaa 10000
    • 将处理结果写入磁盘
  2. 进行过上述操作后将所有 Map Task 的相同分区放在一起进行的归并排序
  3. 最后数据进入 reduce 方法

Yarn工作机制

流程图

在这里插入图片描述

Map Task工作机制

流程图

在这里插入图片描述

Reduce Task 工作机制

流程图

在这里插入图片描述

MapReduce工作机制

流程图

在这里插入图片描述
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/592686
推荐阅读
相关标签
  

闽ICP备14008679号