当前位置:   article > 正文

Hadoop学习笔记

hadoop学习笔记

Hadoop学习笔记

前言:大数据的概念

4个V:

  • Volume(大量):截至目前,人类生产的所有印刷材料的数据量是200PB,而历史上全人类总共说过的话的数据量大约是5EB。当前,典型个人计算机硬盘的容量为TB量级,而 一些大企业的数据量已经接近EB量级。
    在这里插入图片描述
  • Velocity(高速):这是大数据区分于传统数据挖掘的最显著特征。根据IDC的“数字宇宙”的报告,预计到2025年,全球数据使用量将达到163ZB。在如此海量的数据面前,处 理数据的效率就是企业的生命。
    天猫双十一:2017年3分01秒,天猫交易额超过100亿 2020年96秒,天猫交易额超过100亿
  • Variety(多样):这种类型的多样性也让数据被分为结构化数据和非结构化数据。相对于以往便于存储的以数据库/文本为主的结构化数据,非结构化数据越来越多,包括网络日志、音频、视频、图片、地理位置信息等,这些多类型的数据对数据的处理能力提出了更高要求。
  • Value(低价值密度):价值密度的高低与数据总量的大小成反比。如何快速对有价值数据“提纯”成为目前大数据背景下待解决的难题。

本文所涉及到的相关简写介绍如下:

  • NN,NameNode:命名节点
  • DN,DataNode:数据节点
  • RM, ResourceManager:资源管理器
  • NM,NodeManager:节点管理器
  • 2NN,Secondary NameNode:第二名称节点
  • QJM,Quorum Journal Manager:群体日志管理器
  • FC,Failover Controller:故障转移控制器
  • ZKFC:,Zookeeper Failover Controller:Zookeeper故障转移控制器

一、Hadoop入门

1.概念

1.1 Hadoop是什么?

Hadoop是一个由Apache基金会所开发的分布式系统基础架构。主要解决:海量数据的存储和分析计算的问题。(大数据技术涉及到的问题有:海联数据的收集、存储和计算。)
广义上Hadoop通常指一个更广泛的概念——Hadoop生态圈
在这里插入图片描述

1.2 Hadoop发展历史

1)Hadoop创始人Doug Cutting,为了实现与Google类似的全文搜索功能,他在Lucene框架基础上进行优化升级,查询引擎和索引引擎。
在这里插入图片描述

2)2001年年底Lucene成为Apache基金会的一个子项目。
3)对于海量数据的场景,Lucene框架面对与Google同样的困难,存储海量数据困难,检索海量速度慢。
4)学习和模仿Google解决这些问题的办法 :微型版Nutch。
5)可以说Google是Hadoop的思想之源(Google在大数据方面的三篇论文):
GFS —>HDFS
Map-Reduce —>MR
BigTable —>HBase
6)2003-2004年,Google公开了部分GFS和MapReduce思想的细节,以此为基础Doug Cutting等人用
了2年业余时间实现了DFS和MapReduce机制,使Nutch性能飙升。
7)2005 年Hadoop 作为 Lucene的子项目 Nutch的一部分正式引入Apache基金会。
8)2006 年 3 月份,Map-Reduce和Nutch Distributed File System (NDFS)分别被纳入到 Hadoop 项目
中,Hadoop就此正式诞生,标志着大数据时代来临。
9)名字来源于Doug Cutting儿子的玩具大象
在这里插入图片描述

1.3 Hadoop三大发行版本

Hadoop 三大发行版本:Apache、Cloudera、Hortonworks。
Apache 版本最原始(最基础)的版本,对于入门学习最好。2006
Cloudera 内部集成了很多大数据框架,对应产品 CDH,PaaS。2008
Hortonworks 文档较好,对应产品 HDP,PaaS。2011
Hortonworks 现在已经被 Cloudera 公司收购,推出新的品牌 CDP。

1.4Hadoop的优势

  • 1)高可靠性:Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元 素或存储出现故障,也不会导致数据的丢失。

  • 2)高扩展性:在集群间分配任务数据,可方便的动态扩展数以千计的节点。

  • 3)高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度。
    在这里插入图片描述

  • 4)高容错性:能够自动将失败的任务重新分配。
    在这里插入图片描述

1.5Hadoop的组成

在这里插入图片描述

  • 在 Hadoop1.x 时 代 ,Hadoop中的MapReduce同时处理业务逻辑运算和资源的调度,耦合性较大。
  • 在Hadoop2.x时代,增加了Yarn。Yarn只负责资源的调度 ,MapReduce只负责运算。
  • Hadoop3.x在组成上没有变化
1.5.1 HDFS架构概述

Hadoop Distributed File System,简称 HDFS,是一个分布式文件系统。

  • 1)NameNode(NN):存储文件的元数据,如文件名,文件目录结构,文件属性(生成时间、副本数、文件权限),以及每个文件的块列表和块所在的DataNode等(这一部分其实是在集群启动是由DataNode进行汇报所获取的,NameNode)。(指明数据存在的位置)
  • 2)DataNode(DN):在本地文件系统中具体的存储文件块数据,以及块数据的校验和。(具体数据存储的位置)
  • 3)Secondary NameNode(2NN):每隔一段时间对NameNode元数据进行“备份”。(类似于NameNode的“备份”)
1.5.2 YARN架构概述

Yet Another Resource Negotiator 简称 YARN ,另一种资源协调者,是 Hadoop 的资源管理器。

  • 1)ResourceManager(RM):整个集群资源(内存、CPU等)的老大。
  • 2)NodeManager(NM):单个节点服务器资源老大
  • 3)ApplicationMaster(AM):单个任务运行的老大
  • 4)Container:容器,相当一台独立的服务器,里面封装了任务运行所需要的资源,如内存、CPU、磁盘、网络等。
  • 说明1:客户端可以有多个
  • 说明2:集群上可以运行多个ApplicationMaster
  • 说明3:每个NodeManager上可以有多个Container
    在这里插入图片描述
1.5.3 MapReduce架构概述

MapReduce 将计算过程分为两个阶段:Map 和 Reduce

  • 1)Map 阶段并行处理输入的数据
  • 2)Reduce 阶段对 Map 结果进行汇总
1.5.4 HDFS、YARN、MapRecduce三者间的关系

在这里插入图片描述
经由客户端client提交一个Task给ResouceManager,RM选择一台节点建立Container将此Task的APP Mstr放入,然后APP Mstr通过分析任务向RM申请计算资源,RM将对应的计算资源分配给APP Mstr进行MapTask的计算,计算结束后将新建一个ReduceTask接受Map Task的结果,然后将结果进行汇总、存储并报告NameNode进行记录,然后SecondaryNameNode一段时间后也会根据NameNode的内容进行更新。

1.6大数据技术生态体系

在这里插入图片描述
图中涉及的技术名词解释如下:

  • 1)Sqoop:Sqoop 是一款开源的工具,主要用于在 Hadoop、Hive与传统的数据库(MySQL)间进行数据的传递,可以将一个关系型数据库(例如 :MySQL,Oracle 等)中的数据导进到 Hadoop的 HDFS 中,也可以将 HDFS 的数据导进到关系型数据库中。
  • 2)Flume:Flume 是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume 支持在日志系统中定制各类数据发送方,用于收集数据;
  • 3)Kafka:Kafka 是一种高吞吐量的分布式发布订阅消息系统;
  • 4)Spark:Spark 是当前最流行的开源大数据内存计算框架。可以基于 Hadoop 上存储的大数据进行计算。
  • 5)Flink:Flink 是当前最流行的开源大数据内存计算框架。用于实时计算的场景较多。
  • 6)Oozie:Oozie 是一个管理 Hadoop 作业(job)的工作流程调度管理系统。
  • 7)Hbase:HBase 是一个分布式的、面向列的开源数据库。HBase 不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。
  • 8)Hive:Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的 SQL 查询功能,可以将 SQL 语句转换为 MapReduce 任务进行运行。其优点是学习成本低,可以通过类 SQL 语句快速实现简单的 MapReduce 统计,不必开发专门的 MapReduce 应用,十分适合数据仓库的统计分析。
  • 9)ZooKeeper:它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、分布式同步、组服务等。

1.7推荐系统案例

在这里插入图片描述

2. 环境准备

(这一节配合PDF以及思维导图进行学习。)


2.1模板虚拟机的准备


2.2克隆


2.3安装JDK以及Hadoop


3.Hadoop生产集群搭建

(这一节配合PDF以及思维导图进行学习。)


3.1本地模式


3.2完全分布式集群搭建(开发和面试的重点)


4.常见错误的解决方案

(这一节配合PDF以及思维导图进行学习。)


二、HDFS

1.HDFS概述

1.1HDFS的产生背景和定义

  • 背景:随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这 就是分布式文件管理系统。HDFS只是分布式文件管理系统中的一种。
  • 定义:HDFS(Hadoop Distributed File System),它是一个文件系统,用于存储文件,通过目录树来定位文件;其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务 器有各自的角色。
  • HDFS 的使用场景:适合一次写入,多次读出的场景。一个文件经过创建、写入和关闭之后就不需要改变

1.2优缺点

