当前位置:   article > 正文

数据开发面试题2020总结_下列关于hadoop生态圈的描述,错误的是()。[3分]aflume是hadoop生态圈中一个高可用

下列关于hadoop生态圈的描述,错误的是()。[3分]aflume是hadoop生态圈中一个高可用

Hadoop面试题

1.1 Hadoop基础(☆☆)

1.1.1 下列哪项通常是集群的最主要瓶颈(C)

A.CPU

B.网络

C.磁盘 IO

D.内存

答案解析: C.磁盘 IO对集群的影响 IO作为传输数据的管道如果管道越大对数据的传输也自然够大,其保证集群数据传输的稳定。

1.1.2 下列哪项可以作为集群的管理?(C)

C.ClouderaManager

D.Zookeeper

1.1.3 下列哪个是Hadoop运行的模式?(ABC)

A.单机版

B.伪分布式

C.完全分布式

1.1.4 列举几个hadoop生态圈的组件并做简要描述

1)Zookeeper:是一个开源的分布式应用程序协调服务,基于zookeeper可以实现同步服务,配置维护,命名服务。

2)Flume:一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。

3)Hbase:是一个分布式的、面向列的开源数据库, 利用Hadoop HDFS作为其存储系统。

4)Hive:基于Hadoop的一个数据仓库工具,可以将结构化的数据档映射为一张数据库表,并提供简单的sql 查询功能,可以将sql语句转换为MapReduce任务进行运行。

5)Sqoop:将一个关系型数据库中的数据导进到Hadoop的 HDFS中,也可以将HDFS的数据导进到关系型数据库中。

1.1.5 解释“hadoop”和“hadoop 生态系统”两个概念。

Hadoop是指Hadoop框架本身;hadoop生态系统,不仅包含hadoop,还包括保证hadoop框架正常高效运行其他框架,比如zookeeper、Flume、Hbase、Hive、Sqoop等辅助框架。

1.1.6 简要描述如何安装配置apache的一个开源Hadoop,只描述即可,无需列出具体步骤,列出具体步骤更好。

1使用root账户登录

2修改IP

3修改host主机名

4配置SSH免密码登录

5关闭防火墙

6安装JDK

7解压hadoop安装包

8配置hadoop的核心文件 hadoop-env.sh,core-site.xml , mapred-site.xml , hdfs-site.xml

9配置hadoop环境变量

10格式化 hadoop namenode-format

11启动节点start-all.sh

1.1.7 Hadoop中需要哪些配置文件,其作用是什么?

1)core-site.xml:

(1)fs.defaultFS:hdfs://cluster1(域名),这里的值指的是默认的HDFS路径 。

(2)hadoop.tmp.dir:/export/data/hadoop_tmp,这里的路径默认是NameNode、DataNode、secondaryNamenode等存放数据的公共目录。用户也可以自己单独指定这三类节点的目录。

(3)ha.zookeeper.quorum:hadoop101:2181,hadoop102:2181,hadoop103:2181,这里是ZooKeeper集群的地址和端口。注意,数量一定是奇数,且不少于三个节点 。

2)hadoop-env.sh: 只需设置jdk的安装路径,如:export JAVA_HOME=/usr/local/jdk。

3)hdfs-site.xml:

(1) dfs.replication:他决定着系统里面的文件块的数据备份个数,默认为3个。

(2) dfs.data.dir:datanode节点存储在文件系统的目录 。

(3) dfs.name.dir:是namenode节点存储hadoop文件系统信息的本地系统路径 。

4)mapred-site.xml:

mapreduce.framework.name: yarn指定mr运行在yarn上

1.1.8 请列出正常工作的Hadoop集群中Hadoop都分别需要启动哪些进程,它们的作用分别是什么?

1)NameNode它是hadoop中的主服务器,管理文件系统名称空间和对集群中存储的文件的访问,保存有metadate。

2)SecondaryNameNode它不是namenode的冗余守护进程,而是提供周期检查点和清理任务。帮助NN合并editslog,减少NN启动时间。

3)DataNode它负责管理连接到节点的存储(一个集群中可以有多个节点)。每个存储数据的节点运行一个datanode守护进程。

4)ResourceManager(JobTracker)JobTracker负责调度DataNode上的工作。每个DataNode有一个TaskTracker,它们执行实际工作。

5)NodeManager(TaskTracker)执行任务

6)DFSZKFailoverController高可用时它负责监控NN的状态,并及时的把状态信息写入ZK。它通过一个独立线程周期性的调用NN上的一个特定接口来获取NN的健康状态。FC也有选择谁作为Active NN的权利,因为最多只有两个节点,目前选择策略还比较简单(先到先得,轮换)。

7)JournalNode 高可用情况下存放namenode的editlog文件.

1.1.9 简述Hadoop的几个默认端口及其含义

1)dfs.namenode.http-address:50070

2)SecondaryNameNode辅助名称节点端口号:50090

3)dfs.datanode.address:50010

4)fs.defaultFS:8020 或者9000

5)yarn.resourcemanager.webapp.address:8088

1.1.10 请列出正常工作的hadoop集群中hadoop都分别需要启动哪些进程,他们的作用分别是什么,尽可能写的全面些?

 

1)NameNode它是hadoop中的主服务器,管理文件系统名称空间和对集群中存储的文件的访问,保存有metadate。

2)SecondaryNameNode它不是namenode的冗余守护进程,而是提供周期检查点和清理任务。帮助NN合并editslog,减少NN启动时间。

3)DataNode它负责管理连接到节点的存储(一个集群中可以有多个节点)。每个存储数据的节点运行一个datanode守护进程。

4)ResourceManager(JobTracker)JobTracker负责调度DataNode上的工作。每个DataNode有一个TaskTracker,它们执行实际工作。

5)NodeManager(TaskTracker)执行任务

6)DFSZKFailoverController高可用时它负责监控NN的状态,并及时的把状态信息写入ZK。它通过一个独立线程周期性的调用NN上的一个特定接口来获取NN的健康状态。FC也有选择谁作为Active NN的权利,因为最多只有两个节点,目前选择策略还比较简单(先到先得,轮换)。

