当前位置:   article > 正文

hadoop-MapReduce/Yarn_java 分布式计算框架

java 分布式计算框架

分布式计算模型MapReduce

1.MapReduce设计思想

2.MapReduce分布式计算的基本原理

3.使用Java进行MapReduce编程

4.在Hadoop集群中提交MapReduce任务

5.Yarn工作机制

1. MapReduce设计思想

1.1  什么是MapReduce

1)MapReduce是一个分布式计算框架

它将大型数据操作作业分解为可以跨服务器集群并行执行的单个任务。

起源于Google

2)适用于大规模数据处理场景

每个节点处理存储在该节点的数据

3)每个job包含Map(分类kv)和Reduce(计算)两部分

1.2   MapReduce的设计思想

1)分而治之:简化并行计算的编程模型

2)构建抽象模型:Map和Reduce

开发人员专注于实现Mapper和Reducer函数

3)隐藏系统层细节:开发人员专注于业务逻辑实现

1.3   MapReduce特点

1)优点:易于编程;可扩展性;高容错性;高吞吐量

2)不适用领域:难以实时计算;不适合流式计算;不适合DGA(有向图)计算

1.4  MapReduce编程规范

1)MapReduce框架处理的数据格式是<K,V>键值对形式

Mapper  

Map端接收<K,V>键值对数据,经过处理输出新的<K,V>键值对

Map端处理逻辑写在Mapper类中map()方法中

2)Reducer

Reduce端搜集多个Mapper端输出的<K,V>数据,进行汇总

Reducer的业务逻辑写在reduce()方法中

每一组相同k的<k,Iterator<v>>组调用一次reduce()方法

2. 使用Java进行MapReduce编程

3.1  WordCount功能实现

(1)物料准备:wordcount

(2)WordCountMapper

(3)WordCountReduce

(4)WordCountDriver

执行后输出:

3. Hadoop集群中提交MapReduce任务

Idea打包工程成jar包,执行命令

4. MapReduce分布式计算的基本原理

4.1  Hadoop序列化

什么是序列化:序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

(1)必须可序列化(serializable)

作用:网络传输以及持久化存储

IntWritable、LongWriteable、FloatWritable、Text、DoubleWritable, BooleanWritable、NullWritable等

(2)都继承了Writable接口

并实现write()和readFields()方法

(3)Keys必须实现WritableComparable接口

MapReduce框架会按照Key进行排序

Reduce阶段需要sort

keys需要可比较

4.2  MapReduce框架原理

MapReduce执行流程:

(1)split阶段:计算分片

(2)map阶段:调用map()方法对数据进行处理

(3)shffule阶段:主要负责将map端生成的数据传递给reduce端

(4)reduce阶段:对Shffule阶段传来的数据进行最后的整理合并

4.3  MapTask

1Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value

2Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value

3Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

4Spill阶段:即溢写,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

        溢写阶段详情:

        步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

        步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.outN表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

        步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

5Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

        当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index

        在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

        让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

4.4  ReduceTask

(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

(3)Reduce阶段:reduce()函数将计算结果写到HDFS上。

4.5  InputFormat数据输入接口

切片与MapTask并行度决定机制

1)问题引出

MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。

思考:1G的数据,启动8MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

2)MapTask并行度决定机制

数据块:BlockHDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask

4.6  InputSplit(输入分片)

4.7  Shuffle阶段

数据从Map输出到Reduce输入的过程

4.8  Combiner

(1)Combiner相当于本地化的Reduce操作

在shuffle之前进行本地聚合

用于性能优化,可选项

输入和输出类型一致

(2)Reducer可以被用作Combiner的条件

符合交换律和结合律

(3)实现Combiner

job.setCombinerClass(WCReducer.class)

4.9  Partitioner

(1)用于在Map端对key进行分区

默认使用的是HashPartitioner

获取key的哈希值

使用key的哈希值对Reduce任务数求模

决定每条记录应该送到哪个Reducer处理

(2)自定义Partitioner

继承抽象类Partitioner,重写getPartition方法

job.setPartitionerClass(MyPartitioner.class)

5. MapReduce实现 SQL Join操作

4.1  map端join

1)使用场景

Map Join适用于一张表十分小、一张表很大的场景。

2)优点

思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

3)具体办法:采用DistributedCache

       (1)在Mapper的setup阶段,将文件读取到缓存集合中。

       (2)在Driver驱动类中加载缓存。

//缓存普通文件到Task运行节点。

job.addCacheFile(new URI("file:///e:/cache/pd.txt"));

//如果是集群运行,需要设置HDFS路径

job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));

4.2  reduce端join

Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

    Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

6. Yarn

Yarn工作机制

1MR程序提交到客户端所在的节点。

2YarnRunnerResourceManager申请一个Application

3RM将该应用程序的资源路径返回给YarnRunner

4)该程序将运行所需资源提交到HDFS上。

5)程序资源提交完毕后,申请运行mrAppMaster

6RM将用户的请求初始化成一个Task

7)其中一个NodeManager领取到Task任务。

8)该NodeManager创建容器Container,并产生MRAppmaster

9ContainerHDFS上拷贝资源到本地。

10MRAppmasterRM 申请运行MapTask资源。

11RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。

12MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTaskMapTask对数据分区排序。

13MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask

14ReduceTaskMapTask获取相应分区的数据。

15)程序运行完毕后,MR会向RM申请注销自己。

HDFS、YARN、MapReduce三者关系

作业提交过程之HDFS & MapReduce

作业提交全过程详解

1)作业提交

1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。

2步:ClientRM申请一个作业id

3步:RMClient返回该job资源的提交路径和作业id

4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。

5步:Client提交完资源后,向RM申请运行MrAppMaster

2)作业初始化

6步:当RM收到Client的请求后,将该job添加到容量调度器中。

7步:某一个空闲的NM领取到该Job

8步:该NM创建Container并产生MRAppmaster

9步:下载Client提交的资源到本地。

3)任务分配

10步:MrAppMasterRM申请运行多个MapTask任务资源。

11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。

4)任务运行

12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTaskMapTask对数据分区排序。

13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask

14步:ReduceTaskMapTask获取相应分区的数据。

15步:程序运行完毕后,MR会向RM申请注销自己。

5)进度和状态更新

YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。

6)作业完成

除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/893042
推荐阅读
相关标签
  

闽ICP备14008679号