优点:

  • 1)高容错性 数据自动保存多个副本。它通过增加副本的形式,提高容错性。某一个副本丢失以后,它可以自动恢复。
  • 2)适合处理大数据 数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据; 文件规模:能够处理百万规模以上的文件数量,数量相当之大。
  • 3)可构建在廉价机器上,通过多副本机制,提高可靠性。

缺点:

  • 1)不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。
  • 2)无法高效的对大量小文件进行存储。存储大量小文件的话,它会占用NameNode大量的内存来存储文件目录和块信息。这样是不可取的,因为NameNode的内存总是有限的;小文件存储的寻址时间会超过读取时间,它违反了HDFS的设计目标。
  • 3)不支持并发写入、文件随机修改。一个文件只能有一个写,不允许多个线程同时写;仅支持数据append(追加),不支持文件的随机修改。

1.3组成

在这里插入图片描述
1)NameNode(NN):就是Master,它是一个主管、管理者,存储着整个集群所有数据的相关信息。
(1)管理HDFS的名称空间;
(2)配置副本策略;
(3)管理数据块(Block)映射信息;
(4)处理客户端读写请求。
2)DataNode:就是Slave。NameNode下达命令,DataNode执行实际的操作。
(1)存储实际的数据块;
(2)执行数据块的读/写操作
3)Client:能够对HDFS文件系统进行操作的地方就等价于客户端。
(1)文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行上传;
(2)与NameNode交互,获取文件的位置信息;
(3)与DataNode交互,读取或者写入数据;
(4)Client提供一些命令来管理HDFS,比如NameNode格式化;
(5)Client可以通过一些命令来访问HDFS,比如对HDFS增删查改操作;
4)Secondary NameNode:并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务。
(1)辅助NameNode,分担其工作量,比如定期合并Fsimage镜像文件和Edits,并推送给NameNode;
(2)在紧急情况下,可辅助恢复NameNode。

1.4文件块大小(面试重点)

HDFS中的文件在物理上是分块存储(Block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在Hadoop2.x/3.x版本中是128M,1.x版本中是64M。
在这里插入图片描述
思考:为什么块的大小不能设置太小,也不能设置太大?
(1)HDFS的块设置太小,会增加寻址时间,程序一直在找块的开始位置;
(2)如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。导致程序在处理这块数据时,会非常慢。
总结:HDFS块的大小设置主要取决于磁盘传输速率,。

2.HDFS的Shell相关操作(开发面试重点)

2.1 基本语法

2.1.1 上传

hadoop fs + 命令 === hadoop dfs + 命令
1)-moveFromLocal:从本地剪切粘贴到 HDFS

[atguigu@hadoop102 hadoop-3.1.3]$ vim shuguo.txt
输入:
shuguo
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -moveFromLocal ./shuguo.txt /sanguo
  • 1
  • 2
  • 3
  • 4

2)-copyFromLocal:从本地文件系统中拷贝文件到 HDFS 路径去

[atguigu@hadoop102 hadoop-3.1.3]$ vim weiguo.txt
输入:
weiguo
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -copyFromLocal weiguo.txt /sanguo
  • 1
  • 2
  • 3
  • 4

3)-put:等同于 copyFromLocal,生产环境更习惯用 put

[atguigu@hadoop102 hadoop-3.1.3]$ vim wuguo.txt
输入:
wuguo
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -put ./wuguo.txt /sanguo
  • 1
  • 2
  • 3
  • 4

4)-appendToFile:追加一个文件到已经存在的文件末尾

[atguigu@hadoop102 hadoop-3.1.3]$ vim liubei.txt
输入:
liubei
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -appendToFile liubei.txt /sanguo/shuguo.txt
  • 1
  • 2
  • 3
  • 4
2.1.2 下载

1)-copyToLocal:从 HDFS 拷贝到本地

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -copyToLocal /sanguo/shuguo.txt ./
  • 1

2)-get:等同于 copyToLocal,生产环境更习惯用 get

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -get /sanguo/shuguo.txt ./shuguo2.txt
  • 1
2.1.3 HDFS直接操作

1)-ls: 显示目录信息

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -ls /sanguo
  • 1

2)-cat:显示文件内容

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -cat /sanguo/shuguo.txt
  • 1

3)-chgrp、-chmod、-chown:Linux 文件系统中的用法一样,修改文件所属权限

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -chmod 666 /sanguo/shuguo.txt
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -chown atguigu:atguigu /sanguo/shuguo.txt
  • 1
  • 2

4)-mkdir:创建路径

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -mkdir /jinguo
  • 1

5)-cp:从 HDFS 的一个路径拷贝到 HDFS 的另一个路径

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -cp /sanguo/shuguo.txt /jinguo
  • 1

6)-mv:在 HDFS 目录中移动文件

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -mv /sanguo/wuguo.txt /jinguo
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -mv /sanguo/weiguo.txt /jinguo
  • 1
  • 2

7)-tail:显示一个文件的末尾 1kb 的数据

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -tail /jinguo/shuguo.txt
  • 1

8)-rm:删除文件或文件夹

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -rm /sanguo/shuguo.txt
  • 1

9)-rm -r:递归删除目录及目录里面内容

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -rm -r /sanguo
  • 1

10)-du 统计文件夹的大小信息

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -du -s -h /jinguo
27 81 /jinguo
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -du -h /jinguo
14 42 /jinguo/shuguo.txt
7 21 /jinguo/weiguo.txt
6 18 /jinguo/wuguo.tx
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

说明:27 表示文件大小;81 表示 27*3 个副本总共的大小;/jinguo 表示查看的目录

11)-setrep:设置 HDFS 中文件的副本数量

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -setrep 10 /jinguo/shuguo.txt
  • 1

这里设置的副本数只是记录在 NameNode 的元数据中,是否真的会有这么多副本,还得看 DataNode 的数量。因为目前只有 3 台设备,最多也就 3 个副本,只有节点数的增加到 10台时,副本数才能达到 10。

3.HDFS的客户端API

3.1数据的上传和下载


4.HDFS的读写流程(面试重点)

4.1 写数据流程

4.1.1 本地文件写入服务器

在这里插入图片描述
本地文件想要上传到HDFS中:

  • 1)首先Client创建分布式的文件系统对象Distributed FileSystem,将文件进行分布式文件系统转存(这里还不太明白,可能是指将文件预处理成符合HDFS文件系统中Block的大小吧),并向NameNode发出上传文件请求。
  • 2)NameNode接收到请求后会进行权限检查,首先会检查这个客户端有没有进行上传操作的权限,其二会检查上传操作的合理性,比如说:上传的目的路径是否存在等,以及其他检查信息。如果一切合法,那么NameNode将会向Client进行响应可以上传。
  • 3)Client接收到可以上传的响应后,会准备上传第一个文件块,并向NameNode请求DataNode的位置。
  • 4)NameNode运行副本存储节点选择策略进行存储节点的选择,根据距离最近原则并考虑到负载均衡等因素进行选择存储节点的选择,而且会尽可能的选择同一机架进行存储
  • 5)Client接收到NameNode的存储位置的响应后,首先会向“最近的”DataNode请求建立传输通道进行数据传输,然后这个DataNode再与另外几个用于存储的DataNode通过递归的方式按顺序建立数据传输通道。数据传输的过程为:首先,Client在本地建立数据传输缓冲队列,其中存储的是chunk大小为516B(其中包括512B的chunk以及4B用来校验的chunksum),然后当攒够127个chunk,也就是64KB时,就构成了HDFS进行数据传输的最小单位Packet。然后将其放到另一个缓冲队列中等待发送,同时会在本地在建立一个缓存队列,用于保存发送了的Packet,以防因为网络原因造成的丢包,只有当确认了DataNode返回的ack消息后才会将对应的Packet删除。
  • 6)在DataNode接收到Client发送过来的Packet后,一是将其保存到本地,二是同时进行数据传输将其传给下一个DataNode。而且每个Packet接受成功后都会向上级返回一个ack消息,所有DataNode的操作逻辑相同。
  • 7)如果数据传输过程中出现丢包的问题,此时Client会在缓存队列中寻找对应的Packet重新进行发送。
  • 8)数据传输结束,释放资源。
4.1.2 网络拓扑——节点距离计算

在 HDFS 写数据的过程中,NameNode 会选择距离待上传数据最近距离的 DataNode 接收数据。那么这个最近距离怎么计算呢?
节点距离:两个节点到达最近的共同祖先的距离总和。
在这里插入图片描述

4.1.3 机架感知(副本存储节点选择)

在这里插入图片描述
备份的数量为3的时候:

  • 1)第一个节点选择策略是最近原则,选择执行上传命令的那一台Client机器的位置,上传较快;如果上传命令是在集群外的机器上执行的,那么则在集群中随机选一台。当然选择的结果还需要考虑节点的状态(isGoodTarget()方法),结点容量是否足够、结点流量情况、该节点所在的机架中存放当前数据的DN是否过多;
  • 2)第二个节点尽可能的选择与第一个节点不同的机架上的随机一个节点进行存储,为了数据的可靠性。但如果,尽可能的选择没有成功,则选择与第一个节点同机架上的一个节点。
  • 3)第三个节点:如果前两个节点在同一个机架上了,那么尽可能的选择与他们不同的的机架,过程同第二个节点的选择过程一样。否则如果newBlock(这个参数是为了区分什么还不清楚)为true则选择与第二个节点同机架的节点。否则选择与第一个节点同机架上的一个节点。
  • 那既然是为了可靠性着想,那第三个节点为什么不再另选一个机架进行存储呢?这里是为了兼顾传输效率,毕竟在同机架内部进行传输速度较快。
  • 这一部分的实现为源码hadoop-hdfs:3.1.3中BlockPlacementPolicyDefault类中chooseTargetInOrder()方法。