7)JournalNode 高可用情况下存放namenode的editlog文件.

1.1.11 列出几个优化hadoop,怎么做数据平衡?列出步骤

 

(1) 从应用程序角度进行优化。由于mapreduce是迭代逐行解析数据文件的,怎样在迭代的情况下,编写高效率的应用程序,是一种优化思路。

(2) 对Hadoop参数进行调优。当前hadoop系统有190多个配置参数,怎样调整这些参数,使hadoop作业运行尽可能的快,也是一种优化思路。

(3) 从系统实现角度进行优化。这种优化难度是最大的,它是从hadoop实现机制角度,发现当前Hadoop设计和实现上的缺点,然后进行源码级地修改。该方法虽难度大,但往往效果明显。

     数据平衡:在Hadoop中,包含一个Balancer程序,通过运行这个程序,可以使得HDFS集群达到一个平衡的状态,使用这个程序的命令如下:

sh $HADOOP_HOME/bin/start-balancer.sh –t 10%

这个命令中-t参数后面跟的是HDFS达到平衡状态的磁盘使用率偏差值。如果机器与机器之间磁盘使用率偏差小于10%,那么我们就认为HDFS集群已经达到了平衡的状态。

1.1.12 Hadoop架构图

1.1.13Hadoop中RecordReader的作用是什么?

由于Hadoop将数据拆分为各种块,因此使用RecordReader将拆分数据读取到单个记录中。

1.1.14 Hadoop中job和Tasks之间的区别是什么?

JobTracker 是一个 master 服务,软件启动之后 JobTracker 接收 Job,负责调度 Job的每一个子任务, task 运行于 TaskTracker 上,并监控它们,如果发现有失败的 task 就重新运行它。一般情况应该把 JobTracker 部署在单独的机器上。

TaskTracker 是运行在多个节点上的 slaver 服务。TaskTracker 主动与 JobTracker 通信,接收作业,并负责直接执行每一个任务。
 

1.2 HDFS(☆☆☆)

1.2.1 HDFS 中的 block 默认保存几份?(A)

A.3 份

B.2 份

C.1 份

D.不确定

1.2.2 HDFS 默认 BlockSize 是(C

A.32MB

B.64MB(2.7.2版本,本地模式)

C.128MB(2.7.2版本,分布式模式)

1.2.3 Client 端上传文件的时候下列哪项正确?(BC)

A.数据经过NameNode传递DataNode

B.Client端将文件切分为Block,依次上传

C.Client只上传数据到一台DataNode,然后由NameNode负责Block复制工作

1.2.4 下面哪个程序负责 HDFS 数据存储?(C)

A.NameNode

B.JobTracker

C.DataNode

D.SecondaryNameNode

E.tasktracker

1.2.5 关于 SecondaryNameNode 哪项是正确的?(C)

A.它是NameNode的热备

B.它对内存没有要求

C.他的目的使帮助NameNode合并编辑日志,减少NameNode 启动时间

D. SecondaryNameNode应与NameNode 部署到一个节点

1.2.6 下列哪个程序通常与 NameNode 在一个节点启动?(D)

A.SecondaryNameNode

B.DataNode

C.TaskTracker

D.JobTracker

hadoop的集群是基于master/slave模式,namenode和jobtracker属于master,datanode和tasktracker属于slave,master只有一个,而slave有多个。

SecondaryNameNode内存需求和NameNode在一个数量级上,所以通常secondary NameNode(运行在单独的物理机器上)和 NameNode 运行在不同的机器上。

JobTracker对应于NameNode,TaskTracker对应于DataNode。

DataNode和NameNode是针对数据存放来而言的。JobTracker和TaskTracker是对于MapReduce执行而言的。

 

mapreduce中几个主要概念,mapreduce 整体上可以分为这么几条执行线索:

jobclient,JobTracker与TaskTracker。

1)JobClient会在用户端通过JobClient类将已经配置参数打包成jar文件的应用存储到hdfs,并把路径提交到Jobtracker,然后由JobTracker创建每一个Task(即 MapTask 和 ReduceTask)并将它们分发到各个TaskTracker服务中去执行。

2)JobTracker是一master服务,软件启动之后JobTracker接收Job,负责调度Job的每一个子任务。task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。一般情况应该把JobTracker 部署在单独的机器上。

3)TaskTracker是运行在多个节点上的slaver服务。TaskTracker主动与JobTracker通信,接收作业,并负责直接执行每一个任务。TaskTracker 都需要运行在HDFS的DataNode上。

 

1.2.7 文件大小默认为 64M,改为 128M 有啥影响?

增加文件块大小,需要增加磁盘的传输速率。

 

1.2.8 HDFS的存储机制(☆☆☆☆☆)

HDFS存储机制,包括HDFS的写入过程和读取过程两个部分

 

1)客户端namenode请求上传文件,namenode检查目标文件是否已存在,父目录是否存在。

2)namenode返回是否可以上传。

3)客户端请求第一个 block上传到哪几个datanode服务器上。

4)namenode返回3个datanode节点,分别为dn1dn2dn3

5)客户端请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成

6)dn1dn2dn3逐级应答客户端

7)客户端开始往dn1上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,dn1收到一个packet就会传给dn2dn2传给dn3dn1每传一个packet会放入一个应答队列等待应答

8)当一个block传输完成之后,客户端再次请求namenode上传第二个block的服务器。(重复执行3-7步)

 

1)客户端向namenode请求下载文件,namenode通过查询元数据,找到文件块所在的datanode地址。

2)挑选一台datanode(就近原则,然后随机)服务器,请求读取数据

3)datanode开始传输数据给客户端(从磁盘里面读取数据放入流,以packet为单位来做校验)。

4)客户端以packet为单位接收,先在本地缓存,然后写入目标文件。

1.2.9 secondary namenode工作机制(☆☆☆☆☆)

1)第一阶段:namenode启动

(1)第一次启动namenode格式化后,创建fsimage和edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。

