赞
踩
HDFS实现目标:
HDFS局限:
数据块
每个磁盘都有默认的数据块大小,这是磁盘进行数据读写的最小单位。HDFS同样也有块(block)的概念,它的默认大小是128MB。HDFS上的文件也被划分为块大小的多个分块(chunk),作为独立的存储单元。但与面向单一磁盘的文件系统不同的是,HDFS中小于一个块大小的文件不占据整个块的空间(比如,当一个1MB的文件存储在一个128MB的块中时,文件只使用1MB的磁盘空间,而不是128MB)。
块设计大一点是为了:
- 支持面向大规模数据存储,
- 降低分布式节点的寻址空间。
缺点:如果块过大会导致MapReduce就一两个任务在执行完全牺牲了MapReduce的并行度,发挥不了分布式并行处理的效果。
优点:
- 支持大规模文件存储,
- 简化系统设计,
- 适合数据备份。
HDFS中fsck指令可以显示块信息。
%hdfs fsck / -files -blocks
namenode 和 datanode
HDFS集群有两类节点–namenode和datanode。
namenode由两部分组成,FsImage和EditLog。
FsImage维护(保存)着文件系统树以及整棵树内所有的文件和目录;而EditLog记录对数据进行的各种操作(创建,删除和重命名等操作)。
对namenode实现容错非常重要,Hadoop为此提供了两种机制。
第一种机制是备份那些组成文件系统元数据持久状态的文件。
另一种可行的方法是运行一个secondarynamenode。它的主要作用是定期合并FsImage和EditLog,以防止编辑日志过大。它的另一个作用是实现对namenode冷备份。
块缓存
通常datanode从磁盘中读取块,但对于访问频繁的文件,其对应的块可能被显式地缓存在datanode的内存中,以堆外块缓存(off-heap block cache)的形式存在。默认情况下,一个块仅缓存在一个datanode的内存中,当然可以针对每个文件配置datanode的数量。用户或应用通过在缓存池(cache pool)中增加一个cache directive 来告诉namenode需要缓存哪些文件以及存多久,缓存池是一个用于管理权限和资源使用的管理性分组。
联邦HDFS
namenode在内存中保存文件系统中每个文件和每个数据块的引用关系,这意味着对于一个拥有大量文件的超大集群来说,内存将成为限制系统横向扩展的瓶颈。在2.x发行版本中引入的联邦HDFS允许系统通过添加namenode实现扩展,其中每个namenode管理文件系统命名空间中的一部分。在联邦环境下,每个namenode维护一个命名空间卷(namespace volume),由命名空间的元数据和一个数据块池(block pool)组成,数据块池包含该命名空间下文件的所有数据块。命令空间卷之间是相互独立的,两两之间并不互相通信,甚至其中一个namenode的失效也不会影响由其他namenode维护的命名空间的可用性。数据块池不再进行切分,因此集群中的datanode需要注册到每个namenode,并且存储着来自多个数据块池中的数据块。要是哪个访问联邦的HDFS集群,客户端需要使用客户端挂载数据表将文件路径映射到namenode,该功能可以通过ViewFileSystem和Viewfs://URI进行配置和管理。
HDFS的高可用性
为了更好地解决namenode单点故障问题,Hadoop2 针对上述问题增加了对HDFS高可用性(HA)的支持。在这一实现中,配置了一对活动-备用(active-standby)namenode。当活动namenode失效,备用namenode就会接管它的任务并开始服务于来自客户端的请求,不会有任何明显的中断。
可从两种高可用性共享存储做出选择:NFS过滤器或群体日志管理器(QJM,quorum journal manager)。QJM是一个专用的HDFS实现,为提供一个高可用的编辑日志而设计,被推荐用于大多数HDFS部署中。QJM以一组日志节点(journal node)的形式运行,每一次编辑必须写入多数日志节点。在活动namenode失效之后,备用namenode能够快速实现任务接管,因为最新的状态存储在内存中:包括最新的编辑日志和最新的数据块映射信息。实际观察到的失效时间略长一点,这是因为系统需要保守确定活动namenode是否真的失效了。
在活动namenode失效且备用namenode也失效的情况下,管理员依旧可以声明一个备用namenode并实现冷启动。
故障切换和规避
系统中有一个称为故障转移控制器(failover controller)的新实体,管理着将活动namenode转移为备用namenode的转换过程。每一个namenode运行着一个轻量级的故障转移控制器,其工作就是监视宿主namenode是否失效(通过一个简单的心跳机制实现)并在namenode失效时进行故障切换。
高可用实现了更进一步的优化,以确保先前活动的namenode不会执行危害系统并导致系统崩溃的操作,该方法称为“规避”(fencing)。
同一时间QJM仅允许一个namenode向编辑日志中写入数据。
文件系统的基本操作
我们可以执行所有常用的文件系统操作,如,读取,新建,移动,删除等。可以输入hadoop fs -help命令获取每个命令的详细帮助文件。
首先从本地文件系统将一个文件复制到HDFS:
hadoop fs -copyFromLocal input/docs/test.txt \ hdfs://localhost/user/test.txt
也可以省略hdfs://loclahost,因为在core-site.xml中指定了。
hadoop fs -copyFromLocal input/docs/test.txt /user/test.txt
也可以使用相对路径。
我们把文件复制回本地文件系统,并检查是否一致:
hadoop fs -copyToLocal /user/test.txt input/docs/test.txt
md5 input/docs/test.txt /user/test.txt
Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现。
从Hadoop URL读取数据
这里采用的方法是通过FsUrlStreamHandlerFactory实例调用java.net.URL对象的setURLStreamHandlerFactory()方法。每个Java虚拟机只能调用一次这个方法,因此通常在静态方法中调用。这个限制意味着如果程序的其他组件已经声明了一个URLStreamHandlerFactory实例,你将无法使用这个方法从Hadoop中读取数据。
public class URLCat { //每个虚拟机只能调用一次FsUrlStreamHandlerFactory()方法,因此通常在静态方法中调用 static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main(String[] args) throws Exception{ // File path = new File("."); // System.out.println(path.getAbsolutePath()); //当前路径为 /Users/sunjianyang/Desktop/编程/hadoop/. InputStream in = null; try{ in = new URL("hdfs://129.*.*.*:9000/id_rsa.pub").openStream(); IOUtils.copyBytes(in,System.out,4096,false); }finally { IOUtils.closeStream(in); } } }
通过FileSystemAPI读取数据
public class FileSystemCat { public static void main(String[] args) { //第一种方法 // String uri = "hdfs://129.*.*.*:9000/id_rsa.pub"; // Configuration configuration = new Configuration(); // InputStream in = null; // try{ // FileSystem fs = FileSystem.get(URI.create(uri),configuration); // in = fs.open(new Path(uri)); // IOUtils.copyBytes(in,System.out,4096,false); // }catch (IOException e){ // e.printStackTrace(); // }finally { // IOUtils.closeStream(in); // } //第二种方法 // String uri = "hdfs://129.*.*.220:9000/id_rsa.pub"; // Configuration configuration = new Configuration(); // configuration.set("fs.defaultFS","hdfs://129.*.*.*:9000"); // InputStream in = null; // try{ // FileSystem fs = FileSystem.get(configuration); // in = fs.open(new Path(uri)); // IOUtils.copyBytes(in,System.out,4096,false); // }catch (IOException e){ // e.printStackTrace(); // }finally { // IOUtils.closeStream(in); // } //fs.open()返回的是FSDataInputSteam对象,支持seek()随机访问 String uri = "hdfs://129.*.*.*:9000/id_rsa.pub"; Configuration configuration = new Configuration(); FSDataInputStream in = null; try{ FileSystem fs = FileSystem.get(URI.create("hdfs://129.*.*.*:9000/id_rsa.pub"),configuration); FileStatus fileStatus = fs.getFileStatus(new Path("/id_rsa.pub")); in = fs.open(new Path(uri)); IOUtils.copyBytes(in,System.out,4096,false); in.seek(0);//定位到文件开始处 IOUtils.copyBytes(in,System.out,4096,false); System.out.println(fileStatus.getReplication()); }catch (IOException e){ e.printStackTrace(); }finally { IOUtils.closeStream(in); } } }
写入数据
public class FileCopyWithProgress { public static void main(String[] args) throws Exception{ File path = new File("."); System.out.println(path.getAbsolutePath()); String localDesination = "/Users/sunjianyang/Desktop/编程/hadoop/src/main/resources/test.txt"; String dst = "hdfs://129.*.*.220:9000/test.txt"; InputStream in = new BufferedInputStream(new FileInputStream(localDesination)); Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(URI.create(dst),configuration); FSDataOutputStream out = fs.create(new Path(dst), new Progressable() { public void progress() { System.out.println("hello"); } }); System.out.println("the position: "+out.getPos()); IOUtils.copyBytes(in,out,4096,true); System.out.println("the position: "+out.getPos()); } }
每次Hadoop调用progress()方法时,也就是将64KB数据包写入datanode管线后,打印一个时间点来显式整个运行过程。
目录
public boolean mkdir(Path f) throws IOExpection
通常不需要显式创建一个目录,因为调用create()方法写入文件时会自动创建父目录。
查询文件系统
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.IOException; import java.net.URI; public class ListStatus { private final static String uri = "hdfs://129.*.*.220:9000/"; public static void main(String[] args) throws IOException { Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri),configuration); //列出文件格式,不包含通配符 // Path[] path = new Path[]{"hdfs://129.211.82.220:9000/","hdfs://129.*.*.220:9000/user/hadoop/"}; // Path[] path = new Path[]{ // new Path("hdfs://129.*.*.220:9000/"), // new Path("hdfs://129.*.*.220:9000/user/hadoop/") // }; // // FileStatus[] status = fs.listStatus(path); // Path[] listedPaths = FileUtil.stat2Paths(status); // for(Path fileStatus: listedPaths){ // System.out.println(fileStatus); // } //包含通配符的筛选 Path path = new Path("hdfs://129.*.*.220:9000/*"); FileStatus[] status = fs.globStatus(path, new PathFilter() { //1. 在匿名内部类中直接初始化 // private final String regex = "^hdfs://129.*.*.220:9000/tm.*"; //2. 使用构造函数对变量进行初始化 private final String regex; //这是匿名内部类的构造器 { regex = "^hdfs://129.*.*.220:9000/tm.*"; } public boolean accept(Path path) { return !path.toString().matches(regex); } }); // FileStatus[] status = fs.globStatus(path,new RegexExcludePathFilter("hdfs://129.*.*.220:9000/tm.*")); Path[] listedPaths = FileUtil.stat2Paths(status); for(Path path1 : listedPaths) System.out.println(path1); // fs.delete(new Path("hdfs://129.*.*.220:9000/id_rsa.pub"),true); } } class RegexExcludePathFilter implements PathFilter{ private final String regex; public RegexExcludePathFilter(String regex){ this.regex=regex; } public boolean accept(Path path){ return !path.toString().matches(regex); } }
删除数据
public boolean delete(Path f,boolean recursive) throws IOException
如果f是一个文件或空目录,那么recursive的值就会被忽略,只有recursive值为true时,非空目录及其内容才会被删除。
文件读取
文件写入
一致写入
distcp可以并行从Hadoop文件系统中复制大量数据,也可以将大量数据复制到Hadoop中。
hadoop distcp file1 file2
hadoop distcp dir1 dir2
hadoop distcp -update dir1 dir2
其中 -update表示仅更新发生变化的文件,-overwrite在保持同样的目录结构的同时强制覆盖原有文件,-delete删除目标路径中任意没在源路径中出现的文件和目录,-p文件状态属性如权限、块大小和复本数被保留。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。