那如果备份数量是3个以上呢?怎么选择节点?

  • 首先前三个节点选择的方式与上述相同,剩下数目的副本选择则是在一定范围内进行随机选择,这个方法叫做chooseRandom(),源码所在位置与上面相同,之后进行数据传输的优化组成pipeline(这什么意思?)。
  • 其实前三个节点的选择策略的实现,也是基于chooseRandom()实现的,通过改变一个参数来限制节点随机选择的范围。
  • 参考:https://www.cnblogs.com/gwgyk/p/4137060.html

4.2 HDFS读数据流程


本地客户端想要从集群中拉取数据:

  • 1)客户端Client先建立一个分布式文件系统对象Distributed File System,用于和集群中的NameNode相互通信,想NameNode发送所请求文件的信息。
  • 2)NameNode判断此客户端是否有权限进行相关操作,并且查询对应数据所在的DataNode的位置信息,返回给客户端Client的Distributed File System对象。
  • 3)客户端要进行数据的传输,需要先建立一个文件传输流FSDataInputStream向距离客户端Client最近的DataNode发送传输数据的请求。
  • 4)DataNode通过衡量自己的负载情况来决定是否给此Client传输数据,如果负载情况良好,便同意进行数据的传输;否则,拒绝请求。此时客户端将向另一个存储着所需文件的DataNode发出请求,进行同样的操作。
  • 5)在读数据的过程中,如果数据量大于一个HDFS存储Block,那么第二个数据块的读取过程同第4步中的一样。需要注意的是,在传输过程中同一文件的两个数据块传输关系是串行的,读取结束一个块,才开始下一个块的传输。
  • 6)最后释放资源。

5.NN和2NN(面试重点)

(但是实际企业实践中我们不用2NN,我们会设置NameNode的高可用,两个NameNode。)
下图就是NN中所存储的信息:
在这里插入图片描述

  • edits是日志文件(记录对DataNode进行的操作,只进行追加操作,效率很高)
  • edits_inprogress是当前正在负责记录的日志文件
  • fsimage对应的曾经的镜像文件
  • seen_txid中存储了当前最大的日志文件的序号
  • VERSION中是当前集群的版本信息

2NN中的内容如下图:
在这里插入图片描述

  • edits、fsimage以及VERSION作用与NN中一样

思考:NameNode 中的元数据是存储在哪里的?

  • 首先,我们做个假设,如果存储在 NameNode 节点的磁盘中,因为经常需要进行随机访问,还有响应客户请求,必然是效率过低。因此,元数据需要存放在内存中。
  • 但如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的 FsImage 镜像文件。
  • 但是这样又会带来新的问题,当在 NameNode 内存中的元数据更新时,如果同时更新 FsImage,就会导致效率过低,但如果不更新,就会发生一致性问题,一旦 NameNode 节点断电,就会产生数据丢失。
  • 因此,引入 Edits 日志文件(记录对DataNode进行的操作,只进行追加操作,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到 Edits 中。这样,一旦 NameNode 节点断电,可以通过 FsImage 和 Edits 的合并,合成元数据。(一般在启动Hadoop集群时就会进行这个操作)
  • 但是,如果长时间添加数据到 Edits 中,会导致该文件数据过大,效率降低,而且一旦断电,恢复元数据需要的时间过长。因此,需要定期进行 FsImage 和 Edits 的合并,如果这个操作由 NameNode 节点完成,又会效率过低。因此,引入一个新的节点 SecondaryNameNode,专门用于 FsImage 和 Edits 的合并。
    在这里插入图片描述
  • 1)集群启动时,加载上一次的edits_inprogress_001操作日志文件以及fsimage元数据镜像文件。
  • 2)对于Client发来的操作请求,首先使用edits_inprogress_001进行记录,在原有的操作上进行后续追加,然后再会进行对元数据实际的内存操作。

对于SecondaryNameNode:

  • 1)在集群启动后他会定时的向NameNode发送请求查看是否需要执行CheckPoint合并日志操作,又或者达到了CheckPoint操作的触发条件(定时时间到,默认是1个小时;日志文件中存储了100万条操作,2NN每隔1分钟查询NN一次所进行操作的总数量),2NN便会执行CheckPoint操作。
  • 2)首先,在NameNode处生成一个edits_inprogress_002文件用于存储在进行合并操作过程中产生的元数据操作日志,原来的edits_inprogress_001则被更名为edits_001和NameNode上的fsimage镜像文件一起拷贝到2NN上用于进行合并操作。
  • 3)在2NN中将edits_001和fsimage在内存中完成合并后,生成新的fsimage.checkpoint元数据镜像文件,并将其拷贝回NameNode,并将其更名为fsimage覆盖掉旧的fsimage文件。

6.DataNode

6.1 DataNode工作机制

DataNode中存储的文件如下图:
在这里插入图片描述

  • blk.meta是对应blk文件的检验文件,里面存储了对应数据块数据长度、校验和、时间戳等信息。
  • blk是存储的文件数据的本身。
    在这里插入图片描述
  • 1)集群在启动之后,DataNode首先向NameNode进行信息注册,将自己当前机器上所存储了哪些文件的哪些块汇报给NameNode。
  • 2)NameNode这时就会将汇报来的信息存储到自己的内存中,用来查阅,并且返回给DataNode一个注册成功的消息。
  • 3)DataNode以后默认每隔6个小时(如果你集群的性能比较差,这个时间可以缩短)就会重新上报自己所存储的块的所有信息,用于更新。除此之外每个DataNode每隔3秒便会向NN发送心跳包,证明自己还正常工作。
  • 4)如果一个DataNode超过10分钟 + 30秒还没有向NN发送心跳包,NN则认为这个DN“死”掉了。后续的相关操作就会避开这个DN。10分钟是考虑到这个DataNode可能正在忙于传输或者MapReduce工作,30秒则是给它10次延误时间。

6.2 数据完整性

在DN本地会对数据进行CRC-32校验生成校验码,在Client想DN申请数据时,会将所需的数据以及生成的校验信息一同发给Client,然后Client在本地在使用CRC-32循环校验生成检验码,查看与DN传输来的是否一致,借此来判断数据是否发生损坏。

6.3 掉线时限参数设置

在DN本地会对数据进行CRC-32校验生成校验码,在Client想DN申请数据时,会将所需的数据以及生成的校验信息一同发给Client,然后Client在本地在使用CRC-32循环校验生成检验码,查看与DN传输来的是否一致,借此来判断数据是否发生损坏。

7.HDFS核心参数配置


下述的设置推荐在最一开始配置集群的时候进行操作,因为涉及到多次集群重启。
详细的设置参数查看PDF06,具体可以设置:

  • 1)节点上NameNode和DataNode进程所占的内存大小
  • 2)设置NameNode上用于处理不同 DataNode 的并发心跳以及客户端并发的元数据操作的线程数量
  • 3)开始回收站功能(网页上手动删除的数据不会放到回收站中,程序中删除的也不会放到回收站,在程序中需要调用moveToTrash()方法才会将数据删除到回收站)
  • 4)集群压力测试(写测试会受限于网络的传输速度,读测试的速度看任务的提交位置和数据的存储位置是否同时存在于同一个节点,如果是只受限于磁盘速度,否的话受限于网络速度)
  • 5)HDFS多目录配置:
  • NameNode多目录配置(他只是把namenode的内容复制了一份,如果namenode所在的节点宕机,集群一样会不能用)
  • DataNode多目录配置,这个多目录每个目录存储的内容都不同,起到了扩展DataNode容量的功能。可以进行磁盘扩展和磁盘均衡操作。
  • 6)HDFS集群的扩容和缩容
  • 设置黑白名单
  • 动态增加新服务器
  • 节点间数据均衡
  • 黑名单退役
  • 7)HDFS存储优化
  • 使用纠删码降低存储数据所用的空间,但是代价是会增加CPU的计算量。
  • 异构存储(冷热数据处理),正在使用的、经常使用的、不经常使用的、永久保存的数据,应该分别存储在不同的介质中。这个设置最好是在一开始就对HDFS的目录做好分类,每个目录中存储不同的类型的数据,而且在配置集群时,也要设置好每个节点那个路径是使用的什么样的存储介质,这种存储介质适合做哪类工作。
  • 8)HDFS故障排除
  • NameNode如果挂掉怎么处理,将SecondaryNameNode的内容复制到NameNode,可以还原一定程度上的数据。
  • 安全模式
  • 慢磁盘监控
  • 小文件归档
  • 9)HDFS集群迁移

三、MapReduce

1.MapReduce概述