(2)客户端对元数据进行增删改的请求

(3)namenode记录操作日志,更新滚动日志。

(4)namenode在内存中对数据进行增删改查

2)第二阶段:Secondary NameNode工作

(1)Secondary NameNode询问namenode是否需要checkpoint。直接带回namenode是否检查结果。

(2)Secondary NameNode请求执行checkpoint。

(3)namenode滚动正在写的edits日志

(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode

(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。

(6)生成新的镜像文件fsimage.chkpoint

(7)拷贝fsimage.chkpoint到namenode

(8)namenode将fsimage.chkpoint重新命名成fsimage

1.2.10 NameNode与SecondaryNameNode 的区别与联系?(☆☆☆☆☆)

1)机制流程同上;

2)区别

(1)NameNode负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。

(2)SecondaryNameNode主要用于定期合并命名空间镜像和命名空间镜像的编辑日志。

3)联系:

(1)SecondaryNameNode中保存了一份和namenode一致的镜像文件(fsimage)和编辑日志(edits)。

(2)在主namenode发生故障时(假设没有及时备份数据),可以从SecondaryNameNode恢复数据。

1.2.11 hadoop节点动态上线下线怎么操作?

1)节点上线操作:

当要新上线数据节点的时候,需要把数据节点的名字追加在 dfs.hosts 文件中

(1)关闭新增节点的防火墙

(2)在 NameNode 节点的 hosts 文件中加入新增数据节点的 hostname

(3)在每个新增数据节点的 hosts 文件中加入 NameNode 的 hostname

(4)在 NameNode 节点上增加新增节点的 SSH 免密码登录的操作

(5)在 NameNode 节点上的 dfs.hosts 中追加上新增节点的 hostname,

(6)在其他节点上执行刷新操作:hdfs dfsadmin -refreshNodes

(7)在 NameNode 节点上,更改 slaves 文件,将要上线的数据节点 hostname 追加

到 slaves 文件中

(8)启动 DataNode 节点

(9)查看 NameNode 的监控页面看是否有新增加的节点

2)节点下线操作:

(1)修改/conf/hdfs-site.xml 文件

(2)确定需要下线的机器,dfs.osts.exclude 文件中配置好需要下架的机器,这个是阻

止下架的机器去连接 NameNode。

(3)配置完成之后进行配置的刷新操作./bin/hadoop dfsadmin -refreshNodes,这个操作的作用是在后台进行 block 块的移动。

(4)当执行三的命令完成之后,需要下架的机器就可以关闭了,可以查看现在集群上连接的节点,正在执行 Decommission,会显示:Decommission Status : Decommission in progress 执行完毕后,会显示:Decommission Status : Decommissioned

(5)机器下线完毕,将他们从excludes 文件中移除。

1.2.12 hdfs整体架构介绍

 

 

1.2.13 namenode内存包含哪些,具体如何分配

NameNode整个内存结构大致可以分成四大部分:Namespace、BlocksMap、NetworkTopology及其它,图2为各数据结构内存逻辑分布图示。

图2 NameNode内存全景图

1)Namespace:维护整个文件系统的目录树结构及目录树上的状态变化;

2)BlockManager:维护整个文件系统中与数据块相关的信息及数据块的状态变化;

3)NetworkTopology:维护机架拓扑及DataNode信息,机架感知的基础;

4)其它:

LeaseManager:读写的互斥同步就是靠Lease实现,支持HDFS的Write-Once-Read-Many的核心数据结构;

CacheManager:Hadoop 2.3.0引入的集中式缓存新特性,支持集中式缓存的管理,实现memory-locality提升读性能;

SnapshotManager:Hadoop 2.1.0引入的Snapshot新特性,用于数据备份、回滚,以防止因用户误操作导致集群出现数据问题;

DelegationTokenSecretManager:管理HDFS的安全访问;

另外还有临时数据信息、统计信息metrics等等。

NameNode常驻内存主要被Namespace和BlockManager使用,二者使用占比分别接近50%。其它部分内存开销较小且相对固定,与Namespace和BlockManager相比基本可以忽略。

1.2.14 HAnamenode 是如何工作的? (☆☆☆☆☆)

 

ZKFailoverController主要职责

1)健康监测:周期性的向它监控的NN发送健康探测命令,从而来确定某个NameNode是否处于健康状态,如果机器宕机,心跳失败,那么zkfc就会标记它处于一个不健康的状态。

2)会话管理:如果NN是健康的,zkfc就会在zookeeper中保持一个打开的会话,如果NameNode同时还是Active状态的,那么zkfc还会在Zookeeper中占有一个类型为短暂类型的znode,当这个NN挂掉时,这个znode将会被删除,然后备用的NN,将会得到这把锁,升级为主NN,同时标记状态为Active。

3)当宕机的NN新启动时,它会再次注册zookeper,发现已经有znode锁了,便会自动变为Standby状态,如此往复循环,保证高可靠,需要注意,目前仅仅支持最多配置2个NN。

4)master选举:如上所述,通过在zookeeper中维持一个短暂类型的znode,来实现抢占式的锁机制,从而判断那个NameNode为Active状态

1.2.15 namenode ha高可用源码实现方式

 

  1)HealthMonitor初始化完成后通过内部线程调用NameNode的RPC接口对其进行健康检查

  2)如果检查到NameNode状态异常,会回调ZKFailoverContorller注册的回调函数进行相应的处理

  3)如果ZKFailoverController发现集群需要进行主备选举,会使用ActiveStanbyElector和zookeeper集群通信完成主备切换

  4)ActiveStanbyElector在完成主备切换后,回调ZKFailoverController注册的方法使NameNode变成active或者stanby状态

1.2.16 hadoop2.x Federation 

Active NN的架构使得HDFS在集群展性和性能上都有潜在的问题,当集群大到一定程度后,NN进程使用的内存可能会达到上百GNN了性能的瓶

常用的估算公式1G对应1百万个,按缺省大小算的,大概是64T (这个估算比例是有比较大的富裕的,其实,即使是每个文件只有一个块,所有元数据信息也不会有1KB/block)为了解决这个问题,Hadoop 2.x提供了HDFS Federation, 示意如下:

 

