赞
踩
A.CPU
B.网络
C.磁盘 IO
D.内存
答案解析: C.磁盘 IO对集群的影响 IO作为传输数据的管道如果管道越大对数据的传输也自然够大,其保证集群数据传输的稳定。
1.1.2 下列哪项可以作为集群的管理?(C)
C.ClouderaManager
D.Zookeeper
A.单机版
B.伪分布式
C.完全分布式
1)Zookeeper:是一个开源的分布式应用程序协调服务,基于zookeeper可以实现同步服务,配置维护,命名服务。
2)Flume:一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。
3)Hbase:是一个分布式的、面向列的开源数据库, 利用Hadoop HDFS作为其存储系统。
4)Hive:基于Hadoop的一个数据仓库工具,可以将结构化的数据档映射为一张数据库表,并提供简单的sql 查询功能,可以将sql语句转换为MapReduce任务进行运行。
5)Sqoop:将一个关系型数据库中的数据导进到Hadoop的 HDFS中,也可以将HDFS的数据导进到关系型数据库中。
Hadoop是指Hadoop框架本身;hadoop生态系统,不仅包含hadoop,还包括保证hadoop框架正常高效运行其他框架,比如zookeeper、Flume、Hbase、Hive、Sqoop等辅助框架。
1)使用root账户登录
2)修改IP
3)修改host主机名
4)配置SSH免密码登录
5)关闭防火墙
6)安装JDK
7)解压hadoop安装包
8)配置hadoop的核心文件 hadoop-env.sh,core-site.xml , mapred-site.xml , hdfs-site.xml
9)配置hadoop环境变量
10)格式化 hadoop namenode-format
11)启动节点start-all.sh
1)core-site.xml:
(1)fs.defaultFS:hdfs://cluster1(域名),这里的值指的是默认的HDFS路径 。
(2)hadoop.tmp.dir:/export/data/hadoop_tmp,这里的路径默认是NameNode、DataNode、secondaryNamenode等存放数据的公共目录。用户也可以自己单独指定这三类节点的目录。
(3)ha.zookeeper.quorum:hadoop101:2181,hadoop102:2181,hadoop103:2181,这里是ZooKeeper集群的地址和端口。注意,数量一定是奇数,且不少于三个节点 。
2)hadoop-env.sh: 只需设置jdk的安装路径,如:export JAVA_HOME=/usr/local/jdk。
3)hdfs-site.xml:
(1) dfs.replication:他决定着系统里面的文件块的数据备份个数,默认为3个。
(2) dfs.data.dir:datanode节点存储在文件系统的目录 。
(3) dfs.name.dir:是namenode节点存储hadoop文件系统信息的本地系统路径 。
4)mapred-site.xml:
mapreduce.framework.name: yarn指定mr运行在yarn上。
1)NameNode它是hadoop中的主服务器,管理文件系统名称空间和对集群中存储的文件的访问,保存有metadate。
2)SecondaryNameNode它不是namenode的冗余守护进程,而是提供周期检查点和清理任务。帮助NN合并editslog,减少NN启动时间。
3)DataNode它负责管理连接到节点的存储(一个集群中可以有多个节点)。每个存储数据的节点运行一个datanode守护进程。
4)ResourceManager(JobTracker)JobTracker负责调度DataNode上的工作。每个DataNode有一个TaskTracker,它们执行实际工作。
5)NodeManager(TaskTracker)执行任务
6)DFSZKFailoverController高可用时它负责监控NN的状态,并及时的把状态信息写入ZK。它通过一个独立线程周期性的调用NN上的一个特定接口来获取NN的健康状态。FC也有选择谁作为Active NN的权利,因为最多只有两个节点,目前选择策略还比较简单(先到先得,轮换)。
7)JournalNode 高可用情况下存放namenode的editlog文件.
1)dfs.namenode.http-address:50070
2)SecondaryNameNode辅助名称节点端口号:50090
3)dfs.datanode.address:50010
4)fs.defaultFS:8020 或者9000
5)yarn.resourcemanager.webapp.address:8088
1)NameNode它是hadoop中的主服务器,管理文件系统名称空间和对集群中存储的文件的访问,保存有metadate。
2)SecondaryNameNode它不是namenode的冗余守护进程,而是提供周期检查点和清理任务。帮助NN合并editslog,减少NN启动时间。
3)DataNode它负责管理连接到节点的存储(一个集群中可以有多个节点)。每个存储数据的节点运行一个datanode守护进程。
4)ResourceManager(JobTracker)JobTracker负责调度DataNode上的工作。每个DataNode有一个TaskTracker,它们执行实际工作。
5)NodeManager(TaskTracker)执行任务
6)DFSZKFailoverController高可用时它负责监控NN的状态,并及时的把状态信息写入ZK。它通过一个独立线程周期性的调用NN上的一个特定接口来获取NN的健康状态。FC也有选择谁作为Active NN的权利,因为最多只有两个节点,目前选择策略还比较简单(先到先得,轮换)。
7)JournalNode 高可用情况下存放namenode的editlog文件.
(1) 从应用程序角度进行优化。由于mapreduce是迭代逐行解析数据文件的,怎样在迭代的情况下,编写高效率的应用程序,是一种优化思路。
(2) 对Hadoop参数进行调优。当前hadoop系统有190多个配置参数,怎样调整这些参数,使hadoop作业运行尽可能的快,也是一种优化思路。
(3) 从系统实现角度进行优化。这种优化难度是最大的,它是从hadoop实现机制角度,发现当前Hadoop设计和实现上的缺点,然后进行源码级地修改。该方法虽难度大,但往往效果明显。
数据平衡:在Hadoop中,包含一个Balancer程序,通过运行这个程序,可以使得HDFS集群达到一个平衡的状态,使用这个程序的命令如下:
sh $HADOOP_HOME/bin/start-balancer.sh –t 10%
这个命令中-t参数后面跟的是HDFS达到平衡状态的磁盘使用率偏差值。如果机器与机器之间磁盘使用率偏差小于10%,那么我们就认为HDFS集群已经达到了平衡的状态。
由于Hadoop将数据拆分为各种块,因此使用RecordReader将拆分数据读取到单个记录中。
JobTracker 是一个 master 服务,软件启动之后 JobTracker 接收 Job,负责调度 Job的每一个子任务, task 运行于 TaskTracker 上,并监控它们,如果发现有失败的 task 就重新运行它。一般情况应该把 JobTracker 部署在单独的机器上。
TaskTracker 是运行在多个节点上的 slaver 服务。TaskTracker 主动与 JobTracker 通信,接收作业,并负责直接执行每一个任务。
A.3 份
B.2 份
C.1 份
D.不确定
A.32MB
B.64MB(2.7.2版本,本地模式)
C.128MB(2.7.2版本,分布式模式)
A.数据经过NameNode传递DataNode
B.Client端将文件切分为Block,依次上传
C.Client只上传数据到一台DataNode,然后由NameNode负责Block复制工作
A.NameNode
B.JobTracker
C.DataNode
D.SecondaryNameNode
E.tasktracker
A.它是NameNode的热备
B.它对内存没有要求
C.他的目的使帮助NameNode合并编辑日志,减少NameNode 启动时间
D. SecondaryNameNode应与NameNode 部署到一个节点
A.SecondaryNameNode
B.DataNode
C.TaskTracker
D.JobTracker
hadoop的集群是基于master/slave模式,namenode和jobtracker属于master,datanode和tasktracker属于slave,master只有一个,而slave有多个。 SecondaryNameNode内存需求和NameNode在一个数量级上,所以通常secondary NameNode(运行在单独的物理机器上)和 NameNode 运行在不同的机器上。 JobTracker对应于NameNode,TaskTracker对应于DataNode。 DataNode和NameNode是针对数据存放来而言的。JobTracker和TaskTracker是对于MapReduce执行而言的。
mapreduce中几个主要概念,mapreduce 整体上可以分为这么几条执行线索: jobclient,JobTracker与TaskTracker。 1)JobClient会在用户端通过JobClient类将已经配置参数打包成jar文件的应用存储到hdfs,并把路径提交到Jobtracker,然后由JobTracker创建每一个Task(即 MapTask 和 ReduceTask)并将它们分发到各个TaskTracker服务中去执行。 2)JobTracker是一master服务,软件启动之后JobTracker接收Job,负责调度Job的每一个子任务。task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。一般情况应该把JobTracker 部署在单独的机器上。 3)TaskTracker是运行在多个节点上的slaver服务。TaskTracker主动与JobTracker通信,接收作业,并负责直接执行每一个任务。TaskTracker 都需要运行在HDFS的DataNode上。 |
增加文件块大小,需要增加磁盘的传输速率。
HDFS存储机制,包括HDFS的写入过程和读取过程两个部分
1)客户端向namenode请求上传文件,namenode检查目标文件是否已存在,父目录是否存在。
2)namenode返回是否可以上传。
3)客户端请求第一个 block上传到哪几个datanode服务器上。
4)namenode返回3个datanode节点,分别为dn1、dn2、dn3。
5)客户端请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成
6)dn1、dn2、dn3逐级应答客户端
7)客户端开始往dn1上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,dn1收到一个packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答
8)当一个block传输完成之后,客户端再次请求namenode上传第二个block的服务器。(重复执行3-7步)
1)客户端向namenode请求下载文件,namenode通过查询元数据,找到文件块所在的datanode地址。
2)挑选一台datanode(就近原则,然后随机)服务器,请求读取数据。
3)datanode开始传输数据给客户端(从磁盘里面读取数据放入流,以packet为单位来做校验)。
4)客户端以packet为单位接收,先在本地缓存,然后写入目标文件。
1)第一阶段:namenode启动
(1)第一次启动namenode格式化后,创建fsimage和edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求
(3)namenode记录操作日志,更新滚动日志。
(4)namenode在内存中对数据进行增删改查
2)第二阶段:Secondary NameNode工作
(1)Secondary NameNode询问namenode是否需要checkpoint。直接带回namenode是否检查结果。
(2)Secondary NameNode请求执行checkpoint。
(3)namenode滚动正在写的edits日志
(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件fsimage.chkpoint
(7)拷贝fsimage.chkpoint到namenode
(8)namenode将fsimage.chkpoint重新命名成fsimage
1)机制流程同上;
2)区别
(1)NameNode负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。
(2)SecondaryNameNode主要用于定期合并命名空间镜像和命名空间镜像的编辑日志。
3)联系:
(1)SecondaryNameNode中保存了一份和namenode一致的镜像文件(fsimage)和编辑日志(edits)。
(2)在主namenode发生故障时(假设没有及时备份数据),可以从SecondaryNameNode恢复数据。
1)节点上线操作:
当要新上线数据节点的时候,需要把数据节点的名字追加在 dfs.hosts 文件中
(1)关闭新增节点的防火墙
(2)在 NameNode 节点的 hosts 文件中加入新增数据节点的 hostname
(3)在每个新增数据节点的 hosts 文件中加入 NameNode 的 hostname
(4)在 NameNode 节点上增加新增节点的 SSH 免密码登录的操作
(5)在 NameNode 节点上的 dfs.hosts 中追加上新增节点的 hostname,
(6)在其他节点上执行刷新操作:hdfs dfsadmin -refreshNodes
(7)在 NameNode 节点上,更改 slaves 文件,将要上线的数据节点 hostname 追加
到 slaves 文件中
(8)启动 DataNode 节点
(9)查看 NameNode 的监控页面看是否有新增加的节点
2)节点下线操作:
(1)修改/conf/hdfs-site.xml 文件
(2)确定需要下线的机器,dfs.osts.exclude 文件中配置好需要下架的机器,这个是阻
止下架的机器去连接 NameNode。
(3)配置完成之后进行配置的刷新操作./bin/hadoop dfsadmin -refreshNodes,这个操作的作用是在后台进行 block 块的移动。
(4)当执行三的命令完成之后,需要下架的机器就可以关闭了,可以查看现在集群上连接的节点,正在执行 Decommission,会显示:Decommission Status : Decommission in progress 执行完毕后,会显示:Decommission Status : Decommissioned
(5)机器下线完毕,将他们从excludes 文件中移除。
NameNode整个内存结构大致可以分成四大部分:Namespace、BlocksMap、NetworkTopology及其它,图2为各数据结构内存逻辑分布图示。
图2 NameNode内存全景图
1)Namespace:维护整个文件系统的目录树结构及目录树上的状态变化;
2)BlockManager:维护整个文件系统中与数据块相关的信息及数据块的状态变化;
3)NetworkTopology:维护机架拓扑及DataNode信息,机架感知的基础;
4)其它:
LeaseManager:读写的互斥同步就是靠Lease实现,支持HDFS的Write-Once-Read-Many的核心数据结构;
CacheManager:Hadoop 2.3.0引入的集中式缓存新特性,支持集中式缓存的管理,实现memory-locality提升读性能;
SnapshotManager:Hadoop 2.1.0引入的Snapshot新特性,用于数据备份、回滚,以防止因用户误操作导致集群出现数据问题;
DelegationTokenSecretManager:管理HDFS的安全访问;
另外还有临时数据信息、统计信息metrics等等。
NameNode常驻内存主要被Namespace和BlockManager使用,二者使用占比分别接近50%。其它部分内存开销较小且相对固定,与Namespace和BlockManager相比基本可以忽略。
ZKFailoverController主要职责
1)健康监测:周期性的向它监控的NN发送健康探测命令,从而来确定某个NameNode是否处于健康状态,如果机器宕机,心跳失败,那么zkfc就会标记它处于一个不健康的状态。
2)会话管理:如果NN是健康的,zkfc就会在zookeeper中保持一个打开的会话,如果NameNode同时还是Active状态的,那么zkfc还会在Zookeeper中占有一个类型为短暂类型的znode,当这个NN挂掉时,这个znode将会被删除,然后备用的NN,将会得到这把锁,升级为主NN,同时标记状态为Active。
3)当宕机的NN新启动时,它会再次注册zookeper,发现已经有znode锁了,便会自动变为Standby状态,如此往复循环,保证高可靠,需要注意,目前仅仅支持最多配置2个NN。
4)master选举:如上所述,通过在zookeeper中维持一个短暂类型的znode,来实现抢占式的锁机制,从而判断那个NameNode为Active状态
1)HealthMonitor初始化完成后通过内部线程调用NameNode的RPC接口对其进行健康检查
2)如果检查到NameNode状态异常,会回调ZKFailoverContorller注册的回调函数进行相应的处理
3)如果ZKFailoverController发现集群需要进行主备选举,会使用ActiveStanbyElector和zookeeper集群通信完成主备切换
4)ActiveStanbyElector在完成主备切换后,回调ZKFailoverController注册的方法使NameNode变成active或者stanby状态
单Active NN的架构使得HDFS在集群扩展性和性能上都有潜在的问题,当集群大到一定程度后,NN进程使用的内存可能会达到上百G,NN成为了性能的瓶颈
常用的估算公式为1G对应1百万个块,按缺省块大小计算的话,大概是64T (这个估算比例是有比较大的富裕的,其实,即使是每个文件只有一个块,所有元数据信息也不会有1KB/block)为了解决这个问题,Hadoop 2.x提供了HDFS Federation, 示意图如下:
多个NN共用一个集群里的存储资源,每个NN都可以单独对外提供服务每个NN都会定义一个存储池,有单独的id,每个DN都为所有存储池提供存储。
DN会按照存储池id向其对应的NN汇报块信息,同时,DN会向所有NN汇报本地存储可用资源情况。
如果需要在客户端方便的访问若干个NN上的资源,可以使用客户端挂载表,把不同的目录映射到不同的NN,但NN上必须存在相应的目录。
HDFS Federation意味着在集群中将会有多个namenode/namespace,这样的方式有什么好处呢?
多namespace的方式可以直接减轻单一NameNode的压力。
一个典型的例子就是上面提到的NameNode内存过高问题,我们完全可以将上面部分大的文件目录移到另外一个NameNode上做管理.更重要的一点在于,这些NameNode是共享集群中所有的DataNode的,它们还是在同一个集群内的。HDFS Federation原理结构图如下:
我们可以拿这种图与上一小节的图做对比,我们可以得出这样一个结论:
HDFS Federation是解决NameNode单点问题的水平横向扩展方案。
这时候在DataNode上就不仅仅存储一个Block Pool下的数据了,而是多个(大家可以在DataNode的datadir所在目录里面查看BP-xx.xx.xx.xx打头的目录)。
在HDFS Federation的情况下,只有元数据的管理与存放被分隔开了,但真实数据的存储还是共用的,这与viewFs还是不一样的。之前看别的文章在讲述HDFS Federation的时候直接拿viewFs来讲,个人觉得二者还是有些许的不同的,用一句话概况应该这么说。
HDFS的viewFs是namespace完全独立(私人化)的Federation方案,可以这么说,viewFs是Federation的一个简单实现方案。
因为它们不仅仅是namespace独立,而且真实数据的存放也是独立的,也就是多个完全独立的集群。在这点上我们还是有必要做一下区分,否则让人以为HDFS Federation就是viewFs。
第一点,命名空间的扩展。因为随着集群使用时间的加长,HDFS上存放的数据也将会越来越多。这个时候如果还是将所有的数据都往一个NameNode上存放,这个文件系统会显得非常的庞大。这时候我们可以进行横向扩展,把一些大的目录分离出去.使得每个NameNode下的数据看起来更加的精简。
第二点,性能的提升.这个也很好理解。当NameNode所持有的数据量达到了一个非常大规模的量级的时候(比如超过1亿个文件),这个时候NameNode的处理效率可能就会有影响,它可能比较容易的会陷入一个繁忙的状态。而整个集群将会受限于一个单点NameNode的处理效率,从而影响集群整体的吞吐量。这个时候多NameNode机制显然可以减轻很多这部分的压力。
第三点,资源的隔离。这一点考虑的就比较深了。通过多个命名空间,我们可以将关键数据文件目录移到不同的NameNode上,以此不让这些关键数据的读写操作受到其他普通文件读写操作的影响。也就是说这些NameNode将会只处理特定的关键的任务所发来的请求,而屏蔽了其他普通任务的文件读写请求,以此做到了资源的隔离。千万不要小看这一点,当你发现NameNode正在处理某个不良任务的大规模的请求操作导致响应速度极慢时,你一定会非常的懊恼。
Hadoop1.x都是64M,hadoop2.x开始都是128M。
Hdfs dfs -ls /
Hdfs dfs -cat /xxx
NameNode与Datanode的总结概述
所有的文件都是以block块的方式存放在HDFS文件系统当中,在hadoop1当中,文件的block块默认大小是64M,hadoop2当中,文件的block块大小默认是128M,block块的大小可以通过hdfs-site.xml当中的配置文件进行指定
<property>
<name>dfs.block.size</name>
<value>块大小 以字节为单位</value>//只写数值就可以
</property>
1)序列化和反序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。
2)自定义bean对象要想序列化传输步骤及注意事项:。
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
(3)重写序列化方法
(4)重写反序列化方法
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),且用”\t”分开,方便后续用
(7)如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序
A.TextInputFormat B.KeyValueInputFormat C.SequenceFileInputFormat
1)相同点:
TextInputformat和KeyValueTextInputFormat都继承了FileInputFormat类,都是每一行作为一个记录;
2)区别:
TextInputformat将每一行在文件中的起始偏移量作为 key,每一行的内容作为value。默认以\n或回车键作为一行记录。
KeyValueTextInputFormat 适合处理输入数据的每一行是两列,并用 tab 分离的形式。
public static class MyMapper extends Mapper<Text, Text, Text, LongWritable> { final Text k2 = new Text(); final LongWritable v2 = new LongWritable();
protected void map(Text key, Text value, Mapper<Text, Text, Text, LongWritable>.Context context) throws InterruptedException, IOException { // final String line = value.toString(); // final String[] splited = line.split("o"); // for (String word : splited) { // k2.set(word); k2.set(key); v2.set(1); context.write(k2, v2); // } } } |
1)job提交流程源码详解
waitForCompletion() submit(); // 1建立连接 connect(); // 1)创建提交job的代理 new Cluster(getConfiguration()); // (1)判断是本地yarn还是远程 initialize(jobTrackAddr, conf); // 2 提交job submitter.submitJobInternal(Job.this, cluster) // 1)创建给集群提交数据的Stag路径 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 2)获取jobid ,并创建job路径 JobID jobId = submitClient.getNewJobID(); // 3)拷贝jar包到集群 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4)计算切片,生成切片规划文件 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); // 5)向Stag路径写xml配置文件 writeConf(conf, submitJobFile); conf.writeXml(out); // 6)提交job,返回提交状态 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); |
FileInputFormat源码解析(input.getSplits(job))
(1)找到你数据存储的目录。
(2)开始遍历处理(规划切片)目录下的每一个文件
(3)遍历第一个文件ss.txt
a)获取文件大小fs.sizeOf(ss.txt);
b)计算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
c)默认情况下,切片大小=blocksize
d)开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
e)将切片信息写到一个切片规划文件中
f)整个切片的核心过程在getSplit()方法中完成。
g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
h)注意:block是HDFS上物理上存储的存储的数据,切片是对数据逻辑上的划分。
(4)提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数。
(1)自定义一个类继承FileInputFormat
(2)改写RecordReader,实现一次读取一个完整文件封装为KV
1)map数量
splitSize=max{minSize,min{maxSize,blockSize}}
map数量由处理的数据分成的block数量决定default_num = total_size / split_size;
2)reduce数量
reduce的数量job.setNumReduceTasks(x);x 为reduce的数量。不设置的话默认为 1。
一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定。
(1)Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。
1)排序的分类:
(1)部分排序:
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。
(2)全排序:
如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。
替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为待分析文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。
(3)辅助排序:(GroupingComparator分组)
Mapreduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。
(4)二次排序:
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
2)自定义排序WritableComparable
bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序
@Override public int compareTo(FlowBean o) { // 倒序排列,从大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; } |
3)排序发生的阶段:
(1)一个是在map side发生在spill后partition前。
(2)一个是在reduce side发生在copy后 reduce前。
分区,排序,溢写,拷贝到对应reduce机器上,增加combiner,压缩溢写的文件。
1)Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
2)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。
3)Combiner和reducer的区别在于运行的位置。
Combiner是在每一个maptask所在的节点运行;
Reducer是接收全局所有Mapper的输出结果。
MapReduce是一种编程模型,用于大规模数据集的并行运算。概念"Map(映射)"和"Reduce(归约)",和它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
记住这2点:
1、MapReduce是一种分布式计算模型,是Google提出的,主要用于搜索领域,解决海量数据的计算问题。
2、MR有两个阶段组成:Map和Reduce,用户只需实现map()和reduce()两个函数,即可实现分布式计算。
如果没有自定义的 partitioning,则默认的 partition 算法,即根据每一条数据的 key
的 hashcode 值摸运算(%)reduce 的数量,得到的数字就是“分区号“。
可以用 Partitioner
可以自定义groupingcomparator,对结果进行最大值排序,然后再reduce输出时,控制只输出前n个数。就达到了topn输出的目的。
分布式缓存一个最重要的应用就是在进行join操作的时候,如果一个表很大,另一个表很小,我们就可以将这个小表进行广播处理,即每个计算节点上都存一份,然后进行map端的连接操作,经过我的实验验证,这种情况下处理效率大大高于一般的reduce端join,广播处理就运用到了分布式缓存的技术。
DistributedCache将拷贝缓存的文件到Slave节点在任何Job在节点上执行之前,文件在每个Job中只会被拷贝一次,缓存的归档文件会被在Slave节点中解压缩。将本地文件复制到HDFS中去,接着Client会通过addCacheFile() 和addCacheArchive()方法告诉DistributedCache在HDFS中的位置。当文件存放到文地时,JobClient同样获得DistributedCache来创建符号链接,其形式为文件的URI加fragment标识。当用户需要获得缓存中所有有效文件的列表时,JobConf 的方法 getLocalCacheFiles() 和getLocalArchives()都返回一个指向本地文件路径对象数组。
1)reduce side join : 在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0 表示来自文件File1,tag=2 表示来自文件File2。
2)map side join : Map side join 是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task 内存中存在一份(比如存放到hash table 中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table 中查找是否有相同的key 的记录,如果有,则连接后输出即可。
1)可以输出到多个目录中,采用自定义OutputFormat。
2)实现步骤:
(1)自定义outputformat,
(2)改写recordwriter,具体改写输出数据的方法write()
1)数据量很小。
2)繁杂的小文件。
3)索引是更好的存取机制的时候。
4)事务处理。
5)只有一台机器的时候。
Extraction-Transformation-Loading的缩写,中文名称为数据提取、转换和加载。
1)MapReduce实现
方案一:在map端对mark和id进行排序
@Override public int compareTo(SortBean o) { int result;
if (this.mark > o.getMark()) { result = 1; }else if(this.mark <o.getMark()){ result = -1; }else { result = this.id > o.getId()? -1:1; }
return result; } |
方案二:在map端对mark排序,在reduce端对id分组。
@Override public int compareTo(GroupBean o) { int result;
result = this.mark > o.mark ? -1 : 1;
return result; } |
@Override public int compare(WritableComparable a, WritableComparable b) {
GroupBean aBean = (GroupBean) a; GroupBean bBean = (GroupBean) b;
int result; if (aBean.getId() > bBean.getId()) { result = 1; } else if (aBean.getId() < bBean.getId()) { result = -1; } else { result = 0; }
return result; } |
2)几个mapper
(1)1024m/128m=8块
hadoop二次排序:二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果。如何使用一个Mapreduce就可以实现二次排序。Hadoop有自带的SecondarySort程序,但这个程序只能对整数进行排序,所以我们需要对其进行改进,使其可以对任意字符串进行排序。下面会分别列出这两个程序的详解。
Hadoop自带的例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程)
public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable>
小表关联大表怎么实现的:多表关联和单表关联相似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更加清楚。所以可以采用和单表关联的相同的处理方式,map识别出输入的行属于哪个表之后,对其进行分割,将连接的列值保存在key中,另一列和左右表标识保存在value中,然后输出。reduce拿到连接结果之后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。
切片:HDFS的block是逻辑上的数据块.Hadoop2.0中每一块默认大小128MB,实际存储过程中block大小小于等128MB,也可根据配置文件修改块的大小。
shuffle,reduce阶段,map阶段:
Map-Shuffle:
写入之前先进行分区Partition,用户可以自定义分区(就是继承Partitioner类),然后定制到job上,如果没有进行分区,框架会使用 默认的分区(HashPartitioner)对key去hash值之后,然后在对reduceTaskNum进行取模(目的是为了平衡reduce的处理能力),然后决定由那个reduceTask来处理。
将分完区的结果<key,value,partition>开始序列化成字节数组,开始写入缓冲区。
随着map端的结果不端的输入缓冲区,缓冲区里的数据越来越多,缓冲区的默认大小是100M,当缓冲区大小达到阀值时 默认是0.8【spill.percent】(也就是80M),开始启动溢写线程,锁定这80M的内存执行溢写过程,内存—>磁盘,此时map输出的结果继续由另一个线程往剩余的20M里写,两个线程相互独立,彼此互不干扰。
溢写spill线程启动后,开始对key进行排序(Sort)默认的是自然排序,也是对序列化的字节数组进行排序(先对分区号排序,然后在对key进行排序)。
如果客户端自定义了Combiner之后(相当于map阶段的reduce),将相同的key的value相加,这样的好处就是减少溢写到磁盘的数据量(Combiner使用一定得慎重,适用于输入key/value和输出key/value类型完全一致,而且不影响最终的结果)
每次溢写都会在磁盘上生成一个一个的小文件,因为最终的结果文件只有一个,所以需要将这些溢写文件归并到一起,这个过程叫做Merge,最终结果就是一个group({“aaa”,[5,8,3]})
集合里面的值是从不同的溢写文件中读取来的。这时候Map-Shuffle就算是完成了。
一个MapTask端生成一个结果文件。
ReduceTask:
Reduce-Shuffle:
接下来开始进行Reduce-Shuffle 阶段。当MapTask完成任务数超过总数的5%后,开始调度执行ReduceTask任务,然后ReduceTask默认启动5个copy线程到完成的MapTask任务节点上分别copy一份属于自己的数据(使用Http的方式)。
这些拷贝的数据会首先保存到内存缓冲区中,当达到一定的阀值的时候,开始启动内存到磁盘的Merge,也就是溢写过程,一致运行直到map端没有数据生成,最后启动磁盘到磁盘的Merge方式生成最终的那个文件。在溢写过程中,然后锁定80M的数据,然后在延续Sort过程,然后记性group(分组)将相同的key放到一个集合中,然后在进行Merge
然后就开始reduceTask就会将这个文件交给reduced()方法进行处理,执行相应的业务逻辑
- package TopN;
-
- import java.io.IOException;
- import java.util.StringTokenizer;
- import java.util.TreeMap;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- public class TopN {
- public static class TopTenMapper extends
- Mapper<Object, Text, NullWritable, IntWritable> {
- private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();
-
- public void map(Object key, Text value, Context context) {
- int N = 10; //默认为Top10
- N = Integer.parseInt(context.getConfiguration().get("N"));
- StringTokenizer itr = new StringTokenizer(value.toString());
- while (itr.hasMoreTokens()) {
- repToRecordMap.put(Integer.parseInt(itr.nextToken()), " ");
- if (repToRecordMap.size() > N) {
- repToRecordMap.remove(repToRecordMap.firstKey());
- }
- }
- }
-
-
-
- protected void cleanup(Context context) {
- for (Integer i : repToRecordMap.keySet()) {
- try {
- context.write(NullWritable.get(), new IntWritable(i));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- public static class TopTenReducer extends
- Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
- private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();
-
- public void reduce(NullWritable key, Iterable<IntWritable> values,
- Context context) throws IOException, InterruptedException {
- int N = 10; //默认为Top10
- N = Integer.parseInt(context.getConfiguration().get("N"));
- for (IntWritable value : values) {
- repToRecordMap.put(value.get(), " ");
- if (repToRecordMap.size() > N) {
- repToRecordMap.remove(repToRecordMap.firstKey());
- }
- }
- for (Integer i : repToRecordMap.descendingMap().keySet()) {
- context.write(NullWritable.get(), new IntWritable(i));
- }
- }
-
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length != 3) {
- throw new IllegalArgumentException(
- "!!!!!!!!!!!!!! Usage!!!!!!!!!!!!!!: hadoop jar <jar-name> "
- + "TopN.TopN "
- + "<the value of N>"
- + "<input-path> "
- + "<output-path>");
- }
- Configuration conf = new Configuration();
- conf.set("N", args[0]);
- Job job = Job.getInstance(conf, "TopN");
- job.setJobName("TopN");
- Path inputPath = new Path(args[1]);
- Path outputPath = new Path(args[2]);
- FileInputFormat.setInputPaths(job, inputPath);
- FileOutputFormat.setOutputPath(job, outputPath);
- job.setJarByClass(TopN.class);
- job.setMapperClass(TopTenMapper.class);
- job.setReducerClass(TopTenReducer.class);
- job.setNumReduceTasks(1);
-
- job.setMapOutputKeyClass(NullWritable.class);// map阶段的输出的key
- job.setMapOutputValueClass(IntWritable.class);// map阶段的输出的value
-
- job.setOutputKeyClass(NullWritable.class);// reduce阶段的输出的key
- job.setOutputValueClass(IntWritable.class);// reduce阶段的输出的value
-
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
缓冲区的默认大小是100M,可以通过mapreduce.task.io.sort.mb这个属性来设置,具体配置得根据具体业务情景来分析. 一般不修改
https://blog.csdn.net/peng_0129/article/details/80623132(感谢)
可以用 Partitioner,
环形缓存的设置根据具体的需求设置太大对硬件要求以及处理都有不同的影响。
Partitioner是在map函数执行context.write()时被调用。 用户可以通过实现自定义的?Partitioner来控制哪个key被分配给哪个?Reducer。 查看源码知道: 如果没有定义partitioner,那么会走默认的分区Hashpartitioner
Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了
结果查看监控日志,得知产生这种现象的原因是数据倾斜问题 解决: (1)调整拆分mapper的数量(partition数量) (2)增加jvm (3)适当地将reduce的数量变大
hadoop集群HDFS块大小进行分片,每一个分片会由一个map任务来进行处理,用户还是可以通过参数mapred.min.split.size参数在作业提交客户端进行自定义设置。还有一个重要参数就是mapred.map.tasks,这个参数设置的map数量仅仅是一个提示,只有当InputFormat决定了map任务的个数比mapred.map.tasks值小时才起作用。同样,Map任务的个数也能通过使用JobConf的conf.setNumMapTasks(int num)方法来手动地设置。这个方法能够用来增加map任务的个数,但是不能设定任务的个数小于Hadoop系统通过分割输入数据得到的值。
Max(min.split,min(max.split,block))就设置分片的最大最下值 computeSplitSize()设置
加入了yarn解决了资源调度的问题。
加入了对zookeeper的支持实现比较可靠的高可用。
Yarn最主要的功能就是解决运行的用户程序与yarn框架完全解耦。
Yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、storm程序,spark程序……
1)作业提交过程之YARN
2)作业提交过程之MapReduce
3)作业提交过程之读数据
4)作业提交过程之写数据
Hadoop中常用的压缩算法有bzip2、gzip、lzo、snappy,其中lzo、snappy需要操作系统安装native库才可以支持。
数据可以压缩的位置如下所示。
企业开发用的比较多的是snappy。
目前,Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop2.7.2默认的资源调度器是Capacity Scheduler。
具体设置详见:yarn-default.xml文件
<property> <description>The class to use as the resource scheduler.</description> <name>yarn.resourcemanager.scheduler.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value> </property> |
1)先进先出调度器(FIFO)
2)容量调度器(Capacity Scheduler)
3)公平调度器(Fair Scheduler)
1)MRAppMaster容错性
一旦运行失败,由YARN的ResourceManager负责重新启动,最多重启次数可由用户设置,默认是2次。一旦超过最高重启次数,则作业运行失败。
2)Map Task/Reduce Task
Task周期性向MRAppMaster汇报心跳;一旦Task 挂掉,则MRAppMaster将为之重新申请资源,并运行之。最多重新运行次数可由用户设置,默认4 次。
1)作业完成时间取决于最慢的任务完成时间
一个作业由若干个Map任务和Reduce任务构成。因硬件老化、软件Bug等,某些任务可能运行非常慢。
典型案例:系统中有99%的Map任务都完成了,只有少数几个Map老是进度很慢,完不成,怎么办?
2)推测执行机制:
发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度。为拖后腿任务启动一个备份任务,同时运行。谁先运行完,则采用谁的结果。
3)执行推测任务的前提条件
(1)每个task只能有一个备份任务;
(2)当前job已完成的task必须不小于0.05(5%)
(3)开启推测执行参数设置。Hadoop2.7.2 mapred-site.xml文件中默认是打开的。
<property> <name>mapreduce.map.speculative</name> <value>true</value> <description>If true, then multiple instances of some map tasks may be executed in parallel.</description> </property>
<property> <name>mapreduce.reduce.speculative</name> <value>true</value> <description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description> </property> |
4)不能启用推测执行机制情况
(1)任务间存在严重的负载倾斜;
(2)特殊任务,比如任务向数据库中写数据。
1.4.8 Hadoop升级 Hadoop 源代码 mapreduce的map output的实现(☆☆☆☆☆)
http://blog.csdn.net/lw305080/article/details/56479170
1.4.9 Hadoop安全,及资源管理方案介绍(☆☆☆☆☆)
Apache Hadoop 1.0.0版本和Cloudera CDH3之后的版本添加了安全机制,如果你将Hadoop升级到这两个版本,可能会导致Hadoop的一些应用不可用。
Hadoop提供了两种安全机制:Simple和Kerberos。Simple机制(默认情况,Hadoop采用该机制)采用了SAAS协议。 也就是说,用户提交作业时,你说你是XXX(在JobConf的user.name中说明),则在JobTracker端要进行核实,包括两部分核实,一是你到底是不是这个人,即通过检查执行当前代码的人与user.name中的用户是否一致;然后检查ACL(Access Control List)配置文件(由管理员配置),看你是否有提交作业的权限。一旦你通过验证,会获取HDFS或者mapreduce授予的delegation token(访问不同模块由不同的delegation token),之后的任何操作,比如访问文件,均要检查该token是否存在,且使用者跟之前注册使用该token的人是否一致。
1.4.10 介绍Yarn调度器如何分配作业,源代码层面分析(☆☆☆☆☆)
转自:https://blog.csdn.net/abcd1430/article/details/52744155
1.4.11 mesos和yarn资源管理器对比,及使用场景(☆☆☆☆☆)
转自:https://blog.csdn.net/Kaiyang_Shao/article/details/89926352
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。