1.1 定义、优势与劣势

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
优点:

  • 1)易于编程,用户只关心自己想要的业务逻辑,然后实现框架的接口,就可以完成分布式程序。
  • 2)良好的扩展性,可以动态的增加服务器。
  • 3)高容错性,MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
  • 4)适合海量数据的计算(TB/PB),几千台服务器共同计算。

缺点:

  • 1)不擅长实时计算
  • 2)不擅长流式计算(SparkStreaming和flink适合)
  • 3)不擅长DAG有向无环图计算(并不是不行),也就是串行式迭代式计算任务,前面的任务算完之后,后续的任务才能展开。(spark擅长,因为spark是基于内存的计算速度快,而MapReduce是基于硬盘的计算速度慢。)

1.2 MapReduce核心思想

WordCount案例
在这里插入图片描述

1.3 MapReduce工作流程(面试重点)

在这里插入图片描述

  1. 准备好待处理数据。
  2. 客户端在进行job的submit()前,获取待处理数据的信息,根据参数配置,形成一个任务分配的规划。如切片设置、集群运行设置、以及程序的jar包。
  3. 提交信息,如果是YARN集群模式运行任务的话,我们需要将程序jar包、job.split任务切片设置、以及集群的运行设置由job的提交Client客户端上传到集群中。(如果是本地模式的话,Jar包不用上传。)
  4. 完成提交后,YARN会开启一个总管job的MrAppMaster用于管控整个job的运行情况。首先读取Client上传的配置信息,开启对应数量的MapTask,设置NodeManager的运行参数,来执行任务。
  5. MapTask在启动之后开始使用InputFormat来读取数据,默认是使用TextInputFormat来读取数,其中有两个方法,RecorderReader()和isSplitedable()用于设置数据的读取方式和判断数据是否可以切分。TextInputFormat中的RecorderReader实现的是LineRecorderReader,也就是按行读取信息,返回行信息的偏移量作为K,以及行信息的内容作为V,并将<K, V>对返回给jar包中(任务处理程序中)的Mapper。
    在这里插入图片描述
  6. 在Mapper中对<K, V>数据对完成用户自定义的数据操作之后(计算词频或者计算流量总量等任务),这个Context数据将由outputCollector收集到内存中的环形缓冲区。
  7. 在环形缓冲区中,一半存处理好的<K, V>数据对,另一边存对应数据的索引或者叫描述数据的元数据,元数据中存储着数据的索引index,数据所处分区号 partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks (分区号是什么?答:好像就是字典中的一段,比如说对数字进行排序0到4我设为一个分区,5到9又是一个分区这样。),<K,V>数据对K值的开始位置keystart,<K,V>数据对V值的开始位置valstart。(环形缓存区默认大小为100m,当其中存储的信息占总容量的80%时,数据会反向进行存储,这样已经写入的那80%的内容就可以开始向磁盘中写入。这样这个环形缓冲区就可以一直不停的工作,而不会停下来等待数据向磁盘中写入。)
  8. 在写入到磁盘中的时候之前,会使用快排对分区内的数据进行排序,而且排序时改变的是数据写入磁盘循序(索引,keystart和valstart开始位置),而不是实际的数据存储位置。
  9. 写入到磁盘后,每一个文件都是有多个分区且区内是有序的。不同的分区在后续的处理中会分配给不同的Reducer。
  10. 然后再使用归并排序算法(对已经有序了的数据进行排序使用归并排序会比较快),将多个文件合并成一个。这样最终形成的文件是区内有序,整体无序的。
  11. 有时还可以提前使用Combiner将一些同Key值的数据对合并,起到优化的作用。
    在这里插入图片描述
  12. 在所有或者部分的MapTask任务完成后,启动相应数量的ReduceTask(这个个数一般是和分区的个数相同,但是也看具体的业务需求,比如说,你最后就是要将所有数据聚合成一个,那只能开一个,甚至这个任务只需要map,那可以没有reduce的过程),并告知ReduceTask处理数据的范围(数据分区)。
  13. ReduceTask主动将所有MapTask中自己负责的分区中的数据拉取回来,此时多个统一分区的数据都是区内有序的,所以我们还需要进行一步归并排序,让他们变成整体有序的。
  14. 由于我们的数据已经变成了同一分区且区内有序的了,这样在后面传输到Reduce方法中进行相同K值的数据的数目统计时,就容易判断前后连续的数据是不是具有相同的K值了,因为K值相同的数据肯定连续排在一起。
  15. 这里还可以加上分组排序的内容。
  16. 数据统计完成后我们就可以使用outputFormat中的RecordWriter进行输出了。

1.4 MapReduce进程

一个完整的 MapReduce 程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责 Map 阶段的整个数据处理流程。
(3)ReduceTask:负责 Reduce 阶段的整个数据处理流程。

1.5 MapReduce编程规范


1.6 WordCount案例实操


2.MapReduce序列化

2.1 序列化概述

什么是序列化?

  • 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
  • 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

为什么要序列化?

  • 一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。
  • 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。所以当我们需要将一个对象中的相关信息以及这个对象和其他对象之间的关系发送到另一台机器上时,我们需要首先对其进行序列化。

为什么不用java自己的Serializable框架,而使用Writable?

  • Java 的序列化是一个重量级序列化框架(Serializable),对于Hadoop来说一个对象被序列化后,会附带太多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop 自己开发了一套序列化机制(Writable)。
  • Writable的特点是:序列化后数据体积小,读写速度快,支持多种语言的互操作

2.2自定bean对象实现序列化接口

在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在 Hadoop 框架内部传递一个 bean 对象,那么该对象就需要实现序列化接口。
具体实现 bean 对象序列化步骤如下 7 步。
(1)必须实现 Writable 接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {
super();
}
  • 1
  • 2
  • 3

(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写 toString(),可用"\t"分开,方便后续用。
(7)如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为
MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。详见后面排序案例。

@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
  • 1
  • 2
  • 3
  • 4
  • 5

2.3 bean对象序列化案例实操


3.核心框架原理(重点)

在这里插入图片描述

  • MapReduce主要分为两个阶段的任务:MapTask和ReduceTask,中间由Shuffle过程连接。
  • 在MapTask阶段主要完成数据的Input工作,而且通过控制InputFormat的类型,我们可以控制输入数据的格式,现在我们遇到的案例的输入格式都是使用的默认格式:一行一行的读入,key表示数据开始字节的偏移量,value表示正式数据。可以发现这个key并没有多大用处。
    然后在Mapper类中进行相关的数据封装处理,方便后续的Shuffle和Reduce操作。
  • 通过Shuffle我们可以完成对输入数据的排序、压缩、分组等操作,然后传给Reducer。
  • 在ReduceTask阶段完成数据的相关业务操作。然后通过OutputFormat控制数据的输出格式,上面的案例我们都是将结果输出到文件中,通过OutputFormat我们可以完成将数据直接输入到MySQL数据库和HBase中的相关操作。

3.1 InputFormat输入数据的处理

3.1.1 切片与MapTask并行度决定机制

Job提交流程源码解析
在这里插入图片描述

  • 1)完成了Job的Map和Reduce阶段的编写,我们就进行job的提交,调用Job.waitForCompletion(true);
  • 2)然后job进行submit(),在cluster中生成两个job运行代理,一个是本地Local模式,一个是Yarn集群模式,程序会对当前的运行模式进行判断决定是Local模式还是Yarn集群模式。
  • 3)然后会准备job运行所需的相关设置信息比如jar包、切片设置、job运行时的集群参数设置,首先会创建一个stage的dir用于存放这些信息并用于后续上传,真集群模式和本地模式创建的路径略有不同。
  • 4)如果是真集群模式,首先会将任务运行的jar包上传到集群中对应的路径,如果是本地模式则不需要jar包直接使用java程序运行。
  • 5)调用FileInputFormat.getSplits()获取切片规划,并将设置写入到stagingDir/jobid中的job.split文件中。
  • 6)job运行时集群的xml设置信息也会写入到这个路径中,然后将第5步中的切片规划与xml设置一起上传到集群。
  • 7)至此,完成job的提交,job进入RUNNING模式。

我们具体在集群上运行任务时,该怎样分配机器(MapTask)的数量,来完成这个job任务呢?肯定与任务的规模相关,大一点的任务我们分配较多的机器来提高任务处理的并行度;小一点的我们分配的机器也就小一点。
那么这个标准是什么呢?该怎么划分呢?

  • 数据块:Block 是 HDFS 物理上把数据分成一块一块。数据块是 HDFS 存储数据单位。
  • 数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
    在这里插入图片描述
    切片源码流程:
  1. 程序先找到数据存储的目录。
  2. 开始遍历处理目录下的每一个文件进行规划切片。
  3. 每一个单独数据文件的处理方式首先获取文件大小 length = file.getLen()。
  4. 计算当前任务的切片大小:
    Math.max(minSize, Math.min(maxSize, blockSize));
    maxSize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值。
    minSize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大。