多个NN共用一个集群里的存储资源,每个NN都可以外提供服每个NN都会定一个存池,有独的id,每个DN为所有存储池提供存储

DN会按照存id向其对应NN汇报块信息,同时,DN会向所有NN汇报本地存储可用资源情

如果需要在客端方便的访问若干个NN上的源,可以使用客端挂表,把不同的目映射到不同的NN,但NN上必存在相的目

1.2.17 HDFS Federation的原理结构

HDFS Federation意味着在集群中将会有多个namenode/namespace,这样的方式有什么好处呢?

多namespace的方式可以直接减轻单一NameNode的压力。

一个典型的例子就是上面提到的NameNode内存过高问题,我们完全可以将上面部分大的文件目录移到另外一个NameNode上做管理.更重要的一点在于,这些NameNode是共享集群中所有的DataNode的,它们还是在同一个集群内的。HDFS Federation原理结构图如下:

 

我们可以拿这种图与上一小节的图做对比,我们可以得出这样一个结论:

HDFS Federation是解决NameNode单点问题的水平横向扩展方案。

这时候在DataNode上就不仅仅存储一个Block Pool下的数据了,而是多个(大家可以在DataNode的datadir所在目录里面查看BP-xx.xx.xx.xx打头的目录)。

在HDFS Federation的情况下,只有元数据的管理与存放被分隔开了,但真实数据的存储还是共用的,这与viewFs还是不一样的。之前看别的文章在讲述HDFS Federation的时候直接拿viewFs来讲,个人觉得二者还是有些许的不同的,用一句话概况应该这么说。

HDFS的viewFs是namespace完全独立(私人化)的Federation方案,可以这么说,viewFs是Federation的一个简单实现方案。

因为它们不仅仅是namespace独立,而且真实数据的存放也是独立的,也就是多个完全独立的集群。在这点上我们还是有必要做一下区分,否则让人以为HDFS Federation就是viewFs。

1.2.18 HDFS Federation方案的优势

第一点,命名空间的扩展。因为随着集群使用时间的加长,HDFS上存放的数据也将会越来越多。这个时候如果还是将所有的数据都往一个NameNode上存放,这个文件系统会显得非常的庞大。这时候我们可以进行横向扩展,把一些大的目录分离出去.使得每个NameNode下的数据看起来更加的精简。

第二点,性能的提升.这个也很好理解。当NameNode所持有的数据量达到了一个非常大规模的量级的时候(比如超过1亿个文件),这个时候NameNode的处理效率可能就会有影响,它可能比较容易的会陷入一个繁忙的状态。而整个集群将会受限于一个单点NameNode的处理效率,从而影响集群整体的吞吐量。这个时候多NameNode机制显然可以减轻很多这部分的压力。

第三点,资源的隔离。这一点考虑的就比较深了。通过多个命名空间,我们可以将关键数据文件目录移到不同的NameNode上,以此不让这些关键数据的读写操作受到其他普通文件读写操作的影响。也就是说这些NameNode将会只处理特定的关键的任务所发来的请求,而屏蔽了其他普通任务的文件读写请求,以此做到了资源的隔离。千万不要小看这一点,当你发现NameNode正在处理某个不良任务的大规模的请求操作导致响应速度极慢时,你一定会非常的懊恼。

1.2.19 hadoop的块大小,从哪个版本开始是128M

Hadoop1.x都是64M,hadoop2.x开始都是128M

1.2.20 写出你常用的hdfs命令

Hdfs dfs -ls /

Hdfs dfs -cat /xxx

1.2.21 hdfs原理,以及各个模块的职责

 

HDFS的架构图之基础架构

 

 

  1. NameNode是一个中心服务器,单一节点(简化系统的设计和实现),负责管理文件系统的名字空间(namespace)以及客户端对文件的访问
  2. 文件操作,namenode是负责文件元数据的操作,datanode负责处理文件内容的读写,跟文件内容相关的数据流不经过Namenode,只询问它跟哪个dataNode联系,否则NameNode会成为系统的瓶颈
  3. 副本存放在哪些Datanode上由NameNode来控制,根据全局情况作出块放置决定,读取文件NameNode尽量让用户先读取最近的副本,降低读取网络开销和读取延时
  4. NameNode全权管理数据的复制,它周期性的从集群中的每个DataNode接收心跳信息和状态报告,接收到心跳信号意味着DataNode节点工作正常,块状态报告包含了一个该DataNode所有的数据列表

NameNode与Datanode的总结概述

 

 

5hdfs的架构之文件的文件副本机制以及block块存储

 

 

所有的文件都是以block块的方式存放在HDFS文件系统当中,在hadoop1当中,文件的block块默认大小是64M,hadoop2当中,文件的block块大小默认是128M,block块的大小可以通过hdfs-site.xml当中的配置文件进行指定

    <property>

        <name>dfs.block.size</name>

        <value>块大小 以字节为单位</value>//只写数值就可以

    </property>

1.3 MapReduce(☆☆☆☆☆)

1.3.1 谈谈Hadoop序列化和反序列化自定义bean对象实现序列化?

1)序列化和反序列化

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。 

反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。

2)自定义bean对象要想序列化传输步骤及注意事项:。

(1)必须实现Writable接口

(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

(3)重写序列化方法

(4)重写反序列化方法

(5)注意反序列化的顺序和序列化的顺序完全一致

(6)要想把结果显示在文件中,需要重写toString(),且用”\t”分开,方便后续用

(7)如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序

1.3.2 在Hadoop中定义的InputFormat中,默认是哪一个? A

A.TextInputFormat B.KeyValueInputFormat C.SequenceFileInputFormat

1.3.3 两个类TextInputFormat和KeyValueInputFormat的区别是什么?

1)相同点:

TextInputformat和KeyValueTextInputFormat都继承了FileInputFormat类,都是每一行作为一个记录;

2)区别:

