赞
踩
导语
Hadoop中附带了一个HDFS(Hadoop分布式文件系统)的分布式文件系统,专门用来存储超级大文件使用,它为整个的Hadoop应用生态圈提供了基础的文件存储功能。
HDFS专门是为了解决大数据存储问题而出现的,它具备如下的几个特点
1、HDFS文件系统可以存储超大文件
在我们实际应用中,每个磁盘都有自己默认的数据块的大小,这也是磁盘对数据读写的时候要求的最小的单位,文件系统就是建立在磁盘上的,而文件系统也是有逻辑块的概念。通常就是磁盘块的整数倍数,通常的文件系统一般是几千个字节,而磁盘块一般是512个字节。
HDFS是一种文件系统,自身也有块(block)的概念,它的文件也要比普通的单一磁盘上的文件系统要大的多,默认是64MB。
HDFS上的块之所以设置的这么大,就是为了解决文件寻址的开销。
HDFS文件的大小可以大于任意网络中的一个磁盘的容量,文件的所有内容块也不是指存储在一个磁盘上,所以可以利用集群上任意一个磁盘进行存储,也是由于这种分布式的存储逻辑,所以整个的HDFS可以存储的文件量级一般在G 、T、P的级别。
2、一次写入,多次读取
一个文件经过创建、写入和关闭之后不需要改变,也是有了这个假设之后,简化了数据一致性的问题,同时也可以在一定程度上提高了数据访问的吞吐量。
3、运行在普通廉价的机器上
Hadoop的设计本身就对硬件设施的要求比较低,不需要在昂贵的机器上进行运行,在HDFS设计中也是考虑到了数据的可靠性、安全性和高可用性能。
1、低延迟
HDFS不适用于实时查询这种对延迟数据要求很高的场景:例如实时大盘数据。对于低延迟访问场景则需要通过数据库访问索引的方案来解决会比较好一点,Hadoop生态中,Hbase具有随机读取、低延迟的特点。
2、大量小文件
对于Hadoop系统,小文件通常定义为远远小于HDFS的block size(64MB)的文件。由于每个文件都会产生各自的MetaData元数据,Hadoop通过Namenode来存储这些信息,如果文件太小,就会出现Namenode的瓶颈。
3、多用户更新
为了保证并发性能,HDFS需要一次性写入多次读取的场景,对于多用户写入的,如果需要修改,也是通过追加的方式添加的文件末尾处,出现太多文件需要更新的情况,Hadoop的是不支持。
如果有多写入的场景可以考虑Hbase的方案。
4、结构化数据
HDFS适合存储半结构化和非结构化的数据,如果需要有严格的结构化场景的话,就需要使用Hbase的解决方案。
5、数量并不太大
通常Hadoop适用于TB、PB数据,如果需要处理的数据只有几十个GB的话,就不需要使用Hadoop了,这种有点资源浪费,或者说没有任何的优势,反而增加了系统的消耗。
HDFS 是一个主从架构体系,由于分布式存储的特性,集群有两种节点类型NameNode和DataNode。
NameNode(名称节点):系统中通常只有一个,中心服务器的角色,管理存储和检索多个DataNode的实际数据所需要的元数据。
DataNode(数据节点):系统中通常是有多个这样的节点,是文件系统中真正存储数据的地方,在NameNode的统一调度下进行数据块的创建、删除和复制。
图中Client是HDFS的客户端,是应用程序可以通过这个模块与NameNode和DataNode进行交互,进行文件的读写操作。
为了系统容错性,HDFS文件系统会对所有的数据进行副本复制的方式进行操作,Hadoop默认的是通过3个副本的方式来解决容错性。
复制管理策略是运行客户端的节点上放一个复本,第二个复本会放到与第一个不同且随机另外选择的机架上的节点中,第三个复本与第二个复本放在相同机架,且随机选择的一个节点上。所有的复本都是通过随机的方式进行存储的,这样就不会出现多个复本在同一个机器或者机架上,这样可以保证在物理损坏的时候,也可以进行数据恢复。
所有的有关复制的决策统一是由NameNode负责,NameNode会周期性的接受集群中数据节点DataNode的心跳和快的报告。一个心跳表示这个节点是正常的,一个快报告包括该数据节点上的所有块的列表信息。
1、文件读流程
首先Client通过File System 的Open函数打开文件,Distributed File System通过RPC调用到NameNode节点,得到文件的数据块信息。
对于每一个数据块,NameNode节点返回保存数据块的数据节点的地址,Distributed File System返回 FSDataInputStream给到客户端,用来读取数据,客户端调用stream的read()函数开始数据的读取操作。
DFSInputStream连接保存文件第一个数据块的最近的数据节点,DataNode从数据节点读取到客户端client,当这个数据块读取完成之后,DFSInputStream关闭和这个数据节点的连接,然后连接到这个文件下一个数据块最近的数据节点。当客户端读取完毕数据的时候,调用FSDataInputStream的close函数。
2.文件写入过程
客户端调用create()来创建文件,Distributed File System 用RPC调用NameNode节点,在文件系统的命名空间中创建一个新的文件,NameNode节点首先确定文件原来是不是已经存在了,并且客户端是否有创建文件的权限,然后才能创建新的文件。
Distributed File System 返回DFSOutputStream,用来客户端写入数据。DFSOutputStream将数据分成块,写入到Data Queue.Data Queue 是通过Data Streamer读取,并且通知到NameNode 节点分配数据节点。用来存储数据块。分配的数据节点放在一个Pipeline里面。Data Streamer将数据块写入Pipeline中的第一个数据节点,第一个数据节点将数据块发送到第二个数据节点,第二个数据节点将数据块发送到第三个节点。
DFSOutputstream 为发出去的数据块保存了Ack Queue,等待Pipeline中的数据节点告知,然后数据写入成功。
HDFS的文件读取解析编辑 播报
文件内容读取的代码可以分为三个大步骤 。
1、获取文件系统
2、通过文件系统打开文件
3、将文件内容输出
public static void read(Path path) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem(); //步骤 1
FSDataInputStream fsDataInputStream = hdfs.open(path); //步骤 2
IOUtils.copyBytes(fsDataInputStream, System.out, 4096,false); //步骤 3
}
接下来,我们来看一下每个步骤的详细过程
获取文件系统对象
要从HDFS上读取文件,必须先得到一个FileSystem。HDFS本身就是一个文件系统,所以,我们得到一个文件系统后就可以对HDFS进行相关操作。获取文件系统的步骤可以分为以下2步。
1、读取配置文件。
2、获取文件系统。
读取配置文件:Configuration类有三个构造器,无参数的构造器表示直接加载默认资源,也可以指定一个boolean参数来关闭加载默认值,或直接使用另外一个Configuration对象来初始化。
package com.yq.common; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; public class HdfsUtils { public static FileSystem getFilesystem(){ FileSystem hdfs=null; Configuration conf=new Configuration(); try{ URI uri = new URI("hdfs://localhost:9000"); hdfs = FileSystem.get(uri,conf); } catch(Exception ex){ // } return hdfs; } }
打开文件
FSDataInputStream fsDataInputStream = hdfs.open(path);
打开文件其实就是创建一个文件输入流,跟踪文件系统的open方法,可以找到源码
public FSDataInputStream open(Path f) throws IOException {
return open(f, getConf().getInt("io.file.buffer.size", 4096));
}
再跟踪open方法,找到以下抽象方法。
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException; //这个方法在DistributedFileSystem类有实现,如下 @Override public FSDataInputStream open(Path f, final int bufferSize) throws IOException { statistics.incrementReadOps(1); Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FSDataInputStream>() { @Override public FSDataInputStream doCall(final Path p) throws IOException, UnresolvedLinkException { return new HdfsDataInputStream( dfs.open(getPathName(p), bufferSize, verifyChecksum)); } @Override public FSDataInputStream next(final FileSystem fs, final Path p) throws IOException { return fs.open(p, bufferSize); } }.resolve(this, absF); }
在返回结果的时候,创建了一个FileSystemLinkResolver对象,并实现了此类的两个抽象方法。doCall方法和next方法都在resolve方法里用到了,而next方法只是在resolve方法异常捕获时才调用。
跟踪doCall方法,doCall方法里的open()方法有3个参数,src表示要打开的文件路径,buffersize表示缓冲大小,verifyChecksum表示是否校验和,的源代码如下。
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
throws IOException, UnresolvedLinkException {
checkOpen();
// Get block info from namenode
return new DFSInputStream(this, src, buffersize, verifyChecksum);
}
checkOpen方法表示检查文件系统是否已经打开,如果没有打开,则抛出异常(FileSystemclosed)。
然后返回一个分布式文件系统输入流(DFSInputStream),此处调用的构造方法源代码如下。
DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
) throws IOException, UnresolvedLinkException {
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.buffersize = buffersize;
this.src = src;
this.cachingStrategy =
dfsClient.getDefaultReadCachingStrategy();
openInfo();
}
这个方法先是做了一些准备工作,然后调用openInfo()方法,openInfo()方法是一个线程安全的方法,作用是从namenode获取已打开的文件信息。其源代码如下。
/** * Grab the open-file info from namenode */ synchronized void openInfo() throws IOException, UnresolvedLinkException { lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength; while (retriesForLastBlockLength > 0) { // Getting last block length as -1 is a special case. When cluster // restarts, DNs may not report immediately. At this time partial block // locations will not be available with NN for getting the length. Lets // retry for 3 times to get the length. if (lastBlockBeingWrittenLength == -1) { DFSClient.LOG.warn("Last block locations not available. " + "Datanodes might not have reported blocks completely." + " Will retry for " + retriesForLastBlockLength + " times"); waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength); lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); } else { break; } retriesForLastBlockLength--; } if (retriesForLastBlockLength == 0) { throw new IOException("Could not obtain the last block locations."); } }
此方法有调用fetchLocatedBlocksAndGetLastBlockLength()方法获取块的位置信息。
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException { final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("newInfo = " + newInfo); } if (newInfo == null) { throw new IOException("Cannot open filename " + src); } if (locatedBlocks != null) { Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator(); Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator(); while (oldIter.hasNext() && newIter.hasNext()) { if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) { throw new IOException("Blocklist for " + src + " has changed!"); } } } locatedBlocks = newInfo; long lastBlockBeingWrittenLength = 0; if (!locatedBlocks.isLastBlockComplete()) { final LocatedBlock last = locatedBlocks.getLastLocatedBlock(); if (last != null) { if (last.getLocations().length == 0) { if (last.getBlockSize() == 0) { // if the length is zero, then no data has been written to // datanode. So no need to wait for the locations. return 0; } return -1; } final long len = readBlockLength(last); last.getBlock().setNumBytes(len); lastBlockBeingWrittenLength = len; } } currentNode = null; return lastBlockBeingWrittenLength; }
getLocatedBlocks方法可以获取块的位置信息。LocatedBlocks类是许多块的位置信息的集合。因为从此类的源码可以发现有这个一个私有属性:
private final List<LocatedBlock> blocks; // array of blocks with prioritized locations
通过文件名,FSDataInputStream类可以获取相文件内容,也可以充当namenode与datanode桥梁。
将文件内容在标准输出显示
因为之前已经获得了一个FSDataInputStream,所以,我们可以调用方法copyBytes将FSDataInputStream拷贝到标准输出流System.out显示。
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException { try { copyBytes(in, out, buffSize); if(close) { out.close(); out = null; in.close(); in = null; } } finally { if(close) { closeStream(out); closeStream(in); } } }
此方法里又调用了另外一个copyBytes方法,作用同样是从一个流拷贝到另外一个流。
public static void copyBytes(InputStream in, OutputStream out, int buffSize)
throws IOException {
PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
byte buf[] = new byte[buffSize];
int bytesRead = in.read(buf);
while (bytesRead >= 0) {
out.write(buf, 0, bytesRead);
if ((ps != null) && ps.checkError()) {
throw new IOException("Unable to write to output stream.");
}
bytesRead = in.read(buf);
}
}
先从输入流中读取buffSize大小的数据到缓冲里面,然后将缓冲里的数据写入到输出流out里。一直循环,直到从输入流中读到缓冲里的字节长度为0,表示输入流里的数据已经读取完毕。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。