splitSize = computeSplitSize(
	blockSize, 
	Math.max(1, ("mapreduce.input.fileinputformat.split.minsize"中的设定值)), 
	(("mapreduce.input.fileinputformat.split.maxsize"), Long.MAX_VALUE); // 如果前面的设定值存在,则取设定值,否则取Long.MAX_VALUE的值

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
	return Math.max(minSize, Math.min(maxSize, blockSize)); // 默认每一个切片的大小等于数据分块的大小
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. 开始计算数据的切片规划:0 – splitSize为第一个切片,splitSize – 2 * splitSize为第二个切片……(每次开始切片时,都要判断剩下的部分是否大于splitSize的1.1倍,如果不大于的话划分成一片,不再切成两片。
// 使用 输入的文件数据的大小/切片的大小 如果大于SPLIT_SLOP(数值1.1)才会进行后续切片。
// 这里说明一个split的最大的大小为split设定值的1.1倍。
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 
	int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
	splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
		blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
	bytesRemaining -= splitSize;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. 将切片信息写入切片规划文件。(整个切片的过程在getSplit()方法中完成,InputSplit只记录了切片的元数据信息)
    在这里插入图片描述
  2. 最后将切片信息和后续完成的job其他设置信息提交到本地服务器或者Yarn集群上,服务器上的MRAppMaster就可以根据切片的规划文件开启对应个数的MapTask。
3.1.2 FileInputFormat

我们在编写MapReduce程序时,输入文件的格式有很多:比如基于行的日志文件、二进制格式文件、数据库的表等。我们上述的案例都是使用的默认的TextInputFormat格式来按行获取输入数据,它并不能满足所有的数据输入情况。除了这个数据的输入格式之外,我们还有KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat 和自定义 InputFormat 等格式。

TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text 类型。

  • 以下是一个示例,比如,一个分片包含了如下 4 条文本记录。
    Rich learning form
    Intelligent learning engine
    Learning more convenient
    From the real demand for more close to the enterprise

  • 每条记录表示为以下键/值对:
    (0,Rich learning form)
    (20,Intelligent learning engine)
    (49,Learning more convenient)
    (74,From the real demand for more close to the enterprise)

3.1.3 CombineTextInputFormat切片机制(解决小文件问题方法之一)

默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
在这里插入图片描述

3.2 Shuffle(面试重点)

Shuffle指的是Map方法之后,Reduce方法之前数据的处理过程。
Shuffle过程的图示:
在这里插入图片描述

  1. 将Map阶段处理好的数据,先写入环形缓存区,然后当环形缓冲区的数据量达到80%时,开始反向进行存储,同时对这80%的数据使用快排进行排序。排序时,使用的算法是快排,对<K, V>数据元数据的索引进行排序,依据是字典。排完序后,将数据按照排好的顺序写入到磁盘上。
  2. 排完序后,进行溢写到磁盘。此时单个溢写文件是全局有序的,多个溢写文件放到一起就只是区内有序了。在磁盘中对同一个MapTask的多个溢写结果文件使用归并排序进行排序,形成一个溢写结果文件。
  3. 然后可以对此时的数据进行Combiner操作,将具有相同K值的<K, V>键值对组合到一起,例如<a, 1>, <a, 1>组合变成<a, 2>。还可以进行压缩让Reducer在拉取数据时效率能够高一点。
  4. Reduce阶段将所有MapTask生成的同一个分区的数据拉取过来,对数据进行归并排序,然后按照相同的Key值进行分组,送入Reduce方法进行统计。
3.2.1 Partition分区划分

默认的Partition分区的划分方法是根据数据对中key的hashCode值对ReduceTasks个数取模获得的,用户不能控制哪个key值划分到某个分区。
但是我们可以重写Partitioner方法,来实现想要的分区划分方式。

ReduceTask的数量和getPartition的数量设置关系:

  • 1)分区号必须从零开始设置,逐一累加。 理论上好像分区号好像可以随便划分,只要其数目能对上你最终的想要结果就行,但反映到具体的分区序号的数值上必须从0开始逐一增加,因为这个数值是和ReduceTask一一对应的,并不只是数值上的相等。(在源码中有一步会判断这个:partition < 0 || partition >= partitions,如果这个表达式为真则会报异常。而partition代表当前数据所属的分区号,partitions代表了任务会申请几个ReduceTask来进行处理,过多的ReduceTask会白白浪费资源。)

  • 2)如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

  • 3)如果1 < ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会Exception;

  • 4)如果ReduceTask的数量 = 1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个
    ReduceTask,最终也就只会产生一个结果文件 part-r-00000;

  • 例如:假设自定义分区数为5,则
    (1)job.setNumReduceTasks(1); // 会正常运行,只不过会产生一个输出文件,失去了分区为5个的效果
    (2)job.setNumReduceTasks(2); // 会报错
    (3)job.setNumReduceTasks(6); // 大于5,程序会正常运行,多余的序号会产生空文件

3.2.2 WritableComparable排序(面试重点)

在Hadoop中Key对象一定是要能够进行排序的,否则就无法使用Hadoop的框架,这是因为在Hadoop的业务处理MapReduce中最后一步Reduce阶段需要将所有Key相同的数据进行聚合,如果数据是无序的每次在聚合相同的Key值时,都要遍历数据才行了。
在这里插入图片描述
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序方法的分类:

  • 1)部分排序:MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
  • 2)全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
  • 3)辅助排序:(GroupingComparator分组)在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
  • 4)二次排序:在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
3.2.3 Combiner 合并
  • 1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
  • 2)Combiner组件的父类就是Reducer。Reducer的作用是将集群中所有Map任务的结果进行汇总,Combiner就类似于Reducer的作用,不过它运行在MapTask所在的节点,它只负责将自己所在节点的Map任务的结果进行汇总。
  • 3)Combiner和Reducer的区别在于运行的位置:Combiner是在每一个MapTask所在的节点运行;Reducer是接收全局所有Mapper的输出结果;它们两者进行的操作甚至都可以是一致的,只要最终的结果不出错。
  • 4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
  • 5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出<K, V>应该跟Reducer的输入<K, V>类型要对应起来。
    比如下面这个任务,如果在每个Map任务之后进行Combiner操作而且Combiner操作的内容也和Reducer相同的话,最终的到的结果就出现了错误。
    在这里插入图片描述
    但如果Map阶段的Combiner只进行了加和操作,3 + 5 + 7 = 15,2 + 6 = 8,然后再由Reducer拉取数据进行最后的处理,15 + 8 = 23,23 / 2 = 11.5好吧这样也不行。
    现在知道的使用Combiner组件之后,唯一不会出错的的业务需求就是“求和”。
    3 + 5 + 7 = 15,2 + 6 = 8 => 15 + 8 = 23;
    3 5 7 2 6 => 3 + 5 + 7 + 2 + 6 = 23

3.3 输出数据的处理OutputFormat

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat
接口。下面我们介绍几种常见的OutputFormat实现类。
在这里插入图片描述
默认的输出格式是TextOutputFormat。我们还可以根据自己的需求自定义数据的输出格式,以满足我们的实际需求,像是输出到数据库等数据存储仓库中。

3.3.1 自定义OutputFormat

步骤:
1.自定义一个Output类继承FileOutputFormat。
2.改写RecordWriter中的write()方法使其满足我们的需求。

3.4 MapReduce内核源码解析

3.4.1 MapTask工作机制(P109)

在这里插入图片描述
MapTask的数据处理步骤:

  • 1)Read 阶段:MapTask 通过 InputFormat 获得的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。
  • 2)Map 阶段:该阶段主要是将解析出的 key/value 交给用户编写 map()函数处理,并产生一系列新的 key/value。
  • 3)Collect 收集阶段:在用户编写 map() 函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 进行分区(调用Partitioner,默认是根据 key/value 中的key进行分区),并写入一个环形内存缓冲区中。
  • 4)Spill 阶段:即“溢写”,当环形缓冲区满80%后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次排序,并在必要时对数据进行合并、压缩等操作。
    溢写阶段详情:
    步骤 1:利用快速排序算法对缓存区内的数据进行排序,排序方式是二次排序方式:先按照分区编号Partition 进行排序,然后按照 key 进行排序(所以如果你的数据处理过程中key是自己定义的类,还必须要在这个类里实现WritableComparable接口中的compareTo()方法)。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。
    步骤 2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
    步骤 3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。
  • 5)Merge 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件output/file.out 中,同时生成相应的索引文件 output/file.out.index。在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 mapreduce.task.io.sort.factor(默认 10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
3.4.2 ReduceTask工作机制

在这里插入图片描述

  • 1)Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
  • 2)Sort 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。
  • 3)Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。
3.4.3 ReduceTask并行度决定机制

回顾:MapTask 并行度由切片个数决定,切片个数由输入文件的个数(可以通过设置输入数据的类型为CombineTextInputFormat来对小文件进行聚合)和切片规则决定(max(1, min(Long.MAX_VALUE, blockSize)),默认为blockSIze)。
思考:ReduceTask 并行度由谁决定?
1 ) 设置 ReduceTask 并行度(个数)
ReduceTask 的并行度同样影响整个 Job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不同,ReduceTask 数量的决定是可以直接手动设置:

// 默认值是 1,手动设置为 4
job.setNumReduceTasks(4);
  • 1
  • 2