TextInputformat将每一行在文件中的起始偏移量作为 key,每一行的内容作为value。默认以\n或回车键作为一行记录。

KeyValueTextInputFormat 适合处理输入数据的每一行是两列,并用 tab 分离的形式。

public static class MyMapper extends  

Mapper<Text, Text, Text, LongWritable> {  

final Text k2 = new Text();  

final LongWritable v2 = new LongWritable();  

 

protected void map(Text key, Text value,  

Mapper<Text, Text, Text, LongWritable>.Context context)  

throws InterruptedException, IOException {  

//  final String line = value.toString();  

//  final String[] splited = line.split("o");  

//  for (String word : splited) {  

//      k2.set(word);  

k2.set(key);  

v2.set(1);  

context.write(k2, v2);  

//  }  

}  

}

1.3.4 FileInputFormat切片机制(☆☆☆☆☆)

1)job提交流程源码详解

waitForCompletion()

submit();

// 1建立连接

connect();

// 1)创建提交job的代理

new Cluster(getConfiguration());

// (1)判断是本地yarn还是远程

initialize(jobTrackAddr, conf);

// 2 提交job

submitter.submitJobInternal(Job.this, cluster)

// 1)创建给集群提交数据的Stag路径

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

// 2)获取jobid ,并创建job路径

JobID jobId = submitClient.getNewJobID();

// 3)拷贝jar包到集群

copyAndConfigureFiles(job, submitJobDir);

rUploader.uploadFiles(job, jobSubmitDir);

// 4)计算切片,生成切片规划文件

writeSplits(job, submitJobDir);

maps = writeNewSplits(job, jobSubmitDir);

input.getSplits(job);

// 5)向Stag路径写xml配置文件

writeConf(conf, submitJobFile);

conf.writeXml(out);

// 6)提交job,返回提交状态

status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

 

 

1.3.5 在一个运行的Hadoop 任务中,什么是InputSplit?(☆☆☆☆☆)

FileInputFormat源码解析(input.getSplits(job))

(1)找到你数据存储的目录。

(2)开始遍历处理(规划切片)目录下的每一个文件

(3)遍历第一个文件ss.txt

a)获取文件大小fs.sizeOf(ss.txt);

b)计算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

c)默认情况下,切片大小=blocksize

d)开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片

e)将切片信息写到一个切片规划文件中

f)整个切片的核心过程在getSplit()方法中完成。

g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。

h)注意:block是HDFS上物理存储的存储的数据,切片是对数据逻辑上的划分。

(4)提交切片规划文件yarn上yarn上MrAppMaster可以根据切片规划文件计算开启maptask个数。

1.3.6 自定义InputFormat流程

(1)自定义一个类继承FileInputFormat

(2)改写RecordReader,实现一次读取一个完整文件封装为KV

1.3.7 如何决定一个job的map和reduce的数量?

1)map数量

splitSize=max{minSize,min{maxSize,blockSize}}

map数量由处理的数据分成的block数量决定default_num = total_size / split_size;

2)reduce数量

reduce的数量job.setNumReduceTasks(x);x 为reduce的数量。不设置的话默认为 1。

1.3.8 Maptask的个数由什么决定?

一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定。

1.3.9 MapTask工作机制(☆☆☆☆☆)

 

(1)Read阶段:Map Task通过用户编写的RecordReader从输入InputSplit解析出一个个key/value

(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value

(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在函数内部,它会生成的key/value分区调用Partitioner)并写入一个环形内存缓冲区中。

(4)Spill阶段:即“写”当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要对数据进行合并压缩等操作

写阶段详情:

步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中数据进行一次聚集操作。

步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index

(5)Combine阶段:当所有数据处理完成后,MapTask所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

所有数据处理完后,MapTask将所有临时文件合并成一个大文件保存到文件output/file.out,同时生成相应的索引文件output/file.out.index

进行文件合并过程中,MapTask分区为单位进行合并。对于某个分区,将采用多轮递归合并的方式每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

1.3.10 ReduceTask工作机制(☆☆☆☆☆)

 

(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

1.3.11 请描述mapReduce有几种排序及排序发生的阶段(☆☆☆☆☆)

1)排序的分类:

(1)部分排序:

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。

(2)全排序:

如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。

替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为待分析文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。

(3)辅助排序:(GroupingComparator分组)

Mapreduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。

(4)二次排序:

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

2)自定义排序WritableComparable

bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序

@Override

public int compareTo(FlowBean o) {

// 倒序排列,从大到小

return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

3)排序发生的阶段:

(1)一个是在map side发生在spill后partition前。

(2)一个是在reduce side发生在copy后 reduce前。

1.3.12 请描述mapReduce中shuffle阶段的工作流程,如何优化shuffle阶段(☆☆☆☆☆)

分区,排序,溢写,拷贝到对应reduce机器上,增加combiner,压缩溢写的文件。

 

1.3.13 请描述mapReduce中combiner的作用是什么,一般使用情景,哪些情况不需要,及和reduce的区别

1)Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。

2)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。

3)Combiner和reducer的区别在于运行的位置。

Combiner是在每一个maptask所在的节点运行;

Reducer是接收全局所有Mapper的输出结果。

 

1.3.14 Mapreduce的工作原理,请举例子说明mapreduce是怎么运行的?(☆☆☆☆☆)

 

MapReduce是一种编程模型,用于大规模数据集的并行运算。概念"Map(映射)"和"Reduce(归约)",和它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

记住这2点:

      1、MapReduce是一种分布式计算模型,是Google提出的,主要用于搜索领域,解决海量数据的计算问题。

      2、MR有两个阶段组成:Map和Reduce,用户只需实现map()和reduce()两个函数,即可实现分布式计算。
 

 

 

1.3.15 如果没有定义partitioner,那数据在被送达reducer前是如何被分区的?

如果没有自定义的 partitioning,则默认的 partition 算法,即根据每一条数据的 key

的 hashcode 值摸运算(%)reduce 的数量,得到的数字就是“分区号“。

1.3.16 MapReduce 出现单点负载多大,怎么负载平衡? (☆☆☆☆☆)

