赞
踩
如果文件太大,那么一台机器肯定存不下,所以需要进行分块存储到不同的机器上。这就需要用到网络通信,同时保证文件不丢失。
Hadoop的HDFS则实现了分布式存储。
本章具体介绍HDFS,以及其他的存储系统(本地文件系统、Amazon S3系统)
HDFS以流数据访问模式来存储超大文件,运行于商业硬件集群上
下面具体解释上述句子中的各个词语的含义
(1)超大文件: MB,GB,TB甚至PB级别的文件。Hadoop都可以存储
(2)流式数据访问:
其基本思路为:一次写入、多次读取。所以读取更加频繁,所以读取的速度直接影响用户的体验。
(3)商用硬件:
Hadoop并不需要运行在昂贵的硬件上,只需要运行在商用硬件(普通硬件)上。
这因为如此,故障率会很高,所以需要容错机制,让用户感觉不到故障。
(4)非低时间延迟的数据访问:
HDFS不适合低延迟访问,无法在短时间(几十毫秒)内完成访问
对于低延迟访问,HBase是一个更好的选择
(5)大量的小文件:
namenode的花名册记录了该集群所有分块文件的信息。
因此集群中分块的数量由namenode文件系统的容量决定。
如果有100万个数据库,那么namenode至少需要300MB内存,如果是数十亿,那么就无法完成了。
(6)不支持多用户写入,任意修改文件
HDFS只允许单用户写入
HDFS只允许添加,不允许修改。
在操作系统中,每个磁盘都会有默认的块大小,是磁盘写入(读取)数据的最小单位。
HDFS也有块的概念,但是大得多,默认为128MB。将一个大文件分为若干块(chunk),作为独立的存储单元。
如果这个文件小于128MB,那么他是不会占用128MB的空间,只会是原来的大小。(如一个1MB的文件存储在128MB的块中,也是1MB)
使用块的好处一:任意的大文件都可以拆分成多个块,然后存储到不同的磁盘上。
使用块的好处二:使用固定大小的块,简化系统的设计,更容易管理和存储。
使用块的好处三:固定的块适合于备份提高容错能力。一般来说同一个块会存在3台不同的机器上
查看块信息的命令% hdfs fsck / -files -blocks
HDFS集群有两类节点,一个namenode(管理节点),多个datanode(工作节点)
namenode:管理文件系统的命名空间。 维护文件系统树,该树内的所有文件和目录。
namenode的系统信息以2个文件的形式保存,分别是(1)命名空间镜像文件(2)编辑日志文件
namenode记录每个文件的块信息,但并不永久保存,系统重新启动后就会重新构建。
namenode十分重要,如果namenode损坏,那么文件系统的所有文件将丢失。
客户端并不需要知道namenode是哪一个,根据接口就可以执行访问系统
datanode:文件系统的工作节点,根据需要存储和检索数据块,并定期向namenode发送当前节点的块信息。
Hadoop为namenode提供两种容错机制,防止namenode损坏。
(1)备份namenode,在多个文件系统中保存元信息。
(2)辅助namenode(secondaryNameNode),该辅助定期合并编辑日志与命名空间镜像两个文件,防止编辑日志过大。
该辅助一般在单独的计算机上运行。该辅助会保存命名空间的副本,并在namenode发送故障时启用。
但是,辅助的状态落后于namenode,所以namenode失效后难免会丢失部分数据。将远程挂载的网络文件系统(NFS)上的namenode信息复制到辅助namenode(secondaryNameNode)作为新的namenode运行
(3)使用高可用集群,在3.2.5 有详细的讨论
通常,datanode都是从磁盘读取文件
但是如果某文件访问频繁,则可以被缓存在datanode的内存中,以堆外存缓存的形式存在。
通常情况,一个块只会缓存在一个datanode内存中,也可以通过配置文件修改缓存在datanode的数量。
MapReduce、Spark等 可以通过缓存块,来提高运行速度。比如连接(join)就是一个很好的候选
用户通过缓存池(cache poole) 增加一个cache directive 来告诉namenode需要缓存哪些文件,以及缓存的时间。
namenode的内存中保存着元数据,这个数据指明系统中的文件以及对于的块。
这也意味着namenode的内存大小限制了集群的数量。
Hadoop2.x引入了联邦HDFS,将多个namenode组成一个联邦,联邦中的每个namenode管理命名空间的一部分,比如一个namenode可能管理/user下的所有文件,另一个namenode可能管理/share下的所有文件。
联邦中的每个namenode维护一个命名空间卷(namespace volume)。命名空间卷之间相互独立,互不通信。
每个命名空间卷由两部分组成(1)命名空间的元数据(2)一个数据块池(block pool)
数据池块不再切分,集群中的datanode需要注册到每个namenode,并且存储着来自多个数据池块中的数据块。换句话说,一个datanode由多个namenode管着,同样一个namenode管理多个datanode。
若要访问联邦HDFS集群,则需要使用客户端挂载数据将文件路径映射到namenode,通过ViewFileSystem和viewfs://URI 进行配置和管理
高可用HA之前存在的问题
通过备份namenode元数据和通过备用secondaryNameNode依然无法实现系统的高可用。
在上述情况,如果namenode失效了,那么所有的客户端,MapReduce都将停止。必须要重新配置一个namenode,再启动才行。该过程称为冷启动
重新配置namenode(冷启动)需要的过程(1)将命名空间导入内存(2)重新执行编辑日志(3)接收多个来自datanode的数据块报告,并退出安全模式。
HA高可用
针对上述问题,Hadoop2增加了对HDFS的高可用性(HA)。
在高可用中配置了active(活动)和standby(备用)两个namenode。当active的namenode失效,standby的namenode会立马接管。 这过程用户无法察觉到明显的中断。
HA在架构上的修改
(1)namenode之间的编辑日志时共享的,所以当standby的namenode接管后,直接读取共享的编辑日志,以实现namenode的状态同步。
(2)datanaode需要同时向两个namenode发送块信息。因为块信息存储在namenode的内存中
(3)客户端要使用特定的方式处理namenode失效问题,且该方式对用户是透明的
(4)standby的namenode同样包含辅助namenode(SecondaryNameNode)
(5)standy的namenode为active的namenode进行定期检查命名空间。
对于上述点(1)编辑日志的共享存储,可以有两种选择。一种是NFS过滤器,另一种是日志管理器(QJM)。QJM是用HDFS实现,并且专门为共享编辑日志而设计的。
对于QJM,以一组日志节点(journal)的形式运行,每次编辑都写入多个日志节点,这样就可以防止数据彻底丢失。QJM的实现没有使用Zookeeper,但是工作方式与Zookeeper类似。
备用namenode为什么快速接管
活动(active)的namenode失效后,备用(standby)的namenode能够在几十秒内接管任务。
因为最小的状态存储在内存中:包括最新的编辑日志条目和最新的数据块映射信息。
namenode失效后,备用并不会立马顶上,因为系统要确保namenode是否真的失效了。
若活动(active)的namenode和备用(standby)的namenode都失效后,依然可以声明一个namenode进行冷启动。
故障切换与规避
系统中有一个新实体,称为故障转移控制器(failover controller),管理着将活动namenode转移为备用namenode的过程
有多种故障转移控制器,但默认的是使用Zookeeper来确保有且仅有一个活动的namenode。
每个namenode都运行一个轻量级故障转移器,监视宿主namenode是否失效,并在失效的时候进行故障切换。
管理员可以手动发起故障转移,称之为”平稳的故障“,来致使namenode有序地切换角色。
当系统误以为namenode失效
如果当前网速非常慢,导致系统误以为namenode已经停止工作,从而引起故障转移。但之前的namenode并没有停止工作,高可用用规避的方式来确保运行的namenode不会危害系统。
同一时间QJM只允许一个namenode向编辑日志写入数据。为了防止之前活动的namenode继续运行,可以使用SSH杀死namenode的进程。
客户端的故障转移可以通过客户端类库实现透明处理。通过配置文件等信息来实现故障转移的控制。
通过命令行接口进一步认识HDFS
配置伪分布式配置文件
(1)属性一:fs.defaultFS 设置为 hdfs://localhost/
用户设置Hadoop默认文件系统。 HDFS的守护进程通过该属性确定namenode所在的位置和端口。
(2)属性二:dfs.replication 设置为1
Hadoop每一块的副本个数,默认为3。即一个数据块会被存储到3个datanode上。因为是伪分布式,所以只需要设置为1即可。
文件系统的基本操作
基本操作包括读取文件、新建目录、移动文件、删除数据、列出目录等。
通过hadoop fs -help
查看每个命令的详细帮助文档
(1)从本地文件复制到HDFS
hadoop fs -copyFromLocal 本地路径 \ hdfs路径
hadoop fs -copyFromLocal input/docs/quangle.txt \ hdfs://localhost/user/tom/quangle.txt
因为在core-site.xml中已经指定了hdfs://localhost 所以可以省略为
hadoop fs -copyFromLocal input/docs/quangle.txt quangle.txt
在hdfs上新建一个目录
hadoop fs -mkdir books
在hdfs上查看当前文件夹里的文件
hadoop fs -ls .
Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现。
Java中定义抽象类 org.apache.hadoop.fs.FileSystem 定义了Hadoop文件系统客户端的接口,并且该抽象类有几个具体的实现。
文件系统 | URI方案 | Java实现(都在org.apache.hadoop包中) | 描述 |
---|---|---|---|
Local | file | fs.LocalFileSystem | 本地文件系统 |
HDFS | hdfs | hdfs.DistributedFileSystem | HDFS分布式文件系统 |
WebHDFS | Webhdfs | hdfs.web.WebHdfsFileSystem | 基于Http的文件系统 |
SecureWebHDFS | swebhdfs | fs.HarFileSystem | |
HAR | har | fs.HarFileSystem | |
View | viewfs | viewfs.ViewFileSystem | |
FTP | ftp | fs.ftp.FTPFileSystem | |
S3 | S3a | fs.s3a.S3AFileSystem | |
Azure | wasb | fs.azure.NativeAzureFileSystem | |
Swift | swift | fs.swift.snative.SwiftNativeFileSystem |
列出本地文件系统根目录下的文件,可以使用命令
hadoop fs -ls file:///
接口
Hadoop是用Java写的,可以通过JavaAPI调用Hadoop文件系统的操作。
文件系统的命令就是一个Java应用。
Java使用FileSystem类来提供文件系统操作
除了使用Java外,还可以使用其他接口来访问文件系统,接下来具体介绍其他接口。
1、 HTTP
由WebHDFS协议提供了HTTP REST API来实现与HDFS进行交互。
由于HTTP接口比原生Java要慢,不到万不得已不要用来传输大文件。
通过HTTP访问HDFS有两种方法
(1)直接访问
(2)通过代理进行访问
两种方法都使用了WebHDFS协议
对于第(1)种方法,文件元数据由namenode管理,文件读写会首先发往namenode,由namenode重定向到datanode,再写入datanode
对于第(2)种方法,由于使用了代理,所以不直接访问namenode和datanode。使用代理可以用防火墙和带宽限制等策略,从而更加的安全。
2、C语言
hadoop提供了一个名为libhdfs的C语言类库。
这个C语言API与Java的API非常类似,但开发滞后于Java API,因此有一些新的特性还不支持。
可以通过include头文件hdfs.h
3、NFS
使用Hadoop的NFSv3网关将HDFS挂载为本地客户端的文件系统。
然后就可以使用Unix的程序(ls或者cat)与文件系统进行交互
4、FUSE
用户空间文件系统(FUSE,Filesystem in Userspace)允许用户空间实现的文件系统作为Unix文件系统进行基础。
深入探索Hadoop的Filesystem抽象类。主要聚焦于HDFS实例,即DistributedFileSystem实现鳄梨。
从Hadoop文件系统读取文件。使用java.net.URL对象打开数据流
InputStream in = null;
try{
in = new URL("hdfs://host/path").openStream();//打开数据流
}finally{
// 出现异常,则关闭数据流
IOUtils.closeStream(in);
}
用Java实现类似于Linux中的cat命令
public class URLCat{ static{ URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main(String[] args) throws Exception{ InputStream in = null; try{ in = new URL(args[0]).openStream();//打开数据流 //将结果复制到System.out,就是输出到界面。 4096表示缓冲区大小, false表示结束后不关闭数据流 IOUtils.copyBytes(in,System.out,4096,false); }finally{ IOUtils.closeStream(in);//关闭 } } }
使用FileSystem以标准输出格式显示Hadoop文件系统中的文件
public class FileSystemCat{ public static void main(String[] args) throws Exception{ String uri = args[0]; Configuration conf = new Configuration();//获得conf类 FileSystem fs = FileSystem.get(URI.create(uri),conf); InputStream in = null; try{ in = fs.open(new Path(uri)); //将结果复制到System.out,就是输出到界面。 4096表示缓冲区大小, false表示结束后不关闭数据流 IOUtils.copyBytes(in,System.out,4096,false); }finally{ IOUtils.closeStream(in); } } } // 执行命令 // hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
FSDataImputStream对象
FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io类对象。 这个类继承了java.io.DataInputStream的一个特殊类,并支持随机访问。
public class FileSystemDoubleCat{ public static void main(String[] args) throws Exception{ String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri),conf); FSDataInputStream in = null; try{ in = fs.open(new Path(uri)); IOUtils.copyBytes(in,System.out,4096,false); in.seek(0);//返回到文件的开头 IOUtils.copyBytes(in,System.out.4096,false); }finally{ IOUtils.closeStream(in); } } } // 执行命令 // hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
将本地文件复制到Hadoop文件系统
public class FileCopyWithProgress{ public static void main(String[] args) throws Exception{ String localStr = args[0];//原始地址 String dst = args[1];// 目标地址 InputStream in = new BufferedInputStream(new FileInputStream(localSrc)); //定义输入流 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(dst),conf); OutputStream out = fs.create(new Path(dst), new Progressable(){ public void progress(){ System.out.println(".") } });// 定义输出流,progress是为了显示进度 // 将输入流的数据复制到输出流 IOUtils.copyBytes(in,out,4096,true); } }
Filesystem实例提供了创建目录的方法
public boolean mkdirs(Path f) throws IOException
展示文件状态信息
public class ShowFileStatusTest{ // 定义一个集群类 private MiniDFSCluster cluster; private FileSystem fs; @Before public void setUp throws IOException{ Configuration conf = new Configuration(); if(System.getProperty("test.build.data") == null){ System.setProperty("test.build.data","/tmp"); } cluster = new MiniDFSCluster.Builder(conf).build(); fs = cluster.getFileSystem(); OutputStream out = fs.create(new Path("/dir/file")); out.write("content".getBytes("UTF-8")); out.close; } @After public void tearDown() throws IOException{ if (fs !=null){ fs.close(); } if (cluster != null){ cluster.shutdown(); } } @Test public void fileStatusForfile() throws IOException{ Path file = new Path("/dir/file"); FileStatus stat = fs.getFileStatus(file); assertThat(stat.getPath().toUri().getPath(),is("/dir/file")) } }
本章具体介绍了HDFS、namenode和datanode之间的数据流是什么样的。
(1)客户端通过调用FileSystem对象的open()方法打开文件,该对象HDFS的具体实现类为DistributedFileSystem
(2)DistributedFileSystem通过远程过程调用(RPC)来调用namenode
(3)namenode返回该文件所有块的datanode地址,因为一个块有多个副本,所以其返回的规则是就近原则(返回距离近的datanode)。
(4)如果namenode本身也是datanode,就将其自身返回
(5)返回的datanode地址被封装进FSDataInputStream对象,再封装进DFSInputStream,返回给客户端。
(6)客户端接收到第一个datanode的地址后,调用read()方法读取具体信息。读取完毕后,再请求下一个datanode地址,循环至全部读取完毕。
(7)如果客户端在与datanode连接出现错误,则会去连接另一个最近邻的datanode,并且将之前故障的datanode记录,将损坏记录告诉datanode。
注! 正是因为namenode只告诉客户端datanode地址,而不是具体的内容,才能保证namenode能够接收更多客户端的连接。
(1)客户端通过对DistributedFileSystem对象调用create()方法来创建文件。
(2)DistributedFileSystem通过RPC调用,申请在namenode创建一个新的文件。
(3)namenode会执行这种检查,包括文件是否存在,客户端是否有权限。检查通过后则创建一条成功的记录,否则就抛出IOException异常。
(4)DistributedFileSystem给客户端返回一个FSDataOutputStream对象,客户端开始写入数据
(5)同样,FSDataOutputStream封装进DFSOutputStream对象,该对象负责datanode和namenode的通信。
(6)DFSOutputStream将数据分成一个一个数据包,并放入队列中。
(7)DataStreamer处理数据队列,挑选适合存储的一组datanode(组内的datanode数量由副本数决定)。
(8)若副本数设置为3,则会先写入第1个datanode,然后第1个datanode发送给第2个datanode,第2个datanode发送给第3个datanode。
(9)一组中的每个datanode都会将写入成功的结果返回给客户端(DFSOutputStream)
(10)若写入的时候发生故障,首先关闭管线,把还没有写入的数据包都返回给正常datanode指定的新标识中,然后告诉namenode发送故障的地点,namenode则删除故障节点的残余数据。删除后,正常的datanode沿着另一条管线继续走。
(11)当设置dfs.namenode.replication.min=1时,只要写入1个datnode就算写入成功,剩下的两个则由namenode自己复制。
(12)写入完成后,调用close()方法。该操作将剩余的所有数据写入datanode管线,并告知namenode这些datanode的地址。namenode等最小量(min)写入成功后,就由namenode进行复制。
一致模型保证文件读和写的可见性。
但是在HDFS中为了性能,牺牲了一部分一致性模型。
在HDFS中写入的文件,并非立马就能看见。
所以,HDFS提供了刷缓存(hflush)的方法,缓存刷新后,写入的数据就可以看见了。
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.hflush();
assertThat(fs.getFileStatus(p).getLen(),is(((long) "content".length())));
hflush()不保证datanode在磁盘中,而是保证在datanode内存中。可以使用hsync()替代。
FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush();
out.getFD().sync();
assertThat(localFile.length(),is(((long) "content".length())));
Hadoop自带的应用程序distcp。
distcp程序可以并行从Hadoop文件系统复制大量数据,也可以将数据复制到Hadoop中。
复制文件
% hadoop distcp file1 file2
复制目录
% hadoop distcp dir1 dir2
更新目录
%hadoop distcp -update dir1 dir2
distcp是一个MapReduce程序,该复制作业是通过集群并行的map完成,并没有reducer。
保持HDFS集群的均衡
向HDFS复制数据时,考虑集群的均衡性也是相当重要的。
如果只由一个map来执行复制作业,那么一个map会把单一的节点磁盘给塞满。就无法达到均衡。
所以设定多个map可以避免这个问题,默认是使用20个map来执行distcp命令。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。