注意事项:

  • 1)ReduceTask=0,表示没有Reduce阶段,输出文件个数和MapTask个数一致。
  • 2)ReduceTask默认值就是1,所以输出文件个数为一个。
  • 3)如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
  • 4)ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask。
  • 5)具体多少个ReduceTask,需要根据集群性能而定。
  • 6)如果分区数不是1,但是ReduceTask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1,不大于1肯定不执行。
3.4.4 MapTask & ReduceTask 源码解析

3.5 Join应用

在这里插入图片描述
首先我们在map阶段将数据处理成TableBean对象,然后使用Pid作为Key,方便进行分区和排序以及后续的Reduce阶段的数据拉取任务。这样在Reduce阶段具有相同Key值也就是Pid值的数据就会同时被进行处理,然后我们从存储了商品名称的的TableBean对象中取出商品名称,然后赋值给存储了订单信息的TableBean对象,这样就完成了两张表的join操作。

  • 缺点:这种方式中叫做ReduceJoin:合并的操作是在 Reduce 阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在 Reduce 阶段极易产生数据倾斜。
  • 解决方法:使用MapJoin,这种需要join操作的情况一般都会涉及两张表,一张数据量较多我们称之为大表,一张数据量较少我们称之为小表。我们zaimap阶段先将小表的内容存处在内存中,然后读取大表的内容,并且在map阶段就将join的工作完成。这样就避免了大量数据在reduce阶段既要join又要reduce的问题。

3.6 数据清洗Extract Transform Load

数据清洗ETL,是用来描述将数据从来源端经过抽取Extract、转换Transform、加载Load至目的端的过程。ETL一词较常用在数据仓库,但不仅限于数据仓库。
在运行核心业务 MapReduce 程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行 Mapper 程序,不需要运行 Reduce程序(job.setNumReduceTask(0))。

3.7 总结

MapReduce处理流程:
1 ) 输入数据接口:InputFormat。

  • 默认使用的实现类是:TextInputFormat,其功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为 value 返回。
  • CombineTextInputFormat 可以把多个小文件合并成一个切片处理,提高处理效率。

2 ) 逻辑处理接口:Mapper,用户根据业务需求实现其中三个方法:

  • setup() 用于初始化一些信息,比如说在做join时首先记录下小表的信息
  • map() 用户自己编写的数据处理逻辑
  • cleanup()释放资源的操作

3 )Partitioner 分区

  • 有默认实现 HashPartitioner,逻辑是根据 key 的哈希值和 numReduces(默认为1)来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
  • 如果业务上有特别的需求,可以自定义分区,自定义分区后需要将job的numReducerTask设置为相应的数目。

4 )Comparable 排序

  • 当我们用自定义的对象作为 key 来输出时,就必须要实现 WritableComparable 接 口,重写其中的compareTo()方法。
  • 部分排序:对最终输出的每一个文件进行内部排序。如果只经过map阶段,则是每一个切片文件内的多个分区内部局部有序,每个文件全局是无序的;经过reduce阶段后,数据变成一个个按照分区分别存储的文件,且文件内有序。
  • 全排序:对所有数据进行排序,通常只有一个 Reduce。这样分区就没有意义了,最终还是只有一个输出文件,且内部全局有序。
  • 二次排序:排序的条件有两个。先按照某个指标排序,当这个指标相同时再按照第二个指标排序。

5 )Combiner 合并

  • Combiner 合并可以提高程序执行效率,提前预聚合数据,减少 IO 传输,可以一定程度上解决数据倾斜的问题。
  • 但是使用时必须不能影响原有的业务处理结果。比如说我们的业务逻辑是再求总体数据的和这样预聚合没有问题,如果是求平均值就不可以。

6 ) 逻辑处理接口:Reducer用户根据业务需求实现其中三个方法:

  • setup()
  • reduce()
  • cleanup ()

7 ) 输出数据接口:OutputFormat

  • 默认实现类是 TextOutputFormat,功能逻辑是:将每一个 KV 对,向目标文本文件输出一行。
  • 用户还可以自定义 OutputFormat。

4.压缩

压缩的优点:减少磁盘IO(数据量少,你传输数据的时候也就读取次数少)、减少占用的磁盘空间 ?(不理解,为什么?存储的时候确实是省空间了,但是用的时候不还得解压吗?)
压缩的缺点:会增加CPU的开销(多了一个加压和解压的过程,都需要使用CPU进行操作

4.1 压缩算法

压缩算法Hadoop是否自带算法文件扩展名是否可以切片进行压缩后,原始程序是否需要进行修改优点缺点
DEFLATE是,直接使用DEFLATE.deflate和文本处理一样,不需要修改--
Gzip是,直接使用DEFLATE.gz和文本处理一样,不需要修改压缩率较高不支持切片;压缩和解压缩速度一般
bzip2是,直接使用bzip2.bz2和文本处理一样,不需要修改压缩率高,支持切片压缩和解压缩速度慢
LZO否,需要安装并编译LZO.lzo需要建立索引,还需要指定输入格式压缩和解压缩速度快;支持切片压缩率一般;想要切片的话需要额外创建索引
Snappy是,直接使用Snappy.snappy和文本处理一样,不需要修改压缩和解压缩速度极快不支持切片;压缩率一般
压缩性能对比:
压缩算法原始文件大小压缩后文件大小压缩速度解压速度
gzip8.3g1.8g17.5MB/s58MB/s
bzp28.3g1.1g2.4MB/s9.5MB/s
LZO8.3g2.9g49.3MB/s74.6MB/s

4.2 压缩算法的使用

4.2.1 压缩位置的选择

在这里插入图片描述

  • 输入到map前主要根据输入数据的大小来考虑:如果输入的数据不大于切片的大小,那么我么只需要采用压缩率高,而且压缩和解压缩速度快的算法(Snappy);如果输入的数据大于切片的大小,我们需要考虑采取能够进行切片的算法(LZO)。
  • 数据输入到reduce时,因为不需要考虑切片的问题只需要减少MapTask和ReduceTask之间的网络IO,所以我们重点考虑压缩和解压缩速度快的(Snappy)。
  • 数据从reduce中输出时,我么需要考虑需求的问题,如果数据需要长期保存,我么就选择压缩率较高的算法(Bzip2或者Gzip);但是如果当前的业务逻辑还没走完,还需要将这次reduce的结果再次输入到下一个map中时,考虑的点就和上述第一条相同了。

4.3 生产环境怎么用


查看案例

5.常见问题与解决方案


查看PDF

  • 1)MapReduce生产经验
  • 数据倾斜:首先检查是否空值过多造成的数据倾斜。生产环境,可以直接过滤掉空值;如果想保留空值,就自定义分区,将空值加随机数打散,最后再二次聚合;能在 map 阶段提前处理,最好先在 Map 阶段处理。如:Combiner、 、MapJoin。设置多个 reduceTask

四、Yarn资源调度器

1.理论

Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而 MapReduce 等运算程序则相当于运行于操作系统之上的应用程序。

1.1 Yarn基础架构

YARN 主要由 ResourceManager、NodeManager、ApplicationMaster 和 Container 等组件构成。
在这里插入图片描述

1.2 Yarn的工作机制(面试重点)

在这里插入图片描述

  • 1)一个Mr任务提交到客户端所在的节点,开始尝试运行。当在程序中运行到job.waitForCompletion()时,就开始与YarnRunner进行交互。
  • 2)首先,客户端向集群的ResourceManager申请一个Application(也就是说明自己有一个任务想要运行)准备完成任务运行的前置工作,然后ResourceManager会返回给一个Application的资源提交路径,供客户端将任务运行所需的相关数据上传。
  • 3)客户端会将运行Application所需的文件和数据上传到指定的路径,其中上传的文件就是在执行job.submit()后生成的job.split(这项任务的数据切片信息)、job.xml(这项任务的运行参数设置)以及wc.jar(这项任务的运行程序逻辑)。
  • 4)资源提交完毕后,客户端会再次向ResourceManager发出请求,申请一个MrAppMaster(这个是当前整个任务运行的管理者,也就是想要运行的Application的Master)。
  • 5)ResourceManager在接受到用户申请MrAppMaster的请求后,会将这个请求初始化成一个Task,并且放到自己的调度队列中,等待被调度。
  • 6)等到ResourceManager调度到这个Task后,而且正好有空闲的NodeManager资源,
  • 7)那么便会在这个NodeManager上创建一个Container容器,在这个容器中运行的便是MrAppMaster,用来监控当前任务的运行情况。
  • 8)首先MrAppMaster会读取第3)步中上传的数据,
  • 9)然后根据其中的任务的切片信息向ResourceManager申请相应数量的MapTask。当然这个请求在ResourceManager那里一样会被放到调度队列中等待调度。
  • 10)当资源充足时,拥有相应资源的NodeManager就会被分配任务,开启与MapTask数量相同的Container来准备运行实际的job任务。(MapTask的数目一定是和Container的数目相对应的,与NodeManager没有直接关系,一个NodeManager如果资源足够的话,其上可能同时存在多个Container在运行这相同或者不同job的MapTask,各个Container之间以及它们中的MapTask之间没有相互影响。)创建好容器后,首先将job任务运行所需的job.xml文件和job运行的jar包文件拉取过来,做好运行准备。
  • 11)接下来由MrAppMaster来发送启动脚本,正式开始job任务的MapTask运行。这时就会在Container中开启YarnChild进程,运行代码。等到MapTask程序运行结束,生成了Map阶段的结果文件后,会通知MrAppMaster来进行后续的任务处理。
  • 12)MrAppMaster在收到全部或者部分的MapTask任务结束的信息后,会再次向ResourceManager申请资源用于进行后续的Reduce操作(具体申请的ReduceTask数目和任务的设置和分区的数目有关,当然如果没有Reduce阶段,就直接转到步骤14)),同样这个申请也会被放到调度队列中等待调度。当被成功调度到时,和MapTask阶段一样首先会申请对应数量的Container,然后再在Container中创建ReduceTask程序,其对应的进程同样是YarnChild。
  • 13)ReduceTask拉取对应的MapTask的数据并进行处理。
  • 14)当ReduceTask也执行完毕后,MrAppMaster会向ResourceManager注销自己,释放自己申请的资源。