可以用 Partitioner

1.3.17 MapReduce 怎么实现 TopN (☆☆☆☆☆)

可以自定义groupingcomparator,对结果进行最大值排序,然后再reduce输出时,控制只输出前n个数。就达到了topn输出的目的。

1.3.18 Hadoop的缓存机制(Distributedcache)(☆☆☆☆☆)

分布式缓存一个最重要的应用就是在进行join操作的时候,如果一个表很大,另一个表很小,我们就可以将这个小表进行广播处理,即每个计算节点上都存一份,然后进行map端的连接操作,经过我的实验验证,这种情况下处理效率大大高于一般的reduce端join,广播处理就运用到了分布式缓存的技术。

DistributedCache将拷贝缓存的文件到Slave节点在任何Job在节点上执行之前,文件在每个Job中只会被拷贝一次,缓存的归档文件会被在Slave节点中解压缩。将本地文件复制到HDFS中去,接着Client会通过addCacheFile() 和addCacheArchive()方法告诉DistributedCache在HDFS中的位置。当文件存放到文地时,JobClient同样获得DistributedCache来创建符号链接,其形式为文件的URI加fragment标识。当用户需要获得缓存中所有有效文件的列表时,JobConf 的方法 getLocalCacheFiles() 和getLocalArchives()都返回一个指向本地文件路径对象数组。

1.3.19 如何使用mapReduce实现两个表的join?(☆☆☆☆☆)

1)reduce side join : 在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0 表示来自文件File1,tag=2 表示来自文件File2。

2)map side join : Map side join 是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task 内存中存在一份(比如存放到hash table 中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table 中查找是否有相同的key 的记录,如果有,则连接后输出即可。

1.3.20 有可能使 Hadoop 任务输出到多个目录中么?如果可以,怎么做?

1)可以输出到多个目录中,采用自定义OutputFormat。

2)实现步骤:

(1)自定义outputformat,

(2)改写recordwriter,具体改写输出数据的方法write()

1.3.21 什么样的计算不能用mr来提速,举5个例子。

1)数据量很小。

2)繁杂的小文件。

3)索引是更好的存取机制的时候。

4)事务处理。

5)只有一台机器的时候。

1.3.22 ETL是哪三个单词的缩写

Extraction-Transformation-Loading的缩写,中文名称为数据提取、转换和加载。

1.3.23 给你一个1G的数据文件。分别有id,name,mark,source四个字段,按照mark分组,id排序,手写一个MapReduce?其中有几个Mapper?

1)MapReduce实现

方案一:在map端对mark和id进行排序

@Override

public int compareTo(SortBean o) {

int result;

 

if (this.mark > o.getMark()) {

result = 1;

}else if(this.mark <o.getMark()){

result = -1;

}else {

result = this.id > o.getId()? -1:1;

}

 

return result;

}

方案二:在map端对mark排序,在reduce端对id分组。

@Override

public int compareTo(GroupBean o) {

int result;

 

result = this.mark > o.mark ? -1 : 1;

 

return result;

}

 

@Override

public int compare(WritableComparable a, WritableComparable b) {

 

GroupBean aBean = (GroupBean) a;

GroupBean bBean = (GroupBean) b;

 

int result;

if (aBean.getId() > bBean.getId()) {

result = 1;

} else if (aBean.getId() < bBean.getId()) {

result = -1;

} else {

result = 0;

}

 

return result;

}

 

2)几个mapper

(1)1024m/128m=8块

1.3.24 一个字符串"jasonbbtomccjackddfftomkk",如果相邻两个字符相同视为一个切分点,实现wordcount(注意会有多个连续相同的情况如aabbccdd,还有要问清出现三个或者多个怎么算)

1.3.25 hadoop二次排序|小表关联大表怎么实现的|切片,shuffle,reduce阶段,map阶段,Yarn流程| combiner使用场景|sort快排手写| 环形缓冲区为什么是环形的 |ETL细节| reduce阶段是怎么下载到本地

hadoop二次排序:二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果。如何使用一个Mapreduce就可以实现二次排序。Hadoop有自带的SecondarySort程序,但这个程序只能对整数进行排序,所以我们需要对其进行改进,使其可以对任意字符串进行排序。下面会分别列出这两个程序的详解。    

    Hadoop自带的例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程) 

        public static  class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>  
        public static class Reduce extends Reducer<IntPair, NullWritable,  IntWritable, IntWritable>

小表关联大表怎么实现的:多表关联和单表关联相似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更加清楚。所以可以采用和单表关联的相同的处理方式,map识别出输入的行属于哪个表之后,对其进行分割,将连接的列值保存在key中,另一列和左右表标识保存在value中,然后输出。reduce拿到连接结果之后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。

切片:HDFS的block是逻辑上的数据块.Hadoop2.0中每一块默认大小128MB,实际存储过程中block大小小于等128MB,也可根据配置文件修改块的大小。

shuffle,reduce阶段,map阶段:

Map-Shuffle:

写入之前先进行分区Partition,用户可以自定义分区(就是继承Partitioner类),然后定制到job上,如果没有进行分区,框架会使用 默认的分区(HashPartitioner)对key去hash值之后,然后在对reduceTaskNum进行取模(目的是为了平衡reduce的处理能力),然后决定由那个reduceTask来处理。

将分完区的结果<key,value,partition>开始序列化成字节数组,开始写入缓冲区。

随着map端的结果不端的输入缓冲区,缓冲区里的数据越来越多,缓冲区的默认大小是100M,当缓冲区大小达到阀值时 默认是0.8【spill.percent】(也就是80M),开始启动溢写线程,锁定这80M的内存执行溢写过程,内存—>磁盘,此时map输出的结果继续由另一个线程往剩余的20M里写,两个线程相互独立,彼此互不干扰。

溢写spill线程启动后,开始对key进行排序(Sort)默认的是自然排序,也是对序列化的字节数组进行排序(先对分区号排序,然后在对key进行排序)。

