赞
踩
Hadoop种常问的就三块:
第一:分布式存储(HDFS)
第二:分布式计算框架(MapReduce)
第三:资源调度框架(YARN)
**hadoop1组成:**MapReduce(计算+资源调度),HDFS(数据存储),Common(辅助工具)
**hadoop2组成:**MapReduce(计算),Yarm(资源调度),HDFS(数据存储),Common(辅助工具)
在hadoop1时代,hadoop中的MapReduce同时处理业务逻辑运算和资源的调度,耦合度较大
在hadoop2时代,增加了Yarm。Yarm只负责资源的调度,MapRedecue只负责运算。另外,hadoopHA加入了对zookeeper的支持实现比较可靠的高可用
hadoop3可以理解为hadoop的生态圈
HDFS是一个文件系统,用于存储文件,通过目录树来定位文件;其次,它是分布式的,由多个服务器联合起来实现其功能,集群中的服务器有各自的角色。
HDFS的使用场景:适合一次写入,多次读出的场景。一个文件经过创建,写入和关闭之后就不需要改变。
**HDFS优点 **
1)高容错性
1.数据自动保存多个副本,通过增加副本的形式,提高容错性
2.某一个副本丢失后,它可以自动恢复
2)适合处理大数据
1.能够处理的数据规模达到GB,TB,PB
2.能够处理百万规模以上的文件数量
3)可构建在廉价服务器上,通过多副本机制,提高可靠性
HDFS缺点
1)不适合低延时数据访问。理解为即时数据访问
2)无法高效的对大量小文件进行存储
1.存储大量小文件的话,会占用NameNode大量的内存来存储文件目录和块信息。但其内存式有限的
2.小文件存储的寻址时间会超过读取时间,违反了HDFS的设计目标
3.HDFS的block大小可以设置,默认为126M.
3)不支持并发写入、文件随机修改
1.一个文件只能由一个写,不允许多个线程同时写
2.仅支持数据append(追加),不支持文件的随机修改
共有3中运行模式:
本地模式:单机运行,用来演示,生产环境不可用
伪分布式模式:也是单机模式,但是具备hadoop集群的所有功能。生产环境不可用
完全分布式:多台服务器组成分布式环境,生产环境使用
1)zookeeper:一个开源的分布式协调服务框架,为分布式系统提高一致性服务,基于zookeeper可以实现数据同步,统计配置,命名服务。
2)Flume:一个分布式的海量日志采用,聚合和传输的系统
3)Hbase:一个分布式的,面向列的开源数据库,利用HDFS作为其存储系统
4)Hive:基于Hadoop的一个数据仓库工具,可以将结构化的数据映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行
5)Sqoop:将一个关系型数据库中的数据导入到Hadoop的HDFS中,也可以将HDFS的数据导入到关系型数据库中
Hadoop是指Hadoop框架
Hadoop生态系统:不仅包含hadoop,还包括保证hadoop正常高效运行的其他框架
(一)NameNode(nn):就是Master。负责管理整个文件系统的元数据,所有的文件路径和数据块的存储信息都保存在nn中。NN处理客户端读写请求,分发给DN去执行
(二)DataNode(DN):就是Slave,存储实际的数据块,执行真正的读写操作
(三)Client:就是客户端
1.文件切分。文件上传HDFS的时候,Client将文件切分为一个一个的Block,进行上传
2.与NN交互,获取文件的位置信息。与DN交互,读取或者写入数据
3.Client提供一些命令来管理HDFS,也可以通过一些命令访问HDFS
(四)SecondaryNameNode:并非NN的热备。其定期触发CheckPoint(服务),代替NN合并编辑日志EditLog和镜像文件Fsimage,从而减小EditLog的大小,减少NN的启动时间,
HDFS中的文件在物理上是分块存储的,块大小可以通过配置参数规定
如果一个文件小于规定大小,其只能占文件本身大小的空间
在寻址流程中: 寻址时间为传输时间的1%时,为最佳状态
为什么块的设置不能太大,不能太小?
HDFS块设置太小,会增加寻址时间。块小,文件大,则存储的块多,寻址时间多
HDFS块设置太大,从磁盘传输数据的时间明显大于寻址的时间
**总结:**HDFS块的大小设置主要取决于磁盘传输速率。一般设置为128M.固态硬盘传输速率块,设置为256M.
1.client向NN请求上传文件,NN检查其权限,判断文件是否已经存在
2.如果权限许可,目标文件也存在,NN进行响应,返回是否可以上传文件
3.客户端请求第一个Block上传到哪几个DN上
4.NN返回3个DN节点(默认为3)
5.客户端创建数据流对象,通过该对象请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3。这样dn1~dn3通信通道建立完成
6.传输通道建立完成后,dn1,dn2,dn3逐级应答客户端
7.客户端开始往dn1上传第一个Block,以Packet为单位,dn1收到Packet传给dn2,dn2传给dn3
8.当一个block传输完成之后,客户端请求NN来上传第二个Block到服务器
9.当所有的Block传输完毕后并确认后,客户端关闭数据流对象。然后联系NN,确认数据写完成。NN在内存中对元数据进行增删改。
默认情况下,HDFS中的数据块有3个副本。副本存储策略如下(看源码):
第一个副本在Client所处的节点上。如果客户端在集群外,随机选一个。
第二个副本在另一个机架的随机一个节点
第三个副本在第二个副本所在机架的随机节点
第四个及以上,随机选择副本存放位置。
这么选择的好处:
第一个副本在本地,考虑到结点距离最近,上传速度最快
第二个副本在另一个机架的节点上,考虑到数据的可靠性
第三个副本在第二个副本所在机架的节点,又要兼顾对应的效率
1.client向NN请求下载文件
2.挑选一台DN服务器,请求读取数据
3.DN开始传输数据给客户端,串行读取,即先读取第一个块,在读取第二个块拼接到上一个块后面
4.客户端以Packet为单位接收,现在本地缓存,然后写入目标文件
(一)第一阶段:NameNode 启动
(1)第一次启动 NameNode 格式化后,创建 镜像文件fsimage 和 编辑日志edits_inprogress_001 文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode 记录更新操作到edits_inprogress_001(编辑日志) 中
(4)NameNode 在内存中对元数据进行增删改(然后再通过SecondaryNameNode对元数据进行修改)。
(二)第二阶段:Secondary NameNode 工作
(1)Secondary NameNode 询问 NameNode 是否需要 CheckPoint(即是否需要服务),带回 NameNode是否可服务的条件。CheckPoint触发条件:定时时间到;Edits中的数据满了。
(2)Secondary NameNode 请求执行 CheckPoint(即请求服务)。
(3)NameNode 滚动正在写的 edits_inprogress_001 日志,将其命名为edits_001,并生产新的日志文件edits_inprogress_002,以后再有客户端操作,日志将记录到edits_inprogress_002中。
(4)将编辑日志edits_001和镜像文件fsimage拷贝到 SecondaryNameNode。
(5)Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件 fsimage.chkpoint。
(7)拷贝 fsimage.chkpoint 到 NameNode。
(8)NameNode 将 fsimage.chkpoint 重新命名成 fsimage(此时的fsimage是最新的)。
NN存储的是edits_inprogress,2NN存储的是edits。
(1)FsImage文件包含文件系统中所有目录和文件inode的序列化形式。
FsImage文件没有记录块存储在哪个数据节点。而是由NameNode把这些映射保留在内存中
(2)Edits文件:存放HDFS文件系统的所有更新的操作,文件系统客户端执行的所有写操作首先会被记录到Edits文件中。
(3)seen_txid文件保存的是一个数字,就是最后一个edits_
的数字
思考:NameNode 如何确定下次开机启动的时候合并哪些 Edits?
可以根据seen_txid文件保存的是一个数字,seen_txid保存的就是最后一个edits_
的数字,是最新的edits。
区别:
(1)NameNode负责管理整个文件系统的元数据,所有的文件路径和数据块的存储信息都保存在NameNode,除此之外,NameNode处理客户端读写请求,分发给DataNode去执行。
(2)SecondaryNameNode:并非NN的热备。其定期触发CheckPoint(服务),代替NN合并编辑日志EditLog和镜像文件Fsimage,从而减小EditLog的大小,减少NN的启动时间,
联系:
(1)SecondaryNameNode中保存了一份和namenode一致的历史镜像文件和历史编辑日志。但是NN还有一份正在使用的编辑欸之,这是2NN.
(2)在NN发生故障时,可以从2NN恢复历史的数据
SecondaryNameNode定期出发CheckPoint,代表NN合并编辑日期和镜像文件,从而减少编辑日志的大小,减少NN的启动时间。同时在合并期间,NN也可以对外提供写操作
(1)第一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据的校验信息包括数据块的长度,块数据的校验和,以及时间戳
(2)DN启动后向NN注册,注册成功之后周期性(默认6小时)向NN上报所有的块信息。同时,DN扫描自己节点块信息队列表的时间,检查DN中的块是否完好,如果块磁盘损坏,就将该块磁盘上存储的所有BlockID报告NN
(3)心跳每3秒一次。心跳返回结果带有NN给该DN的命令如:复制块数据,删除某个数据块等。如果超过10分钟+30s没有收到某个DN的心跳,则认为该节点不可用
HDFS主要目的是保证存储上数据完整性,对于各组件的失效,做了可靠性处理
(1)数据存储故容错
存储的数据错乱。HDFS的对应措施是,对于存储在DN上的数据块,计算并存储校验和(CheckSum))。在读取数据的时候,重新计算读取出来的数据的校验和,如果检验不正确就抛出异常,应用程序捕获异常后就到其他DN上读取备份数据
(2)磁盘故障容错
DN中的某块磁盘损坏。HDFS将该块磁盘上所有的BlockID报告给NN,NN检查这些数据在哪些DN上有备份,通知相应的DN服务器将数据块复制到其他服务器上,以保证数据块的备份满足要求。
(3)DataNode故障容错
DN通过心跳和NN保持通信,如果超时市场,则NN认为该节点不可用。则查找DN上的数据块有哪些,以及其数据块还存储在哪些服务器上,随后通知服务器再复制一份数据块到其他服务器上。保证HDFS存储的数据块备份数符合用户设置的数据。
(4)NameNode故障容错
NN负责管理整个文件系统的元数据,如果NN故障,整个HDFS系统集群都无法使用,如果NN上记录的数据丢失,整个集群所有DN存储的数据也就没有了。所以NN高可用容错能力非常重要。NN可以采用HDFS NN的高可用机制。
总结:正是因为HDFS的这些策略,才保证存储数据完整性,为运行于Hadoop之上的应用,提供稳固的支持。
**安全模式:**文件系统只接受读数据请求,而不接受删除、修改等变更操作
进入安全模式:
NameNode在启动时加载镜像文件和编辑日志期间处于安全模式
NameNode在接收DataNode注册时,处于安全模式
退出安全模式有三个条件
条件一:集群上最小可用的datanode 数量大于0
条件二:系统中99.99%的数据块都可用了,即系统中只允许丢一个块
条件三:集群在启动过后得过了30s之后才能退出安全模式
MapReduce 是一个分布式运算程序的编程框架,它的核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
优点:
1)MapReduce易于编程
它简单的实现一些接口,就可以完成一个分布式程序, 这个分布式程序可以分布到大量廉价的 PC 机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得 MapReduce 编程变得非常流行。
2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
3)高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
4)适合 TB/PB 级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
缺点:
1)不擅长实时计算
MapReduce 无法像 MySQL 一样,在毫秒或者秒级内返回结果。
2)不擅长流式计算
流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。这是因为 MapReduce自身的设计特点决定了数据源必须是静态的。
3)不擅长 DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后, 每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。
一个完整的MapReduce程序在分布式运行时有三类实例进程:
1.MrAppMaster:负责整个程序的过程调用及状态协调
2.MapTask:负责Map阶段的整个数据处理流程
3.ReduceTask:负责Reduce阶段的整个数据处理流程
(1)什么是序列化
序列化就是把内存中的对象,转换成字节序列以便于存储到磁盘和网络传输
反序列化就是将收到字节序列或者是磁盘的持久化数据,转换成内存中的对象
(2)为什么不用Java的序列化
Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息, Header,继承体系等),不便于在网络中高效传输。所以,Hadoop 自己开发了一套序列化机制(Writable)。
(3)Hadoop 序列化特点:
(4)如何实现hadoop的序列化
具体实现 bean 对象序列化步骤如下 7 步。
必须实现 Writable 接口
必须有空参构造。(反序列化时,需要反射调用空参构造函数,所以必须有空参构造。如果没有任何构造函数,可以不写,如果有了带参的构造函数,必须加上空参构造函数)。
重写序列化write方法
重写反序列化readFields方法
注意反序列化的顺序和序列化的顺序完全一致
要想把结果显示在文件中,需要重写 toString(),可用"\t"分开,方便后续用(注意默认传输过来的是地址值)。
如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,重写compareTo方法,因为MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。【也可直接实现WritableComparable】
(1)MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度
MapTask太少,并行能力若,导致task等待,延长处理时间。如果太多,会导致任务启动的时间大于任务本身处理时间,造成资源浪费
(2)MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位
数据切片:只是在逻辑上对数据进行分片,并不会在磁盘上将其切分成片存储。数据切片是MapReduce程序计算输入数据的党委,一个切片会对应启动一个MapTask
建立连接,创建提交 Job 的集群代理
创建给集群提交数据的 Stag 路径,如果是本地运行环境使用file协议,如果是yarn集群运行环境,则使用HDFS协议
获取 jobid,将jobid和Stag路径拼接起来,用于该任务的提交路径
调用FileInputFormat.getSplits()切片规划,并序列化成Job.split
将Job相关参数写到文件Job.xml
如果是yarn集群运行环境,还需要拷贝拷贝 jar 包到集群
(1)程序先找到输入数据存储的目录。
(2)开始遍历处理目录下的每一个文件
(3)按照每一个文件单独切片,具体如下:
(4)提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。
在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件,二进制格式文件,数据库表等。
FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、CombineTextInputFormat、NLineInputFormat和自定义 InputFormat 等。
(1)TextInputFormat:默认实现类,按行读取每条记录,
(2)KeyValueTextInputFormat:用于将小文件在切片过程中生成一个单独的切片或者少量切片
(3)NLineInputFormat:指定行数N来划分切片
(4)CombineTextInputFormat:对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTash.
整体框架:
MR的详细工作流程:假如一个200M的处理文件
1)切片:在客户端提交之前,根据参数配置,进行任务规划,将文件按128M进行切片
2)提交:提交可以提交到本地工作环境或者Yarn工作环境,本地只需要提交切片规划Job split和Job相关参数文件,Yarm环境还需要提交jar包。
3)将Job提交到Yarn上,Yarn上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数
4)MapTask中执行Mapper的map方法,此方法需要k-v作为输入参数。
5)首先调用inputFormat方法,默认为TextputFormat方法,此方法调用createRecodeReader方法,一行一行读取数据封装为k,v键值对,传递给map方法。
6)map方法进一系列用户的写的逻辑操作,到这里map阶段其实已经完成了,下面进入Shuffer阶段
7)map处理完成相关的逻辑操作之后,会产生一系列的K-V,首先通过OutputCollertor。collect() 向环形缓冲区写入数据。环形缓冲区主要两部分:一部分写入数据的索引信息,另一部分写入数据的内容。环形缓冲区的默认大小是100M,当缓冲的容量达到默认大小的80%时,开启一个新的线程进行反向溢写,之所以反向溢写,因为这样可以边接收数据边往磁盘溢写数据 。
8)在环形缓冲区写到文件之前,会将缓冲区中的数据进行分区,并针对key的索引按照字典顺序进行快速排序
9)在分区和排序之后,溢写磁盘,可能发生多次溢写,溢写到多个文件。
10)对所有溢写到磁盘的文件进行归并排序。
11)在7-9步之间还可以有一个Combine合并操作,意义是对每个MapTask的输出进行局部汇总,以减少网络传输能量:
Map阶段的进程数一般比Reduce阶段要多,所以放在Map阶段处理效率更高
Map阶段合并之后,传递给Reduce的数据就会少很多
但是Conbine能够应用的前提时不能影响最终的业务逻辑量,而且Combiner的输出key和Reduce的输入k-v类型对应起来。
12)另外,在第10步,还可以进行压缩,以减少网络传输量。
Reduce阶段:
13)MapTask结束后,启动相应数量的ReduceTask
14)ReduceTask从每个MapTask上远程拷贝,相应的数据文件,如果文件大小超过一定阈值,则溢写速度上,否则存储在内存中。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
15)最后将数据传给reduce进行处理,一次读取一组数据
16)最后通过OutputFomat输出,默认为TextOutputFormat方法,在此方法调用createRecodeWrite将结果写到HDFS中。
MapTask并行度由切片个数决定,切片个数由输入文件和切片规则决定。ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的巨顶时可以直接手动设置,一般与getPartition的结果数相同。
1)问题引出
要求将统计结果按照条件输出到不同文件中。比如:将统计结果按照收集归属地不同省份输出到不同文件中
2)默认Partition分区
默认分区是根据key的hashCode对ReduceTask个数取得到的。用户没法空值哪个key存储到哪个分区。
3)自定义Partition步骤
(1)自定义类继承Partitioner,重写getPartition()方法
(2)在Job驱动中,设置自定义Partitioner
(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
4)分区总结:
如果没有自定义的partitioner,则默认的partition算法,即根据每一条数据的key的hashcode值的模运算(取余%)reduce的数量,得到的数字就是“分区号”。用户没控制哪个key存储到哪个分区。
如果MapTask的分区数不是1,但是ReduceTask为1,是否执行分区过程,答案是:不执行分区过程,因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1,不大于1肯定不执行。
1)map的数量
默认情况下,切片大小=blocksize,如果增大切片大小,则将minSize设置大于128M,如果要减小切片大小,则将maxSize设置小于128M.
map数量由处理的数据分成的block数量决定 num= total_size / split_size
2)reduce数量
需要注意,ReduceTask的数量并不是随意设置的:
如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx
如果1 < ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会抛出Exception
如果ReduceTask的数量=1,则不管getPartition的结果数为多少,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件 part-r-00000。
ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致。
ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask。
具体多少个ReduceTask,需要根据集群性能而定。
如果MapTask的分区数不是1,但是ReduceTask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。
分区号必须从零开始,逐一累加。
3)MapTask和ReduceTask数量的合理性
MapTask和RedceTask的数量并不是越多越好,也不是越少越好,要根据具体情况而定,太少,并行能力弱,会导致task等待,延长处理时间;太多,可能会导致任务启动的时间大于任务本身处理的时间,会得不尝失,造成资源的浪费。
排序时MR框架中最重要的操作之一
MapTask和ReduceTask均会对数据按照key进行排序,该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序时按照字典顺序排序,且是西安该配许的方法是快速排序。
对于MapTask会将处理结果暂时放到环形缓冲区中,当该区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,数据处理完毕后,对磁盘上所有文件进行归并排序。
ReduceTask从每个MapTask上远程拷贝相应的数据文件,文件超过一定阈值,则溢写再磁盘上,否则存储再内存中。如果磁盘上文件数据达到一定阈值,则进行一次贵宾给排序生成一个更大的文件;如果内存中文件大小或者数目达到一定阈值,则进行一次合并后将数据溢写到磁盘上。所有数据拷贝完毕后,ReduceTask同一对内存和磁盘上的所有数据进行一次归并排序。
(一)排序分类
1)部分有序:MR根据输入记录的键对数据集排序,保证文件内部有序
2)全排序:最终结果只有一个文件,且文件内部有序,实现方式为只设置一个MR。但是处理大型文件效率低。
代替方案:使用自定义分区来描述输出的全局排序。
3)辅助排序:再Reduce端对key进行分组。
4)二次排序:再自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
**(二)自定义排序:**实现WritableComparable接口,重写compareTo方法,就可以是实现排序。
1)setup,此方法被MR框架仅且执行一次,再执行Map任务前,进行相关变量或者资源的集中初始化工作。
2)cleanup,此方法被MR框架仅且执行一次,再执行Map任务后,相关变量或者资源的释放工作。
3)run 是程序启动运行。
4)context:这是MR任务运行的一个上下文,包含了整个任务的全部信息,context作为map和reduce执行中各个函数的一个桥梁。
5)执行顺序
setup —> map或者reduce —>cleanup
首先,定义符合业务逻辑的Bean类,并实现WritableComparable中的compareTo方法
然后,定义一个TopNMapper,添加一个全局的TreeMap对象(天然按key排序),其key的类型为Bean,value根据情况而定,然后在map里面将每一个行记录定义一个Bean对象,放在treeMap中,并判断treeMap.Size()是否大于N,如果大于N,则执行treeMap.remove(treeMap.firstKey())移除最小的key,这样就能一直保证treeMap中只有N个最大的数据。通过Mapper的cleanup方法通过context.write一次性将treeMap中的内容写出(写出的key是Bean,value视情况而定)。这样就完成了一个MapTask的TopN实现,但是根据实际文件个数和文件大小的不同,很可能有多个MapTask,所以要在Reduce阶段再次实现TopN。
定义一个TopNReducer,同样添加一个全局的TreeMap对象(天然按key排序),在reduce里面的业务逻辑和map里面相同,一直保证treeMap中只有N个最大的数据,最后通过Reducer的cleanup方法一次性将treeMap中的内容写出。
OutputFormat是MR输出的基类,所有实现MR输出都实现了OutputFormat接口。
介绍几种常见的OutputFormat实现类:
1.默认输出格式为TextOutputFormat,功能逻辑是: 将每一个KV对,向目标文件文件输出一行
2.自定义OutputFormat
应用场景:输出数据到MySQL 等存储框架中
自定义OutputFormat步骤
自定义一个类继承FileOutputFormat
重写getRecordWrite方法,具体是重写RecordWrite的wtite()方法。
1)容错性
一旦运行失效,由YARN的ResourceManager负责重新启动,最多重启次数可由用户设置,默认2次,一旦超过最高重启次数,则作业运行失效
2)MapTask/ReduceTask
Task周期性向MRAppMaster汇报心跳;一旦Task挂掉,则MRappMaster将位置重新申请资源,并运行之,最多重新运行次数可由用户设置,默认4次。
1)作业完成时间取决于最慢的任务完成时间
2)推测执行机制
发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度,为拖后腿任务启动一个备份任务,同时运行,谁先运行完,则采用谁的结果。
3)不能启动推测执行机制情况
(1)任务存在严重的数据负载倾斜
(2)特殊任务,比如任务向数据库中写数据。
4)算法原理
某一时刻,任务T的执行进度是一个百分比,则可通过一定的算法推测出该任务的最终完成时间,另一方面,为此刻的任务启动一个备份任务,则可推断出它可能完成的时刻。
推测执行机制实际上采用了经典的算法优化方法:以空间换时间。
1)数据量很小,计算量很大
2)繁杂的小文件
3)索引是更好的存储机制的时候
4)事务处理
5)自由一台机器的时间
6)不擅长DAG计算
压缩格式 | split | native | 压缩率 | 速度 | 是否hadoop自带 | linux命令 | 换成压缩格式后,原来的应用程序是否需要修改 |
---|---|---|---|---|---|---|---|
gzip | 否 | 是 | 很高 | 比较快 | 是,直接使用 | 有 | 和文本处理一样,不需要修改 |
izo | 是 | 是 | 比较高 | 很快 | 否,需要安装 | 有 | 需要建索引,还需要指定输入格式 |
snappy | 否 | 是 | 比较高 | 很快 | 否,需要安装 | 没有 | 和文本处理一样,不需要修改 |
bzip2 | 是 | 否 | 最高 | 慢 | 是,直接使用 | 有 | 和文本一样,不需要处理 |
输入端:
数据量小于块大小用Gzip,snappy
数据量非常打建议用LZO
Mapper输出:
建议用LZO、Snappy
Reduce输出:
一般归档用Bzip2
分布式缓存一个最重要的应用就是再进行join操作的时间,如果一个表很大,另一个表很小,如果将小表进行广播处理,即每个计算节点都有一分,然后进行map端的连接操作,这种情况下处理效率大于一般的reduce端join,因为Reduce端处理过多的表,非常容易发生数据倾斜。
1)reduce join: 再map阶段,map函数同时读取两个文件File1和File2,为了区分两个不同来源的数据对,每条数据打一个标签tag。第一个file1,tag=1。第二个File2,tag=2。再Reduce端以连接字段作为key的分组已经完成。
2)map join: 和Hadoop的缓存机制差不多
1)计算机性能
2)I/O操作优化
(1)数据倾斜,一个任务多,一个任务少
(2)Map运行时间长,Reduce等待过久
(3)map和reduce数设置不合理
(4)小文件过多
(5)spill次数过多
(6)merge次数过多
(1)Map和Shuffle
1)自定义分区,减少数据倾斜
2)减少溢写次数
3)增加每个Merge合并次数
4)减少磁盘IO,采用snappy或者LZO压缩
5)默认MapTask内存上限增加、
6)控制MapTask堆内存大小
7)增加MapTask的CPU核数
8)增大重试次数
(2)Reduce
1)提高Reduce拉取数据的并行数
2)提高Buffer占用的内存比例
3)提高ReduceTask的内存上限
4)控制ReduceTask堆大小
5)增加ReduceTask的CPU核数
6)如果不用reduce尽量不用
(3)数据输入
1)空值引发的数据倾斜
方法一:异常数据,空KEY过滤。join的时候key对应的数据太多
方法二:非异常数据,空key转换过多
2)表连接引发的数据倾斜
3)group by 。维度过小,Map阶段同一key有大量的数据分发给一个reduce.
4)count()去重统计。数据量大过大,导致JOB很难完成。
5)不可拆分大文件引发的数据倾斜。
换用不用的压缩方式
Hadoop小文件弊端
HDFS每个文件都要再NameNode上创建对应的元数据,当小文件比较多的时候,就会产生很多的元数据文件,占用大量的NameNode的内存空间,另一方面就是元数据较多,使得索引变慢。
小文件过多,进行计算时,会造成过多切片,需要启动过多MapTask,每个MapTask处理的数据小,导致处理时间比启动时间还小,浪费资源。
Hadoop 小文件解决方案
(1)数据采集的时候,将小文件或小批次数据合成大文件再上传
(2)通过HDFS的har归档文件进行归档,将HDFS中一个个小文件归档成一个文件。
(3)将多个小文件在切片过程中生成一个单独的切片
(4)开启uber,实现JVM重用
(5)使用Sequence file
(1)HDFS小文件的弊端
每个块的元数据存储在NameNode的内存中,大量的小文件会好景NameNode中的大部分内存,当小文件比较多的时候,就会长生很多的元数据问及教案,一方面会大量占用NameNode的内存空间,另一方面就是元数据文件过多,使得寻址索引速度变慢。小文件过多,在进行MR计算时,会生成过多切片,需要启动过多的MapTask,每个MapTask处理的数据量小,导致MapTask的处理时间比启动时间小,白白消耗资源。
(2)解决存储小文件办法之一
通过HDFS的har归档文件进行归档,它将HDFS中的一个个小文件归档成一个文件,对NameNode时一个整体,但是其内部实际上还是许多个小文件,减少了NameNode的内存。
HDFS和MapReduce子框架主要时对大数据文件来设计的,但是在小文件处理上不但效率低下,而且十分消耗内存资源。HDFS提供了两种类型的容器分别时SequenceFile和MapFile.
SequenceFile 是Hadoop的一个重要数据类型,提供k-v存储,与传统不同的是不能对已存在的k进行写操作
解决问题: 该文件格式通常被用来解决hadoop中的小文件问题,相当于一个容器。
SequenceFile支持两个数据压缩: record compression 和 block compression
MapFile:是排序后的SequenceFile。两部分组成 data和index。
局限性: 1.文件不支持腹泻操作,对已存在的SequenceFile追加存储记录
2.当write流不关闭的时候,没办法构造read流。
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台。MR相当于运行于操作系统之上的应用程序。
Yarn主要由 ResourceManager、NodeManager、ApplicationMaster 和 Container 等组件构成。
ResourceManager(RM)是整个集群资源(内存、CPU等)的老大,它是整个集群资源的主要协调者和管理者,具体如下:
NodeManager(NM)是单个节点服务器资源老大,主要负责该节点内所有容器的生命周期的管理,监视资源和跟踪节点健康,具体如下:
ApplicationMaster(AM)是单个任务运行的老大。在用户提交一个应用程序时,YARN 会通过ResourceManager 启动一个轻量级的进程 ApplicationMaster,通过ApplicationMaster来管理整个任务的运行。ApplicationMaster 作业具体如下:
ResourceManager
申请资源,监控申请的资源的使用情况;1.Client提交作业到YARN上;
2.Resource Manager 选择一个NodeManager,启动一个Container 并运行Application
3.Application Master根据实际需要向Resource Manager请求更多的Container资源
4.Application Master通过获取到的Container 资源执行分布式计算。
当资源管理器收到client的资源请求时,将该请求发给调度器,调度器分配给container,然后资源管理器在container内启动Application Master进程。由NodeManager监控。
调度算法:
1)FIFO:单队列,根据提交作业的先后顺序,先来先服务。
缺点:不支持多队列,生产环境很少使用
2)容器调度器
支持多队列,每个队列采用FIFO调度策略
容量保证,管理员可为每个队列设置资源最低保证和资源使用上限
灵活性:队列中的资源有甚于,可以暂时共享给哪些需要资源的队列。
多租户:支持多用户共享集群。
分配资源算法:优先选择资源占用率最低的队列进行分配
3)公平调度算法
优先选择对资源的缺额比例大的。、
高可用分为 HDFS高可用和YARN高可用,但是HDFS对数据存储及其一致性要求比YARN高。
1.高可用概述:
Hadoop2.0中,解决了单点问题,适合生产环境。
2.高可用整体框架组件:
ActiveNameNode和StandbyNameNode: 一台运行,一台备份
主备切换控制器ZKFailoverController独立的进程运行,对NameNode主备切换进行总体控制,在NameNode故障时借助Zookeeper实现自动的主备选举和切换。
Zookeeper集群:为主备切换控制器提供主备选举支持
共享存储系统:是NameNode高可用最为关键的部分,保存了HDFS的元数据。通过共享存储系统实现元数据同步。
DataNode节点:向NameNode主备机上报数据块的位置信息。
3。NameNode主备切换
NameNode 主备切换主要由 ZKFailoverController、HealthMonitor 和 ActiveStandbyElector 这 3 个组件来协同实现。
4.Zookeeper主备选举机制
(1)创建锁节点
(2)注册Eatcher监听
(3)自动触发著备选机制
(4)防止脑裂
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。