赞
踩
主要内容:了解Hadoop、MapReduce、HDFS
介绍一下这三者:
Hadoop的核心就是HDFS和MapReduce
MapReuduce的关键属性:
Hadoop的守护进程的地址和端口
Hadoop一般同时运行RPC和HTTP两个服务器,RPC服务器支持守护进程间的通信,HTTP服务器则提供与用户交互的Web页面
需要给各个服务器配置网络地址和端口号
普通的文件系统只需要单个计算机节点就可以完成文件的存储和处理,单个计算机节点由处理器、内存、高速缓存和本地磁盘构成。
分布式文件系统把文件分布存储到多个计算机节点上,成千上万 的计算机节点构成计算机集群
集群中的计算机节点存放在 机架(Rack)上,每个机架可以存放8~64个节点,同一机架上的不同
节点之间通过网络互连(常采用吉比特以太网),多个不同机架之间采用另一级网络或交换机互连。
文件块会被复制为多个副 本,存储在不同的节点上,而且存储同一文件块的不同副本的各个节
点会分布在不同的机架上,这样,在单个节点出现故障时,就可以快 速调用副本重启单个节点上的计算过程,而不用重启整个计算过程, 整个机架出现故障时也不会丢失所有文件块
体系结构:
介绍一些mapreduce的特点:
一次写入多次读取:
MapRuduce实际上有点暴力,每个查询需要处理整个数据集(至少很大一部分)。反过来想,这也正是它的优势所在。
MapReduce是一个批量查询处理器,能够在合理的时间范围内处理针对整个数据集的即时查询。以前的数据存储在磁盘上,要读取数据需要取寻址,会耗费大量时间。
既然说到大数据的存储,那为什么不用关系型数据库啊?MYSQL的DBMS不是挺好的么?
其实数据读取(select)的时间开销主要在寻址,读取大量的数据集时间开销不可避免的增加,这种流式读取的速率,主要取决于传输速率。
补充一下什么是流式读取(我的个人理解):
场景:首先你写一个mysql语句 select stu.name from Students
,执行 , 然后mysql的内核就去数据库里查询数据,将查询的结果返回存储在内存里面,然后读取出来。
流式数据访问,就相当下载10G的电影,用迅雷边下边播的模式。
非流式数据访问,就相当于,10G的电影,完全下好了,再播放。
回到MapReduce,所以啊如果是处理小规模的数据关系型数据库没毛病very good ,但是要操作大规模数据,MapReduce是爸爸,因为爸爸可以排序、组合重建数据库
对比一下:
MapReduce对非结构化和半结构化数据非常有效
结构化数据:具有既定格式的实体化数据,例如 XML文件
半结构化数据:可能有格式,但经常被忽略。比如excel的具有单元格的网格结构,但是单元格里面的数据格式是任意的
非结构化数据:没有内部结构,比如纯文本和图像数据
Web服务器日志:每次都会记录客户端主机的数据操作记录
MapReduce是一个可伸缩的编程模型。码农只需要写两个函数,map函数和reduce函数(每个函数定义了一个键值对集合到另一个键值对集合的映射),且无需关注数据集和所用集群的大小。举个例子,输入的数据量是原来的两倍,作业花费的时间也是两倍;但如果是花费的集群是原来的两倍,作业时间不变!
数据本地化:MapReduce会尽量在节点上存储数据,以实现数据的本地快速访问,它通过显式网络拓扑结构尽力保留网络带宽。
无共享:码农无需考虑系统部分失效问题,mapreduce会自动检测到失败的map和reduce任务并且让正常的机器去重新运行这些任务。
特点:
数据块:
每个磁盘都有默认的数据块大小,这是磁盘进行数据读写的最小单位。
文件系统一般为几千字节 ,磁盘块一般为512字节,HDFS默认为64MB。
为什么HDFS的块这么大?
块抽象的好处:
● 支持大规模文件存储。文件以块为单位进行存储,一个大规模 文件可以被分拆成若干个文件块,不同的文件块可以被分发到不同的 节点上,因此一个文件的大小不会受到单个节点的存储容量的限制,
可以远远大于网络中任意节点的存储容量。
● 简化系统设计。首先,大大简化了存储管理,因为文件块大小 是固定的,这样就可以很容易计算出一个节点可以存储多少文件块; 其次,方便了元数据的管理,元数据不需要和文件块一起存储,可以
由其他系统负责管理元数据。
● 适合数据备份。每个文件块都可以冗余存储到多个节点上,大
大提高了系统的容错性和可用性。
NameNode:
管理文件系统的命名空间,它维护着文件系统树及整棵树内所有的文件和目录
信息以两个文件形式永久保存在本地磁盘:命名空间镜像文件和编辑日志文件即FsImage和EditLog
记录着每个文件中各个块所在的数据节点的信息,但并不保存块的位置信息
名称节点在启动时,会将 FsImage 的内容加载到内存当中,然后执 行 EditLog 文件中的各项操作,使得内存中的元数据保持最新;名称节点启动成功并进入正常运行状态以后,HDFS中的更新操作 都会被写入到 EditLog,而不是直接写入 FsImage
客户端通过namenode和datanode的交互来访问整个文件系统
Secondary NameNode:
有效解决EditLog逐渐变大带来的问题,如果 EditLog很大,就会导致整个过程变得非常缓慢,使得名称节点在启动 过程中长期处于“安全模式”,无法正常对外提供写操作,影响了用户
的使用
所有的HDFS通信协议都是构建在TCP/IP协议基础 之上的
Client -》 NameNode :客户端通过一个可配置的端口向名称节点主动发起TCP连接, 并使用客户端协议与名称节点进行交互
NameNode -》 DataNode :使用数据节点协议进行交互
Client -》dataNode :客户端与数据节点的交互是通过 RPC(Remote Procedure Call)来实现的。在设计上,NameNode不会主动发起RPC,而是响应来自Client和DataNode的RPC请求
datanode:
文件系统的工作节点,它们根据需要存储并检索数据块(受客户端和namenode调度),并且定期向namenode发送它们所存储的块的列表
备份那些组成文件系统元数据持久状态的文件。namenode在多个文件系统上保存元数据的持久状态,实时同步的。另外,写入一个远程挂载的网络文件系统NFS
通常一个数据块的多个副本会被分布到不同的数据节点上
好处:
运行辅助的namenode,作用是定期通过编辑日志并合并命令空间镜像,以防止编辑日志过大。
客户端读取 HDFS的数据
客户端写 HDFS的数据
HDFS默认每个数据节点都是在不同的机架上
HDFS 默认的冗余复制因子是 3,每一个文件块会被同时保存到 3 个地方,其中,有两份副本放在同一个机架的不同机器上面,第三个 副本放在不同机架的机器上面,这样既可以保证机架发生异常时的数据恢复,也可以提高数据读写性能
读取优先级:
hdfs会对写入的文件计算循环冗余校验和,并在验证读取数据时进行验证
datanode负责验证收到的数据后存储数据以及其校验和
客户端从datanode读取数据时,也验证校验和,每个datanode都持久保存有一个用于验证的校验和日志
hdfs存储着每个数据块的副本,因此它可以通过复制完好的数据副本来修复损坏的数据块。
显示块信息:hadoop fsck / -files -blocks
本地文件复制到hdfs:hadoop fs -copyFromLocal input/docs/quangle.txt hdfs://local/user/tom/quangle.txt
主机的url在core-site中指定
检查文件是否一致:md5 input/docs/quangle.txt quangle.txt
hadoop fs [genericOptions][commandOptions]
进入离开安全模式: hadoop dfsadmin -safemode get
hadoop dfsadmin -safemode wait/enter/leave
关于dfsadmin:
fsck工具:检测hadoop文件的健康状况 hadoop fsck /
开启均衡器:start-balance.sh
● hadoop fs -ls
● hadoop fs -ls -R
● hadoop fs -cat
出(stdout)。
● hadoop fs -chgrp [-R]group
个命令只适用于超级用户。
● hadoop fs -chown [-R][owner][: [group]]
只适用于超级用户。
● hadoop fs -chmod [-R]
更改为。这个命令只适用于超级用户和文件的所有者。
● hadoop fs -tail [-f]
容。
● hadoop fs -stat [format]
件的相关信息。当不指定format的时候,返回文件
● hadoop fs -touchz
● hadoop fs -mkdir [-p]。创建指定的一个或多个文件
夹,-p选项用于递归创建子文件夹。
● hadoop fs -copyFromLocal 。 将 本 地 源 文 件 复制到路径指定的文件或文件夹中。
● hadoop fs -copyToLocal [-ignorecrc][-crc]。将目 标文件复制到本地文件文件夹中,可用-ignorecrc选项复制CRC校验失败的文件,使用-crc选项复制文件以及CRC信息。
● hadoop fs -cp 。将文件从源路径复制到目标路径 。● hadoop fs -du
● hadoop fs -expunge。清空回收站,请参考HDFS官方文档以获取
更多关于回收站特性的信息。
● hadoop fs -get [-ignorecrc][-crc]。复制指定的 文件到本地文件系统指定的文件或文件夹,可用-ignorecrc选项复制CRC校验失败的文件,使用-crc选项复制文件以及CRC信息。
● hadoop fs -getmerge [-nl]。对指定的源目录中 的所有文件进行合并,写入指定的本地文件。-nl是可选的,用于指定在每个文件结尾添加一个换行符。
● hadoop fs -put 。从本地文件系统中复制 指定的单个或多个源文件到指定的目标文件系统中,也支持从标准输入(stdin)中读取输入写入目标文件系统。
● hadoop fs -moveFromLocal 。与put命令功能相同,但是文件上传结束后会从本地文件系统中删除指定的文件。
● hadoop fs -mv 。将文件从源路径移动到目标路 径。 ● hadoop fs -rm
● hadoop fs -rm -r
● hadoop fs -setrep [-R]
● hadoop fs -test -[ezd]
* -e检查文件是否存在,如果存在则返回0,否则返回1。
* -z检查文件是否是0字节,如果是则返回0,否则返回1。
* -d如果路径是个目录,则返回1,否则返回0。
● hadoop fs -text
作业历史包括已完成作业的事件和配置信息。
作业历史可以用来实现jobtrack本地文件系统的history子目录中,设置在hadoop.job.history.location,历史文件保存30天
运行期间,可以在作业页面监视作业进度,页面会定期更新。
摘要信息:
每个reducer产生一个输出文件,假设目录中有30个部分文件,命名为part-00000-part-00029
将部分文件合成为一个单独的文件:hadoop fs -getmerge temp temp-local
sort temp-local | tail
如果输出文件比较小,另外一种方式:hadoop fs -cat temp/*
提示:这里可以添加要学的内容
例如:
上传hdfs:
hadoop dfs -put test.jar /
在hdfs上运行jar:
hadoop jar test.jar test.test1Driver
hadoop jar 自己打的jar包名称/路径 运行程序的主类(Driver类)路径
如何将数据处理问题转化为MapReduce模型
例如:
用mapreduce来分解:
如何让作业顺序的执行?是否有一个线性的作业链或者一个更复杂的作业有向无环图
线性链表:
JobClient.runJob(conf1)
JobClient.runJob(conf2)
如果一个作业失败,runJob就会抛出一个IOException,这样一来管道中后面的作业就无法执行了。
更复杂的结构使用org.apache.hadoop.mapred.jobcontrol
一行代码就可以运行MapReduce作业:JobClient.runJob(conf)
整个过程:
Jobclient的runJob方法用于新建jobclient实例并且调用submitJob()的便捷方式。
提交作业后,runjob()每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告给控制台。作业完成后,如果成功,就显示作业计数器;如果失败,导致作业失败的错误被记录到控制台。
submitJob()所实现的作业提交过程如下:
当JobTracker接收到对其submitJob方法的调用后,会把此调用放入一个内部队列当中,交由作业调度器进行调度,并对其初始化。
创建任务运行列表:
在jobtracker为tasktracker选择任务之前,jobtracker必须选定任务所在的作业
如何选任务?(有调度算法)默认的方法是简单维护一个作业优先级列表,一旦选择好作业,jobtracker就可以为该作业选择一个任务。
对于map任务和reduce任务,tasktracker有固定数量的任务槽
Example:
tasktracker可能可以同时运行两个map和两个reduce。准确数量由tasktracker核的数量核内存大小决定。
默认:
假设tasktracker已经被分配了一个任务,下一步是运行该任务。
streaming和Pipes都运行特殊的map和reduce任务,目的是运行用户提供的可执行程序,并与之通信
Streaming中:
Pipes:
MapReduce作业是长时间运行的批量作业,运行时间范围从数秒到数小时。得知作业的进展是很重要的。
每个作业都有一个状态:
进度追踪
map任务:任务进度是已处理输入所占比例
reduce任务:情况有点复杂,但是系统仍然会估计已处理reduce任务的比例
MapReduce中进度的组成
进度并不是总是可测量的,但是无论如何Hadoop有个任务正在运行。比如输出记录的任务也可以表示成进度,尽管不是百分比的形式
进度的所有操作:
如果任务报告了进度,便会设置一个标志以表明状态变化将被发送到tasktracker,有一个独立的线程,每隔三秒检查一次此标志,如果已设置,则告知tasktracker当前任务状态。同时,tasktracker每隔5秒发送心跳到jobtracker(5秒是最小值),心跳间隔实际上是由集群的大小来决定的,对于一个更大的集群,间隔会长一些 tasktracker运行的所有任务的状态都会在调用中被发送至jobtracker,计数器间隔少于5秒,因为计数器占用的带宽相对较高
JobClient通过每秒查询jobtracker来接收最新状态。JobClient的getJob()方法来得到一个RunningJob的实例
jobtracker收到最后一个作业完成,便把作业状态设置为“成功”。JobClient查询状态时,便知道任务已经完成,JobClient打印一条消息告知用户,从runJob返回。
最后jobtracker清空工作的状态,指示tasktracker也清空工作状态。
目标:让每个用户公平地共享集群能力
。如果只有一个作业,它会得到集群的全部资源。随着提交的作业越来越多,空闲的任务会以公平共享集群。
如何使用?
MapReduce确保每个reducer输入都是按键排序的,map的输出传给给reduce作为输入,中间这个过程称为shuffle
map函数产生输出时,并不是简单的将它写入到磁盘!
这个过程有点复杂,先用缓冲的方式写入到内存,并出于效率的考虑进行预排序
过程要点:
io.sort.factor
mpred.map.output.compression.codec
为truereduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件,每个map任务完成的时间不一样,只要有一个map任务完成,reduce任务就开始复制map的输出文件(并行获取map输出,默认5个)
过程要点:
假设50个map文件,合并因子(merge factor
=10),那么合并进行5趟,每趟合并10个文件,最后形成10个中间文件
实际上没有这么完美的安排!!! 目标是合并最小数量的文件一遍满足最后一趟的合并系数
假设40个文件,运行起来绝对不是4 * 10的安排,而是第一天合并4个文件,随后3趟合并10个文件,在最后一趟(第4趟)4个已经合并的文件会和余下的6个合并,合计10个。总体上看好像是4 * 10
why?
这是优化措施,不改变合并次数的情况下,尽量减少写到磁盘的数据量,因为最后一趟无论如何都要直接合并到reduce(4和6顺带在最后一趟写入,少一次合并IO)
10+10+10+10 = 40 - > IO(40)
(4)+10+10+10 +( 6) = 40 - > IO(30)
原则:给shuffle过程分配尽量多的提高内存空间
问题:map函数和reduce函数能够得到足够的内存来运行(编写函数时少用内存)
运行map和reduce任务的JVM,内存大小在mapred.child.java.opts
属性设置
io.file.buffer.percent
中设置
MR模型将作业分解成任务,然后并行的运行任务使得作业的整体时间少于各个任务执行的时间
任务执行缓慢有很多原因,但是检测具体原因很困难,Hadoop不会去诊断和修复缓慢任务,相反,有个任务比预期的时间慢,它会尽量检测但同时它会启动另一个相同的任务作为备份。
策略:
推测执行的目的时减少作业的时间,但这是以集群的效率为代价的,推测执行会减少整个的吞吐量,因为冗余任务的执行会减少作业执行时间
Hadoop在自己的java虚拟机上运行任务,以区分其他正在运行的任务。每个任务启动一个新的JVM将耗时1秒。对于小任务1s微不足道,大量短map任务你就知道错了。
如果有大量短任务的或者初始化时间长的作业,能够对后续任务重用JVM就可以体现出性能上的优势。
启动任务重用JVM后,任务不会同时运行在一个JVM上。JVM顺序执行各个任务,但是tasktracker可以一次性执行多个任务,都是在独立的JVM内运行的。
我们可以让同一作业中的不同任务共享一个JVM,数量不限
共享JVM的另一个好处是,作业之间的状态共享,通过共享静态字段中存储的相关数据,任务可以快速访问共享数据
hadoop fs -text
诊断)执行环境的属性:
一般来说,map函数的输入的键值类型(k1,v1)相同于输出类型(k2,v2)
partition函数将中间的键值对(k2,v2)进行处理,并且返回一个分区索引(partition index),分区由键决定
partition : (k2,v2) -> interger
public interface Partition<K2,V2> extends JobConfigurable{
int getPartition(K2 key, V2 value , int numPartitions) ;
}
默认的输入格式是TextInputFormat
,它产生的键的类型是LongWritable(文件中每行中开始的偏移量),值的类型是Text文本行,最后输出的整数是行偏移量。
默认的mapper是IdentityMapper ,它将原封不动地写在输出中
public class IdentityMapper<k,v>extends MapReduceBase implments Mapper<k,v,k,v>{
public void map(K key ,V value,
OutputCollecctor <K,V> output , Reporter reporter)
throws IOException{
output.collect(key,value)
}
}
IdentityMapper是一个泛型类型,它可以接受任何键
默认的partitioner是HashPartitioner,它对记录的键进行哈希操作以决定该记录属于哪个分区。每个分区对应一个reducer任务,所以分区个数等于作业的reducer个数
public class HashPartitioner<k2,v2> implments Partition<k2,v2>{
public void configure(JobConf job){}
public int getPartition(K2 key , V2 value, int numPartitions){
return (key.hashcode()& Integer.Max_VALUE)%numPartitions;
}
}
键的哈希码被转换成一个非负整数,它由哈希值与最大的整形值做一个按位与操作而获得的
流数据,即数据以大 量、快速、时变的流形式持续到达
定义:
流数据(或数据流)是指在时间分布和数量上无 限的一系列动态数据集合体
数据记录是流数据的最小组成单元
流数据则不适合采用批量计算,因为流数据不适合用传统的关系 模型建模,不能把源源不断的流数据保存到数据库中
流计算一般包含3个阶段:
数据实时采集、数据实时计算、实时查询服务
传统:
用户查询(sql) --> DBMS --> 返回结果
数据采集系统:
① Agent:主动采集数据,并把数据推送到Collector部分。
② Collector:接收多个Agent的数据,并实现有序、可靠、高性能
的转发。
③ Store:存储Collector转发过来的数据。
在Storm对流数据Streams的抽象描述中,如图10-10所示,流数据是一个无限的Tuple序列.
这些Tuple序列会以分布式的方式并行地创建和处理
Storm认为每个Stream都有一个源头,并把这个源头抽象为Spouts。 Spouts会从外部读取流数据并持续发出Tuple
Storm将Streams的状态转换过程抽象为Bolts。 Bolts既可以处理Tuple,也可以将处理后的Tuple作为新的Streams发送给 其他Bolts。对Tuple的处理逻辑都被封装在Bolts中,可执行过滤、聚
合、查询等操作
Storm将Spouts和Bolts组成的网络抽象成Topology。
Topology是Storm中最高层次的抽象概念,它可以被提交到Storm集 群执行。
当Spout或者Bolt发送元组时,它会把元组发送到每个订阅了该Stream的Bolt上进行处理
任务分配调度管理器?
Storm中的Stream Groupings用于告知Topology如何在两个组件间 (如Spout和Bolt之间,或者不同的Bolt之间)进行Tuple的传送
箭头表示Tuple的流向, 而圆圈则表示任务,每一个Spout和Bolt都可以有多个分布式任务,一 个任务在什么时候、以什么方式发送Tuple就是由Stream Groupings来决定的
六种方式:
① ShuffleGrouping:随机分组,随机分发Stream中的Tuple,保证每 个Bolt的Task接收Tuple数量大致一致。
② FieldsGrouping:按照字段分组,保证相同字段的Tuple分配到同
一个Task中。
③ AllGrouping:广播发送,每一个Task都会收到所有的Tuple。 ④ GlobalGrouping:全局分组,所有的Tuple都发送到同一个Task
中。
⑤ NonGrouping:不分组,和ShuffleGrouping类似,当前Task的执
行会和它的被订阅者在同一个线程中执行。
⑥ DirectGrouping:直接分组,直接指定由某个Task来执行Tuple的
处理
工作流程(一张图就够了):
① 客户端提交Topology到Storm集群中。
② Nimbus将分配给Supervisor的任务写入Zookeeper。
③ Supervisor从Zookeeper中获取所分配的任务,并启动Worker进
程。
④ Worker进程执行具体的任务
负责在集 群范围内分发代码、为Worker分配任务和监测故障
问题:
将基于MapReduce的批量处理转为小批量处理,每隔一个周期就启动一次MapReduce作业,通过这样的方式来处理流数据是否可行,为什么?
1) 切分成小片段同时增加了任务处理的附加开销,并且需要处理片段之间的依赖关系;
2) 需要对MapReduce进行改造以支持流式处理,会大大增加MapReduce框架的复杂度,导致系统难以维护和扩展;
3) 降低了用户程序的可伸缩性,因为用户必须使用MapReduce接口定义流式作业。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。