如果客户端自定义了Combiner之后(相当于map阶段的reduce),将相同的key的value相加,这样的好处就是减少溢写到磁盘的数据量(Combiner使用一定得慎重,适用于输入key/value和输出key/value类型完全一致,而且不影响最终的结果)

每次溢写都会在磁盘上生成一个一个的小文件,因为最终的结果文件只有一个,所以需要将这些溢写文件归并到一起,这个过程叫做Merge,最终结果就是一个group({“aaa”,[5,8,3]})

集合里面的值是从不同的溢写文件中读取来的。这时候Map-Shuffle就算是完成了。

一个MapTask端生成一个结果文件。

ReduceTask:

Reduce-Shuffle:

接下来开始进行Reduce-Shuffle 阶段。当MapTask完成任务数超过总数的5%后,开始调度执行ReduceTask任务,然后ReduceTask默认启动5个copy线程到完成的MapTask任务节点上分别copy一份属于自己的数据(使用Http的方式)。

这些拷贝的数据会首先保存到内存缓冲区中,当达到一定的阀值的时候,开始启动内存到磁盘的Merge,也就是溢写过程,一致运行直到map端没有数据生成,最后启动磁盘到磁盘的Merge方式生成最终的那个文件。在溢写过程中,然后锁定80M的数据,然后在延续Sort过程,然后记性group(分组)将相同的key放到一个集合中,然后在进行Merge

然后就开始reduceTask就会将这个文件交给reduced()方法进行处理,执行相应的业务逻辑

 
 

1.3.26 hadoop 实现TopN

  1. package TopN;
  2. import java.io.IOException;
  3. import java.util.StringTokenizer;
  4. import java.util.TreeMap;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.NullWritable;
  9. import org.apache.hadoop.io.Text;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. public class TopN {
  16. public static class TopTenMapper extends
  17. Mapper<Object, Text, NullWritable, IntWritable> {
  18. private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();
  19. public void map(Object key, Text value, Context context) {
  20. int N = 10; //默认为Top10
  21. N = Integer.parseInt(context.getConfiguration().get("N"));
  22. StringTokenizer itr = new StringTokenizer(value.toString());
  23. while (itr.hasMoreTokens()) {
  24. repToRecordMap.put(Integer.parseInt(itr.nextToken()), " ");
  25. if (repToRecordMap.size() > N) {
  26. repToRecordMap.remove(repToRecordMap.firstKey());
  27. }
  28. }
  29. }
  30. protected void cleanup(Context context) {
  31. for (Integer i : repToRecordMap.keySet()) {
  32. try {
  33. context.write(NullWritable.get(), new IntWritable(i));
  34. } catch (Exception e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. }
  39. }
  40. public static class TopTenReducer extends
  41. Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
  42. private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();
  43. public void reduce(NullWritable key, Iterable<IntWritable> values,
  44. Context context) throws IOException, InterruptedException {
  45. int N = 10; //默认为Top10
  46. N = Integer.parseInt(context.getConfiguration().get("N"));
  47. for (IntWritable value : values) {
  48. repToRecordMap.put(value.get(), " ");
  49. if (repToRecordMap.size() > N) {
  50. repToRecordMap.remove(repToRecordMap.firstKey());
  51. }
  52. }
  53. for (Integer i : repToRecordMap.descendingMap().keySet()) {
  54. context.write(NullWritable.get(), new IntWritable(i));
  55. }
  56. }
  57. }
  58. public static void main(String[] args) throws Exception {
  59. if (args.length != 3) {
  60. throw new IllegalArgumentException(
  61. "!!!!!!!!!!!!!! Usage!!!!!!!!!!!!!!: hadoop jar <jar-name> "
  62. + "TopN.TopN "
  63. + "<the value of N>"
  64. + "<input-path> "
  65. + "<output-path>");
  66. }
  67. Configuration conf = new Configuration();
  68. conf.set("N", args[0]);
  69. Job job = Job.getInstance(conf, "TopN");
  70. job.setJobName("TopN");
  71. Path inputPath = new Path(args[1]);
  72. Path outputPath = new Path(args[2]);
  73. FileInputFormat.setInputPaths(job, inputPath);
  74. FileOutputFormat.setOutputPath(job, outputPath);
  75. job.setJarByClass(TopN.class);
  76. job.setMapperClass(TopTenMapper.class);
  77. job.setReducerClass(TopTenReducer.class);
  78. job.setNumReduceTasks(1);
  79. job.setMapOutputKeyClass(NullWritable.class);// map阶段的输出的key
  80. job.setMapOutputValueClass(IntWritable.class);// map阶段的输出的value
  81. job.setOutputKeyClass(NullWritable.class);// reduce阶段的输出的key
  82. job.setOutputValueClass(IntWritable.class);// reduce阶段的输出的value
  83. System.exit(job.waitForCompletion(true) ? 0 : 1);
  84. }
  85. }

1.3.27 kvBuffer

缓冲区的默认大小是100M,可以通过mapreduce.task.io.sort.mb这个属性来设置,具体配置得根据具体业务情景来分析. 一般不修改

1.3.28 HiveSQL 转MapReduce join 实现怎么处理join关系。写程序,怎么实现两个表join?

https://blog.csdn.net/peng_0129/article/details/80623132(感谢)

1.3.29 MapReduce怎么解决数据均衡问题 如何确定分区号?

可以用 Partitioner,

1.3.30 mr环形数组怎么设置最大能设置多大?

环形缓存的设置根据具体的需求设置太大对硬件要求以及处理都有不同的影响。

1.3.31 MapReduce数据倾斜和内存溢出怎么办?

1.3.32 如果没有定义partitioner,那数据在被送达reduce前是如何被分区的?

Partitioner是在map函数执行context.write()时被调用。 用户可以通过实现自定义的?Partitioner来控制哪个key被分配给哪个?Reducer。 查看源码知道: 如果没有定义partitioner,那么会走默认的分区Hashpartitioner

1.3.33分别举例什么情况使用cmbiner,什么情况不会使用?

Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了

1.3.34 Hadoop中通过拆分任务到多个节点运行来实现并行计算,但某些节点运行较慢会拖慢整个任务的运行,hadoop采用何种机制应对这个情况?

结果查看监控日志,得知产生这种现象的原因是数据倾斜问题 解决: (1)调整拆分mapper的数量(partition数量) (2)增加jvm (3)适当地将reduce的数量变大

1.3.35 如何为一个hadoop任务设置mapper的数量?

 hadoop集群HDFS块大小进行分片,每一个分片会由一个map任务来进行处理,用户还是可以通过参数mapred.min.split.size参数在作业提交客户端进行自定义设置。还有一个重要参数就是mapred.map.tasks,这个参数设置的map数量仅仅是一个提示,只有当InputFormat决定了map任务的个数比mapred.map.tasks值小时才起作用。同样,Map任务的个数也能通过使用JobConf的conf.setNumMapTasks(int num)方法来手动地设置。这个方法能够用来增加map任务的个数,但是不能设定任务的个数小于Hadoop系统通过分割输入数据得到的值。
 

1.3.36 如何为一个hadoop任务设置要创建reducer的数量?

Max(min.split,min(max.split,block))就设置分片的最大最下值  computeSplitSize()设置

 

1.4 Yarn及源码框架(☆☆☆☆)

1.4.1 简述Hadoop1与Hadoop2 的架构异同

加入了yarn解决了资源调度的问题。

加入了对zookeeper的支持实现比较可靠的高可用。

1.4.2 为什么会产生 yarn,它解决了什么问题,有什么优势?

Yarn最主要的功能就是解决运行的用户程序与yarn框架完全解耦。

Yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、storm程序,spark程序……

1.4.3 MR作用提交全过程(☆☆☆☆☆)

1)作业提交过程之YARN

 