1.3 MapReduce/HDFS/Yarn之间的合作


这里可以画一个思维导图,将三者的合作关系串联起来

1.4 调度器和调度算法(面试重点)

在这里插入图片描述
Hadoop中Yarn的作业调度器算法有三种:FIFO先来先服务、容量Capacity Scheduler和公平Fair Scheduler算法。ApacheHadoop默认的资源调度器是Capacity Scheduler。CDH的Hadoop默认是Fair Scheduler算法。
具体设置详见:yarn-default.xml 文件

<property>
	<description>The class to use as the resource scheduler.</description>
	<name>yarn.resourcemanager.scheduler.class</name>
	<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
  • 1
  • 2
  • 3
  • 4
  • 5
1.4.1 先进先出FIFO

在这里插入图片描述

1.4.2 容量调度器Capacity Scheduler

Capacity Scheduler 是 Yahoo 开发的多用户调度器。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

1.4.3 公平调度器Fair Scheduler

Fair Schedulere 是 Facebook 开发的多用户调度器。
在这里插入图片描述
什么是缺额?某时刻一个作业应该获取的资源和实际获取资源的差距叫“缺额”。
在这里插入图片描述
公平调度器中队列资源的分配方式:

  • 1)FIFO策略:公平调度器每个队列资源分配策略如果选择的是FIFO的话,此时公平调度器相当于上面讲过的容量调度器。
  • 2)Fair策略:一种基于最大最小公平算法实现的资源多路复用方式,默认情况下,每个队列内部采用该方式分配资源。这意味着,如果一个队列中有两个应用程序同时运行,则每个应用程序可得到此队列1/2的资源;如果此时再来一个应用,则从原有的两个应用中分出一部分,使得每个应用程序可得到队列1/3的资源。在这里插入图片描述
  • 3 )DRF 策略
    DRF(Dominant Resource Fairness),我们之前说的资源,都是单一标准,例如只考虑内存(也是Yarn默认的情况)。但是很多时候我们资源有很多种,例如内存,CPU,网络带宽等,这样我们很难衡量两个应用应该分配的资源比例。
    那么在YARN中,我们用DRF来决定如何调度:
    假设集群一共有100 CPU和10T 内存,而应用A需要(2 CPU, 300GB),应用B需要(6 CPU,100GB)。则两个应用分别需要A(2%CPU, 3%内存)和B(6%CPU, 1%内存)的资源,这就意味着A是内存主导的, B是CPU主导的,针对这种情况,我们可以选择DRF策略对不同应用进行不同资源(CPU和内存)的一个不同比例的限制。

1.5 命令行操作Yarn

1.5.1 yarn application 查看任务
  • 列出所有 Application:
yarn application -list
  • 1
  • 根据 Application 状态过滤:yarn application -list -appStates + 所有状态:ALL、NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED
yarn application -list -appStates FINISHED
  • 1
  • Kill 掉 Application:
yarn application -kill application_1612577921195_0001
  • 1
1.5.2 yarn logs 查看日志
  • 查询 Application 日志:yarn logs -applicationId
yarn logs -applicationId application_1612577921195_0001
  • 1
  • 查询 Container 日志: yarn logs -applicationId -containerId
yarn  logs  -applicationId application_1612577921195_0001  -containerId container_1612577921195_0001_01_000001
  • 1
1.5.3 yarn applicationattempt 查看尝试运行的任务
  • 列出所有 Application 尝试的列表:yarn applicationattempt -list
yarn  applicationattempt  -list application_1612577921195_0001
  • 1
  • 打印 ApplicationAttemp 状态:yarn applicationattempt -status
yarn  applicationattempt  -status appattempt_1612577921195_0001_000001
  • 1
1.5.4 yarn container 查看容器
  • 列出所有 Container:yarn container -list
yarn  container  -list appattempt_1612577921195_0001_000001
  • 1
  • 打印 Container 状态: yarn container -status
yarn  container  -status container_1612577921195_0001_01_000001
  • 1

注:只有在任务运行的期间才能看到 container 的状态,因为一旦任务运行结束container容器就会被释放。

1.5.5 yarn node 查看节点状态
  • 列出所有节点:yarn node -list -all
yarn node -list -all
  • 1
1.5.6 yarn rmadmin 更新配置
  • 重新加载队列配置:yarn rmadmin -refreshQueues,如果我对队列的调度机制进行了更改,可以通过执行这条命令来更新配置。
yarn rmadmin -refreshQueues
  • 1
1.5.7 yarn queue 查看队列
  • 打印队列信息:yarn queue -status
yarn queue -status default
  • 1

1.6 Yarn在生产环境下的配置参数

在这里插入图片描述

  • 1)ResourceManager这个是针对于整个集群的管理者来进行设置的,其中的请求的线程数量,这个表示我同时有多少个线程在等待着为可能送来的任务进行集群资源的分配,这个要设置要根据实际任务的运行来具体配置,后面会结合案例进行讲解。
  • 2)NodeManager这个是针对于每一台单独的机器来进行设置的,比如说其中第一个“是否让Yarn自己检测硬件进行配置”就是让Yarn系统自行进行此节点的相关配置。第二个和第三个设置“是否将虚拟核数当做CPU核数”、“虚拟核数和物理核数的乘数”,在同一时刻的集群中有的CPU性能较强,有的可能较弱,有的节点的CPU性能一核顶的上较弱CPU的多核,而这两个数值的设置就是为了平衡不同节点的CPU性能的差异。
    (根据这个NodeManager的相关设置来看,NodeManager自己也是运行在节点上的一个“容器程序”,而具体运行Task的Container试运行在这个NodeManager容器之中的。有关NodeManager的相关设置一是为了能够使NodeManager中的Container能够充分利用当前节点的资源,另一方面是为了不让NodeManager占用当前节点运行的Linux系统的资源,一方Linux系统崩溃。所以NodeManager的相关设置最大值要小于当前节点的实际值,为Linux系统的运行留出资源。)
  • 3)Container相关的设置,一是为了能够充分让Task更好的运行,充分利用NodeManager的资源;同时又是为了不能超过NodeManager的自身所占用的资源,否自同样会占用当前节点上运行的Linux系统的资源,可能会引起系统的崩溃。所以Container的相关设置最大值同样要小于NodeManager的相关设置值。

2.Yarn怎么使用

2.1 生产环境下的参数配置


具体情况具体分析,样例配置见PDF

2.2 容量调度器的配置

在默认情况下,Hadoop的ResourceManager调度器中只有一个Default队列,实际上有点像FIFO调度,不能够满足实际的生产要求。
在真实的生产环境中,一般是按照业务逻辑模块来创建多个队列,像是登录注册队列,购物车队列,下单,业务部门1,业务部门2等。
创建多个队列的好处是:

  • 1)不用担心员工不小心,写出错误的代码将集群资源一次性全部消耗
  • 2)实现业务的降级,在特殊时期能够直接将负责某个业务逻辑的队列资源全部拿出来先供比较重要的任务使用,比如说双11时,应该没多少人在注册,所以负责登录注册的队列中的资源就可以先拿出来供较为重要的业务逻辑来使用。
2.2.1 配置多队列容量调度器

见PDF

2.2.2 设置任务优先级

容量调度器,支持任务优先级的配置,在资源紧张时,优先级高的任务将优先获取资源。默认情况,Yarn 将所有任务的优先级限制为 0,若想使用任务的优先级功能,须开放该限制。

2.3 公平调度器的配置

2.3.1 需求

创建两个队列,分别是 test 和 atguigu(以用户所属组命名)。期望实现以下效果:若用户提交任务时指定队列,则任务提交到指定队列运行;若未指定队列,test 用户提交的任务到 root.group.test 队列运行,atguigu 提交的任务到 root.group.atguigu 队列运行(注:group 为用户所属组)。

公平调度器的配置涉及到两个文件,一个是 yarn-site.xml,另一个是公平调度器队列分配文件 fair-scheduler.xml(文件名可自定义)。

2.4 Yarn的tool接口


在使用tool接口后,我们就能够使用一个jar来执行多个hadoop样例,方法和操作与hadoop中提供的example相同,还可以用来指定运行任务的队列。详情见yarndemo

