赞
踩
目录
14. Hadoop序列化和反序列化介绍及不使用Java序列化的原因?
磁盘IO
包括单机模式、伪分布式模式、完全分布式模式。
单机模式(standalone)
单机模式是Hadoop的默认模式。这种模式在一台单机上运行,没有分布式文件系统,而是直接读写本地操作系统的文件系统。当首次解压Hadoop的源码包时,Hadoop无法了解硬件安装环境,便保守地选择了最小配置。在这种默认模式下所有3个XML文件均为空。当配置文件为空时,Hadoop会完全运行在本地。因为不需要与其他节点交互,单机模式就不使用HDFS,也不加载任何Hadoop的守护进程。该模式主要用于开发调试MapReduce程序的应用逻辑。
伪分布模式(Pseudo-DistributedMode)
这种模式也是在一台单机上运行,但用不同的Java进程模仿分布式运行中的各类结点。伪分布模式在“单节点集群”上运行Hadoop,其中所有的守护进程都运行在同一台机器上。该模式在单机模式之上增加了代码调试功能,允许你检查内存使用情况,HDFS输入输出,以及其他的守护进程交互。
完全分布模式(Fully DistributedMode)
Hadoop守护进程运行在一个集群上。至少三台机器。
Hadoop 1只有两个部分,HDFS和MapReduce,Hadoop 2增加了Yarn用来管理资源调度。具体来说:
HDFS组成:
(1)Namenode:存储文件的元数据以及每个文件的块列表和块所在的DataNode等。
(2)DataNode:在本地文件系统存储文件块数据以及块数据的校验和。
(3)Secondary NameNode:监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据的快照。
Yarn组成:
(1)ResorceManager:处理客户端请求,监控NodeManager,启动或监控ApplicationMaster
(2)NodeManager:管理单个节点的资源,处理来自ResourceManager和ApplicationMaster的命令
(3)ApplicationMaster:负责数据的切分,为应用程序申请资源并分配给内部任务,任务的监控与容错
(4)Container:是资源抽象,封装了某个节点的多维度资源,如内存、CPU、磁盘、网络等。
可以在hdfs-site.xml中进行修改。
1. 每个block保存份数: dfs.replication
默认是3份,在客户端上设定,通常也需要在DataNode上设定。
2. 每个block的大小: dfs.block.size
从2.7.3版本开始block size的默认大小为128M,之前版本的默认值是64M。对于新文件切分的大小,单位byte。每一个节点都要指定,包括客户端。
原理:文件块越大,寻址时间越短,但磁盘传输时间越长;文件块越小,寻址时间越长,但磁盘传输时间越短。
如果块设置过大,一方面从磁盘传输数据的时间会明显大于寻址时间,导致程序在处理这块数据时,变得非常慢;另一方面,MapReduce中的map任务通常一次只处理一个块中的数据,如果块过大运行速度也会很慢。
如果设置过小,一方面存放大量小文件会占用NameNode中大量内存来存储元数据,而NameNode的内存是有限的,不可取;另一方面块过小,寻址时间增长,导致程序一直在找block的开始位置。因此,块适当设置大一些,减少寻址时间,那么传输一个有多个块组成的文件的时间主要取决于磁盘的传输速度。
具体分析:1)HDFS中平均寻址时间大概为10ms;2)经过前任的大量测试发现,寻址时间为传输时间的1%时,为最佳状态,所以最佳传输时间为:10ms/0.01=1000s=1s;3)目前磁盘的传输速度普遍为100MB/s,最佳block大小计算:100MB/s*1s=100MB,一般设置为2的整数次幂,所以我们设置block大小为128MB。
HDFS写数据过程
1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。
2)NameNode返回是否可以上传。
3)客户端请求第一个 Block上传到哪几个DataNode服务器上。
4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。
5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
6)dn1、dn2、dn3逐级应答客户端。
7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。
8)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。
HDFS读数据过程
1) 客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
2) 挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。
3) DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以packet为单位来做校验)。
4) 客户端以packet为单位接收,先在本地缓存,然后写入目标文件。
在HDFS写数据的过程中,NameNode会选择距离待上传数据最近距离的DataNode接收数据。那么这个最近距离怎么计算呢?节点距离:两个节点到达最近的共同祖先的距离总和。
机架感知:第一个副本在Client所处的节点上,如果客户端在集群外,随机选一个。第二个副本和第一个副本位于相同机架,随机节点。第三个副本位于不同机架,随机节点。
1. 第一阶段:NameNode启动
(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode记录操作日志,更新滚动日志。
(4)NameNode在内存中对元数据进行增删改。
2. 第二阶段:Secondary NameNode工作
(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
(2)Secondary NameNode请求执行CheckPoint。
(3)NameNode滚动正在写的Edits日志。
(4)将滚动前的编辑日志和镜像文件拷贝到SecondaryNameNode。
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件fsimage.chkpoint。
(7)拷贝fsimage.chkpoint到NameNode。
(8)NameNode将fsimage.chkpoint重新命名成fsimage。
1. NameNode启动
NameNode启动时,首先将镜像文件(Fsimage)载入内存,并执行编辑日志(Edits)中的各项操作。一旦在内存中成功建立文件系统元数据的映像,则创建一个新的Fsimage文件和一个空的编辑日志。此时,NameNode开始监听DataNode请求。这个过程期间,NameNode一直运行在安全模式,即NameNode的文件系统对于客户端来说是只读的。
2. DataNode启动
系统中的数据块的位置并不是由NameNode维护的,而是以块列表的形式存储在DataNode中。在系统的正常操作期间,NameNode会在内存中保存所有块位置的映射信息。在安全模式下,各个DataNode会问NameNode发送最新的块列表信息,NameNode了解到足够多的块位置信息之后,即可高效运行文件系统。
3. 安全模式退出判断
如果满足“最小副本条件”,NameNode会在30秒钟之后就退出安全模式。所谓的最小副本条件指的是在整个文件系统中99.9%的块满足最小副本级别(默认值: dfs.replication.min= l)。在启动一个刚刚格式化的HDFS集群时,因为系统中还没有任何块。所以NameNode不会进入安全模式。
工作机制:
1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
2)DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息。
3)心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
4)集群运行中可以安全加入和退出一些机器。
1) 元数据管理方式需要改变
内存中各自保存一份元数据;
Edits日志只有Active状态的NameNode节点可以做写操作;
两个NameNode都可以读取Edits;
共享的Edits放在一个共享存储中管理(qjournal和NFS两个主流实现);
2)需要一个状态管理功能模块
实现了一个zkfailover,常驻在每一个namenode所在的节点,每一个zkfailover负责监控自己所在NameNode节点,利用zk进行状态标识,当需要进行状态切换时,由zkfailover来负责切换,切换时需要防止brain split现象的发生。
3)必须保证两个NameNode之间能够ssh无密码登录
4)隔离(Fence),即同一时刻仅仅有一个NameNode对外提供服务
自动故障转移为HDFS部署增加了两个新组件:ZooKeeper和ZKFailoverController (ZKFC)进程。ZooKeeper是维护少量协调数据,通知客户端这些数据的改变和监视客户端故障的高可用服务。HA的自动故障转移依赖于ZooKeeper的以下功能:
1)故障检测:集群中的每个NameNode在ZooKeeper中维护了一个持久会话,如果机器崩溃,ZooKeeper中的会话将终止,ZooKeeper通知另一个NameNode需要触发故障转移。
2)现役NameNode选择:ZooKeeper提供了一个简单的机制用于唯一的选择一个节点为active状态。如果目前现役NameNode崩溃,另一个节点可能从ZooKeeper获得特殊的排外锁以表明它应该成为现役NameNode。
ZKFC是自动故障转移中的另一个新组件,是ZooKeeper的客户端,也监视和管理NameNode的状态。每个运行NameNode的主机也运行了一个ZKFC进程,ZKFC负责:
1)健康监测:ZKFC使用一个健康检查命令定期地ping与之在相同主机的NameNode,只要该NameNode及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的。
2)ZooKeeper会话管理:当本地NameNode是健康的,ZKFC保持一个在ZooKeeper中打开的会话。如果本地NameNode处于active状态,ZKFC也保持一个特殊的znode锁,该锁使用了ZooKeeper对短暂节点的支持,如果会话终止,锁节点将自动删除。
3)基于ZooKeeper的选择:如果本地NameNode是健康的,且ZKFC发现没有其它的节点当前持有znode锁,它将为自己获取该锁。如果成功,则它已经赢得了选择,并负责运行故障转移进程以使它的本地NameNode为Active。故障转移进程与前面描述的手动故障转移相似,首先如果必要保护之前的现役NameNode,然后本地NameNode转换为Active状态。
1. 脑裂问题:脑裂(split-brain),指在一个高可用(HA)系统中,当联系着的两个节点断开联系时,出现两个Active的NameNode。
2. 采用fence机制进行预防
(1)共享存储fencing:确保只有一个NameNode往共享存储中写数据。
(2)客户端fencing:确保只有一个NameNode可以响应客户端的请求。
(3)DataNode fencing:确保只有一个NameNode可以向DataNode下发命令。
3. 防止脑裂的两种方式:ssh发送kill指令,调用用户自定义脚本程序。
在HA运行期间,会有多个ResourceManager并存,并且其中只有一个ResourceManager处于Active(活动)状态,另外一些(允许一个或者多个)则处于Standby(备用)状态,当Active(活动)节点无法正常工作时,其余处于Standby(备用)状态的节点则会通过竞争选举产生新的Active节点。
ResourceManager使用基于Zookeeper实现的ActiveStandbyElector组件来确定ResourceManager的状态。
1.创建锁节点。在Zookeeper上会有一个类似于/yarn-leader-election/pseudo-yarn-rm-cluster的锁节点,所有的ResourceManager在启动时,都会去竞争写一个Lock子节点(/yarn-leader-election/pseudo-yarn-rm-cluster/ActiveStandbyElectorLock),子节点类型为临时节点,利用Zookeeper的特性,创建成功的那个ResourceManager切换为Active状态,其余的为Standby状态。
2.注册Watcher监听。所有Standby状态的ResourceManager都会向/yarn-leader-election/pseudo-yarn-rm-cluster/ActiveStandbyElectorLock节点注册一个节点变更监听,利用临时节点的特性,能够快速感知到Active状态的ResourceManager的运行情况。
3.主备切换。当Active的ResourceManager无法正常工作时,其创建的Lock节点也会被删除,此时,其余各个Standby的ResourceManager都会收到通知,然后重复步骤1。
yarn中RM高可用的脑裂问题:当RM由于网络原因为对外做出响应,发生“假死”,但是该RM仍然认为自己是active的,但是zookeeper已经触发了新一轮的主备切换,造成多个active的RM存在。
解决办法:使用ACL隔离:当主备求换RM竞争创建子节点时,携带zookeeper的ACL权限进行限制,“假死”状态的RM恢复之后,尝试更新zookeeper状态时发现ACL出现改变,会自动将自身更新成standby状态。
(1)序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
(2)反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。
(3)Java的序列化是一个重量级序列化框架(Serializable或者ExternalSerializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。
(1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。
上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:
1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3)多个溢出文件会被合并成大的溢出文件
4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
1. 介绍
(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
(2)Combiner组件的父类就是Reducer。
(3)Combiner不能影响业务逻辑。
2. 调用时机
(1)map端溢写,进行分区排序后
(2)多个小文件归并时
(3)Reduce复制map输出,合并溢写到磁盘时
(1)找到你数据存储的目录。
(2)开始遍历处理(规划切片)目录下的每一个文件
(3)遍历第一个文件ss.txt
a)获取文件大小fs.sizeOf(ss.txt);
b)计算切片大小
computeSliteSize( Math.max( minSize, Math.min( maxSize, blocksize))) = blocksize=128M
c)默认情况下,切片大小=blocksize
d)开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
e)将切片信息写到一个切片规划文件中
f)整个切片的核心过程在getSplit()方法中完成。
g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
h)注意:block是HDFS上物理上存储的存储的数据,切片是对数据逻辑上的划分。
(4)提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数。
(1)TextInputFormat
默认方式。key是LongWritable,value是一行text。
(2)KeyValueTextInputFormat
每一行是k-v对,都是Text,默认是制表符\t分开
(3)NLineInputFormat
每次达到某个行数N,就进行分片。
(4)SequenceFileInputFormat
顺序文件格式,二进制的键值对,用于两个MapReduce任务之间的输入输出。
(5)CombineTextInputFormat
切片机制与FileInputFormat不同。因为FileInputFormat是对每个文件进行独立切片,不管文件多小每个文件都会单独产生1个或者多个切片. 而CombineTextInputFormat可以将多个小文件从逻辑上规划到一个切片中,这样就可以只交给一个MapTask处理。CombineTextInputFormat适合对多个小文件的处理。
1)Mapper数量
Mapper的数量由切分的个数决定,splitSize=max{minSize,min{maxSize,blockSize}},想大于blockSize则调节minSize,否则调节maxSize。
2)Reducer数量
reduce的数量自己设定,可以为0,不设置的话默认为 1。
主要有三种,reduce join、map join和semi join。
1. reduce join
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。
在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list,然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。
2. map join
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
3. semi join
Semi Join,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。
实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。
1. 全排序三种方式:
①设置一个分区,只用一个Reducer
②自定义分区函数的分界点,使得分区间和分区内都有序,分区点不易确定
③基于数据采样的排序:
1)对待排序数据进行抽样;
2)对抽样数据进行排序,产生标尺;
3)Map对输入的每条数据计算其处于哪两个标尺之间;将数据发给对应区间ID的reduce;
4)Reduce将获得数据直接输出。
2. 辅助排序(GroupingComparator分组)
有些时候reduce需要不只是需要键的排序,也依赖于值的排序,这时候称为辅助排序。具体做法是在map中将两个键组合成新键发到reduce,reduce再自定义分组函数,并设置GroupingComparator。
分区指的是将按照Mapper中的key将结果分到不同的Reducer中,通过Partitioner设置,默认是HashPartitioner。
分组是在同一个Reducer中具有key的值是同一个分组的,也就是同一轮迭代器中,用GroupingComparator来设置。
1. 可扩展性
利用资源管理器和applicatoin master分离的架构优点分离管理资源和处理job的功能;job tracker则同时负责这两项,还要存储已完成作业历史,更包含了timeline server的功能,不利于扩展。
2. 可用性
jobtracker多功能导致复杂的内存状态,难以实现高可用;YARN分而治之,先实现RM的高可用(多个RM),再实现application master的高可用(其他NM上运行相同job)。
3. 利用率
每个tasktracker有若干固定长度的slot,可能过大或者过小;每个容器维护一个资源池,按需请求资源。
4. 多租户
YARN的通用性向除了MapReduce之外其他应用开放了Hadoop,如Spark,Storm等。
(1)MR程序提交到客户端所在的节点。
(2)YarnRunner向ResourceManager申请一个Application。
(3)RM将该应用程序的资源路径返回给YarnRunner。
(4)该程序将运行所需资源提交到HDFS上。
(5)程序资源提交完毕后,申请运行mrAppMaster。
(6)RM将用户的请求初始化成一个Task。
(7)其中一个NodeManager领取到Task任务。
(8)该NodeManager创建容器Container,并产生MRAppmaster。
(9)Container从HDFS上拷贝资源到本地。
(10)MRAppmaster向RM 申请运行MapTask资源。
(11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(14)ReduceTask向MapTask获取相应分区的数据。
(15)程序运行完毕后,MR会向RM申请注销自己。
Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop2.7.2默认的资源调度器是Capacity Scheduler。
1.1. FIFO Scheduler(先进先出调度器)
FIFO Scheduler把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。FIFO Scheduler是最简单也是最容易理解的调度器,也不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其它应用被阻塞。在共享集群中,更适合采用Capacity Scheduler或Fair Scheduler,这两个调度器都允许大任务和小任务在提交的同时获得一定的系统资源。
1.2.Capacity Scheduler(容量调度器)
yarn-site.xml中默认配置的资源调度器。而对于Capacity调度器,有一个专门的队列用来运行小任务,但是为小任务专门设置一个队列会预先占用一定的集群资源,这就导致大任务的执行时间会落后于使用FIFO调度器时的时间。用这个资源调度器,就可以配置yarn资源队列。
1.3. Fair Scheduler(公平调度器)
Fair调度器的设计目标是为所有的应用分配公平的资源(对公平的定义可以通过参数来设置)。在上面的“Yarn调度器对比图”展示了一个队列中两个应用的公平调度;当然,公平调度在也可以在多个队列间工作。举个例子,假设有两个用户A和B,他们分别拥有一个队列。当A启动一个job而B没有任务时,A会获得全部集群资源;当B启动一个job后,A的job会继续运行,不过一会儿之后两个任务会各自获得一半的集群资源。如果此时B再启动第二个job并且其它job还在运行,则它将会和B的第一个job共享B这个队列的资源,也就是B的两个job会用于四分之一的集群资源,而A的job仍然用于集群一半的资源,结果就是资源最终在两个用户之间平等的共享。在Fair调度器中,我们不需要预先占用一定的系统资源,Fair调度器会为所有运行的job动态的调整系统资源。当第一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。
a) 公平调度器,就是能够共享整个集群的资源
b) 不用预先占用资源,每一个作业都是共享的
c) 每当提交一个作业的时候,就会占用整个资源。如果再提交一个作业,那么第一个作业就会分给第二个作业一部分资源,第一个作业也就释放一部分资源。再提交其他的作业时,也同理。。。。也就是说每一个作业进来,都有机会获取资源。
1.4. Fair Scheduler与Capacity Scheduler区别
资源公平共享:在每个队列中,FairScheduler可选择按照FIFO、Fair或DRF策略为应用程序分配资源。Fair策略即平均分配,默认情况下,每个队列采用该方式分配资源
支持资源抢占:当某个队列中有剩余资源时,调度器会将这些资源共享给其他队列,而当该队列中有新的应用程序提交时,调度器要为它回收资源。为了尽可能降低不必要的计算浪费,调度器采用了先等待再强制回收的策略,即如果等待一段时间后尚有未归还的资源,则会进行资源抢占;从那些超额使用资源的队列中杀死一部分任务,进而释放资源
负载均衡:Fair Scheduler提供了一个基于任务数的负载均衡机制,该机制尽可能将系统中的任务均匀分配到各个节点上。此外,用户也可以根据自己的需求设计负载均衡机制
调度策略灵活配置:Fiar Scheduler允许管理员为每个队列单独设置调度策略(当前支持FIFO、Fair或DRF三种)
提高小应用程序响应时间:由于采用了最大最小公平算法,小作业可以快速获取资源并运行完成。
Hadoop中常用的压缩算法有bzip2、gzip、lzo、snappy,其中lzo、snappy需要操作系统安装native库才可以支持。
四种压缩方式:
1)Gzip压缩
优点:压缩率比较高,而且压缩/解压速度也比较快;Hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;大部分Linux系统都自带gzip命令,使用方便。
缺点:不支持split。
应用场景:当每个文件压缩之后在130M以内的(1个块大小内),都可以考虑用gzip压缩格式。例如说一天或者一个小时的日志压缩成一个gzip文件,运行MapReduce程序的时候通过多个gzip文件达到并发。
2)Bzip2压缩
优点:支持split;具有很高的压缩率,比gzip压缩率都高;Hadoop本身支持,但不支持native;在Linux系统下自带bzip2命令,使用方便。
缺点:压缩/解压速度慢;不支持native。
应用场景:适合对速度要求不高,但需要较高的压缩率的时候,可以作为mapreduce作业的输出格式;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序的情况。
3)Lzo压缩
优点:压缩/解压速度也比较快,合理的压缩率;支持split,是Hadoop中最流行的压缩格式;可以在Linux系统下安装lzop命令,使用方便。
缺点:压缩率比gzip要低一些;Hadoop本身不支持,需要安装;在应用中对lzo格式的文件需要做一些特殊处理(为了支持split需要建索引,还需要指定inputformat为lzo格式)。
应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越越明显。
4)Snappy压缩
优点:高速压缩速度和合理的压缩率。
缺点:不支持split;压缩率比gzip要低;Hadoop本身不支持,需要安装;
应用场景:当MapReduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个MapReduce作业的输出和另外一个MapReduce作业的输入。
Mapreduce 程序效率的瓶颈在于两点:
1. 计算机性能
CPU、内存、磁盘健康、网络
2. I/O 操作优化
(1)数据倾斜
(2)map和reduce数设置不合理
(3)reduce等待过久
(4)小文件过多
(5)大量的不可分块的超大文件
(6)spill次数过多
(7)merge次数过多等。
1. 数据输入
(1)合并小文件:在执行mr任务前将小文件进行合并,大量的小文件会产生大量的map任务,增大map任务装载次数,而任务的装载比较耗时,从而导致 mr 运行较慢。
(2)采用ConbinFileInputFormat来作为输入,解决输入端大量小文件场景。
2. map阶段
(1)减少spill次数:通过调整io.sort.mb及sort.spill.percent参数值,增大触发spill的内存上限,减少spill次数,从而减少磁盘 IO。
(2)减少merge次数:通过调整io.sort.factor参数,增大merge的文件数目,减少merge的次数,从而缩短mr处理时间。
(3)在 map 之后先进行combine处理,减少 I/O。
3. reduce阶段
(1)合理设置map和reduce数:两个都不能设置太少,也不能设置太多。太少,会导致task等待,延长处理时间;太多,会导致 map、reduce任务间竞争资源,造成处理超时等错误。
(2)设置map、reduce共存:调整slowstart.completedmaps参数,使map运行到一定程度后,reduce也开始运行,减少reduce的等待时间。
(3)规避使用reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
(4)合理设置reduce端的buffer,默认情况下,数据达到一个阈值的时候,buffer中的数据就会写入磁盘,然后reduce会从磁盘中获得所有的数据。也就是说,buffer和reduce是没有直接关联的,中间多个一个写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得buffer中的一部分数据可以直接输送到reduce,从而减少IO开销:mapred.job.reduce.input.buffer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读buffer中的数据直接拿给reduce使用。这样一来,设置buffer需要内存,读取数据需要内存,reduce计算也要内存,所以要根据作业的运行情况进行调整。
4. IO传输
(1)采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZOP压缩编码器。
(2)使用SequenceFile二进制文件
5. 数据倾斜问题
(1)数据倾斜现象
数据频率倾斜——某一个区域的数据量要远远大于其他区域。
数据大小倾斜——部分记录的大小远远大于平均值。
(2)减少数据倾斜的方法
方法1:抽样和范围分区
可以通过对原始数据进行抽样得到的结果集来预设分区边界值。
方法2:自定义分区
另一个抽样和范围分区的替代方案是基于输出键的背景知识进行自定义分区。例如,如果map输出键的单词来源于一本书。其中大部分必然是省略词(stopword)。那么就可以将自定义分区将这部分省略词发送给固定的一部分reduce实例。而将其他的都发送给剩余的reduce实例。
方法3:Combine
使用Combine可以大量地减小数据频率倾斜和数据大小倾斜。在可能的情况下,combine的目的就是聚合并精简数据。
方法4:采用Map Join,尽量避免Reduce Join
1. HDFS小文件弊端:
HDFS上每个文件都要在namenode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用namenode的内存空间,另一方面就是索引文件过大是的索引速度变慢。
2. 解决的方式:
(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。
(2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。
(3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。
3. Hadoop自带小文件解决方案
(1)Hadoop Archive:
是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时。
(2)Sequence file:
sequence file由一系列的二进制key/value组成,如果为key小文件名,value为文件内容,则可以将大批小文件合并成一个大文件。
(3)CombineFileInputFormat:
CombineFileInputFormat是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置。
(4)开启JVM重用
对于大量小文件Job,可以开启JVM重用会减少45%运行时间。
JVM重用原理:一个Map运行在一个JVM上,开启重用的话,该Map在JVM上运行完毕后,JVM继续运行其他Map。
具体设置:mapreduce.job.jvm.numtasks值在10-20之间。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。