2)作业提交过程之MapReduce

 

3)作业提交过程之读数据

 

4)作业提交过程之写数据

 

 

1.4.4 HDFS的数据压缩算法? (☆☆☆☆☆)

Hadoop中常用的压缩算法有bzip2、gzip、lzo、snappy,其中lzo、snappy需要操作系统安装native库才可以支持。

数据可以压缩的位置如下所示。

 

企业开发用的比较多的是snappy。

1.4.5 Hadoop的调度器总结(☆☆☆☆☆)

目前,Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop2.7.2默认的资源调度器是Capacity Scheduler

具体设置详见:yarn-default.xml文件

<property>

    <description>The class to use as the resource scheduler.</description>

    <name>yarn.resourcemanager.scheduler.class</name>

<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>

</property>

1)先进先出调度器(FIFO)

2)容量调度器(Capacity Scheduler)

3)公平调度器(Fair Scheduler)

 

1.4.6 MapReduce 2.0 容错性(☆☆☆☆☆)

1)MRAppMaster容错性

一旦运行失败,由YARN的ResourceManager负责重新启动,最多重启次数可由用户设置,默认是2次。一旦超过最高重启次数,则作业运行失败。

2)Map Task/Reduce Task

Task周期性向MRAppMaster汇报心跳;一旦Task 挂掉,则MRAppMaster将为之重新申请资源,并运行之。最多重新运行次数可由用户设置,默认4 次。

1.4.7 mapreduce推测执行算法及原理(☆☆☆☆☆)

1)作业完成时间取决于最慢的任务完成时间

一个作业由若干个Map任务和Reduce任务构成。因硬件老化、软件Bug等,某些任务可能运行非常慢。

典型案例:系统中有99%的Map任务都完成了,只有少数几个Map老是进度很慢,完不成,怎么办?

2)推测执行机制:

发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度。为拖后腿任务启动一个备份任务,同时运行。谁先运行完,则采用谁的结果。

3)执行推测任务的前提条件

(1)每个task只能有一个备份任务

(2)当前job已完成的task必须不小于0.05(5%)

(3)开启推测执行参数设置。Hadoop2.7.2 mapred-site.xml文件中默认是打开的。

<property>

  <name>mapreduce.map.speculative</name>

  <value>true</value>

  <description>If true, then multiple instances of some map tasks

               may be executed in parallel.</description>

</property>

 

<property>

  <name>mapreduce.reduce.speculative</name>

  <value>true</value>

  <description>If true, then multiple instances of some reduce tasks

               may be executed in parallel.</description>

</property>

4)不能启用推测执行机制情况

   (1)任务间存在严重的负载倾斜;

   (2)特殊任务,比如任务向数据库中写数据。

1.4.8 Hadoop升级 Hadoop 源代码 mapreduce的map output的实现(☆☆☆☆☆)

http://blog.csdn.net/lw305080/article/details/56479170

1.4.9 Hadoop安全,及资源管理方案介绍(☆☆☆☆☆)

 Apache Hadoop 1.0.0版本和Cloudera CDH3之后的版本添加了安全机制,如果你将Hadoop升级到这两个版本,可能会导致Hadoop的一些应用不可用。

       Hadoop提供了两种安全机制:Simple和Kerberos。Simple机制(默认情况,Hadoop采用该机制)采用了SAAS协议。 也就是说,用户提交作业时,你说你是XXX(在JobConf的user.name中说明),则在JobTracker端要进行核实,包括两部分核实,一是你到底是不是这个人,即通过检查执行当前代码的人与user.name中的用户是否一致;然后检查ACL(Access Control List)配置文件(由管理员配置),看你是否有提交作业的权限。一旦你通过验证,会获取HDFS或者mapreduce授予的delegation token(访问不同模块由不同的delegation token),之后的任何操作,比如访问文件,均要检查该token是否存在,且使用者跟之前注册使用该token的人是否一致。
 

1.4.10 介绍Yarn调度器如何分配作业,源代码层面分析(☆☆☆☆☆)

转自:https://blog.csdn.net/abcd1430/article/details/52744155

1.4.11 mesos和yarn资源管理器对比,及使用场景(☆☆☆☆☆)

转自:https://blog.csdn.net/Kaiyang_Shao/article/details/89926352

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/289344
推荐阅读
相关标签
  

闽ICP备14008679号