五、Hadoop综合调优与源码解析


1.综合调优

  • 1)小文件优化
  • 从源头上处理,将小文件合并成大文件
  • HDFS存储方面,将多个小文件打包成HAR文件
  • CombineTextInputFormat将多个小文件在切片过程中生成一个切片,而不是每个小文件对应一个MapTask
  • 开启uber模式,实现JVM重用

实际工作中的参数调优按照PDF06文档中10.3的流程走一遍。

2.源码解析

2.1 RPC通信原理解析

171

2.2 NameNode启动源码解析

2.3 DataNode启动源码解析

2.4 HDFS上传源码解析

2.5 Yarn源码解析

2.6 MapReduce源码解析

2.7 Hadoop源码编译

六、NameNode的HA高可用

1.HA概述

1)所谓HA(High Available),即高可用(7*24小时不中断服务)。
2)实现高可用最关键的策略是消除单点故障。HA严格来说应该分成各个组件的HA机制:HDFS的HA和YARN的HA。也就是说,HA的工作机制是配置两个NameNode,通过双NameNode消除单点故障。
3)Hadoop2.0之前,在HDFS集群中NameNode存在单点故障(SPOF)。
4)NameNode主要在以下两个方面影响HDFS集群
NameNode机器发生意外,如宕机,集群将无法使用,直到管理员重启
NameNode机器需要升级,包括软件、硬件升级,此时集群也将无法使用
HDFS HA功能通过配置Active/Standby两个NameNodes实现在集群中对NameNode的热备来解决上述问题。如果出现故障,如机器崩溃或机器需要升级维护,这时可通过此种方式将NameNode很快的切换到另外一台机器。

在启动namenode之前,需要启动hadoop2.x中新引入的(QJM)Quorum Journal Manager,QJM主要用来管理namenode之间的数据同步,当active namenode数据更新时会传递给QJM,QJM在所有的namenode之间同步,最后QJM将active namenode 更新的数据同步到了standby namenode中。
启动多个namenode时,并配置namenode的主机地址,还要配置隔离机制,因为容易出现SB(split-brain)状况,所谓的sb状况意思就是当多个namenode正常状态时,一台active,多台standby。如果某段时间因为网络等非namenode自身关系导致namenode间交流阻断了,这样容易出现多台active的设备,容易抢占资源等。
引入zookeeper来对namenode进行监听,因为在一般情况下,active 的namenode崩溃了的话,需要人工切换standby Namenode为active,非常不人性化。通过zookeeper可以监听多个namenode,当active namenode崩溃的话,zookeeper监听到后马上通知zookeeper的leader进行主备选举,在standby namenode中选举出一台,并将它置为active模式替换崩溃的namenode。

2.HA原理

自动故障转移为HDFS部署增加了两个新组件:ZooKeeper和ZKFailoverController(ZKFC)进程,如下图所示。ZooKeeper是维护少量协调数据,通知客户端这些数据的改变和监视客户端故障的高可用服务。
在这里插入图片描述
HA的自动故障转移依赖于ZooKeeper的以下功能:

  • 1)故障检测:集群中的每个NameNode在ZooKeeper中维护了一个持久会话,如果NameNode崩溃,ZooKeeper中的会话将终止,ZooKeeper通知另一个NameNode需要触发故障转移。
  • 2)现役NameNode选择:ZooKeeper提供了一个简单的机制用于唯一的选择一个节点为active状态。如果目前现役NameNode崩溃,另一个节点可能从ZooKeeper获得特殊的排外锁以表明它应该成为现役NameNode。

ZKFC是Hadoop中HA自动故障转移中的另一个新组件,是ZooKeeper的客户端,也监视和管理NameNode的状态。
每个运行NameNode的主机也运行了一个ZKFC进程,ZKFC负责:

  • 1)健康监测:ZKFC使用一个健康检查命令定期地ping与之在相同主机的NameNode,只要该NameNode及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的。
  • 2)ZooKeeper会话管理:当本地NameNode是健康的,ZKFC保持一个在ZooKeeper中打开的会话。如果本地NameNode处于active状态,ZKFC也保持一个特殊的znode锁,该锁使用了ZooKeeper对短暂节点的支持,如果会话终止,锁节点将自动删除。
  • 3)ZooKeeper选择NameNode:如果本地NameNode是健康的,且ZKFC发现没有其它的节点当前持有znode锁,它将为自己获取该锁。如果成功,它将负责运行故障转移进程以使它的本地NameNode为Active。故障转移进程与前面描述的手动故障转移相似,首先必须要杀死之前的NameNode,然后将本地NameNode转换为Active状态。

3.HA配置

  • HA配置分为手动和自动两种,手动的话不需要而外的组件但是局限性很大,配置手动HA必须要求两个NameNode上的namenode进程都是存在的,如果一台NameNode原地爆炸了,其上namenode进程不存在了,而另外一台NameNode为了避免脑裂(split brain,sb)问题,他也无法将自己由standby状态转化成active,因为转换之前他必须和另一台NameNode进行通信,确认另一台NameNode上的namenode进程状态。
  • 自动HA配置需要Zookeeper来做中间件,监控两个NameNode上namenode的状态,这样就可以完成namenode状态的自动切换。NameNode上运行一个ZKFC进程,这个进程用于检测自身namenode的工作状态以及和Zookeeper保持会话连接实时告知Zookeeper自身namenode的工作状态,并在现役namenode出现问题时对自身namenode节点进行状态转换(Active转Standby或者反过来)。

配置过程查看word文件…\尚硅谷大数据技术之HadoopHA\1.笔记\尚硅谷大数据技术之Hadoop(HDFS).docx

面经

  1. ssh无密码登录

1.HDFS数据块相关

HDFS数据块:
与一般文件系统一样,HDFS也有块(block)的概念,HDFS上的文件也被划分为块大小的多个分块作为独立的存储单元。
HDFS中小于一个块大小的文件不会占据整个块的空间(当一个1MB的文件存储在一个128MB的块中时,文件只使用1MB的磁盘空间,而不是128MB)

1.1 设置数据块的好处?

  • 1)一个文件的大小可以大于集群任意节点磁盘的容量
  • 2)容易对数据进行备份,提高容错能力
  • 3)使用抽象块概念而非整个文件作为存储单元,大大简化存储子系统的设计

1.2 HDFS里面为什么一般设置块大小为64MB或128MB?

  • 这个主要和硬盘的性能有关。一般要满足一个关系:寻址时间是传输时间的1%,这种情况下HDFS分布式文件系统的效率是最优的。HDFS文件系统中,寻址时间平均是在10ms左右,现在一般都是用普通的机械硬盘做HDFS存储,这么算一下10ms / 0.01 = 1s,现在机械硬盘的传输速率一般在100MB/s,所以块大小应该设置为100MB左右,取整就是128MB。

为什么不能太小?

  • 1)还有一些其他的原因,存储块的大小会影响读取数据时的硬盘寻道时间,HDFS初衷是为了处理大数据量操作的,如果文件块设置的太小,那么一个数据就会被划分成太多的块,在硬盘上存储的时候可能比较分散,这样会增加硬盘的寻道时间。
  • 2)块的大小小了,同一个文件存储所需的块的数目就会变多,NameNode中需要存储的对应元数据的信息也就变多了,会消耗NameNode内存。每个文件/目录的元数据(包括文件被分成了哪些blocks,每个block存储在哪些服务器的哪个block块上),都是存储在NameNode上的。每一份元数据占用150B,因此300M内存情况下,只能存储不超过300M/150=2M个元数据。

为什么不能太大?

  • 这个和Hadoop的MapReduce任务处理有关
  • 1)Map崩溃问题:如果在进行任务处理过程中,Map任务崩溃了,系统在重启之后需要重新加载数据块,数据块越大,数据加载时间就越长,系统恢复时间也就越长。
  • 2)监管时间问题:每个节点会周期性的与NameNode节点进行汇报通信,说明自己的工作状态。倘若某一个节点保持沉默的时间超过一个预设的时间间隔,NameNode节点会记录这个节点状态为死亡,并将该节点的数据进行重新备份,转发给别的节点。而这个“预设时间间隔”是从数据块大小的角度大致估算的。(对与64MB的数据块,我可以假设你10分钟之内无论如何也能解决完了吧,超过10分钟还没反应,那我就认为你出故障或已经死了。)64MB大小的数据块,其时间尚可较为精准地估计,如果我将数据块大小设为640MB甚至上G,那这个“预设的时间间隔”便不好估算,估长估短对系统都会造成不必要的损失和资源浪费。
  • 3)任务分解问题:数据量的大小与问题解决的复杂度呈一定线性关系。对于同一个算法,处理的数据量越大,时间复杂度就越高,同时也会造成问题处理的并行性的下降。
  • 4)约束Map输出:在MapReduce框架里,Map之后的数据是要经过排序才执行Reduce操作的。这通常涉及到归并排序,而归并排序的算法思想便是“对小文件进行排序,然后将小文件归并成大文件”,因此“小文件”不宜过大。

参考

end

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/80306
推荐阅读
相关标签
  

闽ICP备14008679号