Write操作是HDFS最基本的操作之一,一般是先create一个文件然后write内容。这篇文章主要讲用户调用FileSystem APT向HDFS写入数据时,HDFS是怎么工作的。
1,client发起create请求。
通过DistributedFileSystem.create()方法创建文件,其实就是通过DistributedFileSystem内部wrapper的DFSClient向NameNode发送RPC请求。因为所有的元数据相关的FileSystem API都是转嫁到DFSClient上然后通过与NameNode的RPC完成的。
create操作生成了一个DFSOutputStream对象,而构造这个对象时首先是向NameNode发送了create RPC,然后构造并启动了DataStreamer线程,最后开启Lease定时Renew机制LeaseRenewer。
2,NameNode处理create请求,然后给client发送response。
Create RPC到达NameNode之后依次走过NameNodeRpcServer.create()->FSNamesystem.startFile()->startFileInt()->startFileInternal()
3,client向NameNode请求分配Block,并向多个DataNode建立pipeline传输数据。
HDFS文件以一个一个Block的形式写入HDFS,Client首先向NameNode发送addBlock RPC请求,NameNode返回一个LocatedBlock对象包含该Block的元数据信息和对应的三副本所在的DataNode。然后client向pipelien中的第一个DataNode发起TCP连接。
NameNode接到client端发送的addBlock RPC之后主要做了一下几件事:
(1) 确认该client是否持有该文件的lease。
(2) Commit上一个Block,同时把这个Block的元数据持久化到EditLog,Renew lease。
(3) 通过BlockManage分配相应副本数目的DataNode用以存放该块Block。
(4) 给client端返回LocatedBlock。
HDFS的每个Block的construction是由write pipeline完成的。HDFS的pipeline有不同的阶段,通过BlockConstructionStage来表示。开始的时候是BlockConstructionStage.PIPELINE_SETUP_CREATE阶段。数据是以packet为单位被push到pipeline中。在没有出错的情况下Block construction经历了三个阶段(如下图所示):setup,streaming,close。注意到在streaming阶段(也就是t1-t2这段时间内)packet的发送并不是收到上一个packet的ack之后才发送下一个packet,而是以一种滑动窗口的形式发送的。在最后的close阶段就要等到这个Block内所有packet的ack全部收到才能close这个pipeline了。
Write主要是通过DFSOutputStream完成的。DFSOutputStream是write操作的核心类,该类里面有两个内部类Packet和DataStreamer,同时该类继承自FSOutputSummer。
FSOutputSummer这个抽象类包装了与checksum相关操作的方法,所以DFSOutputStream这个输出流在写数据的同时会写checksum。DFSOutputStream接收数据是通过FSOutputSummer的write方法。
数据传输的单位是由内部类Packet定义的,一个Packet大小是64K,是由512bytes的chunk组成的,一个chunk对应一个checksum。而另一个内部类DataStreamer是向外写数据的操作逻辑,是整个write操作的核心。
下面重点介绍DataStreamer类。DataStreamer.run()方法:
数据以默认64M的block为单位传输,也就是说 只支持client端串行写,这个和GFS不一样,GFS支持atomic record append,也就是多个client对同一个文件进行追加写。因为GFS是通过其中的一个ChunkServer(DataNode)来coordinate各个record有序的。HDFS是通过建立pipeline来完成数据传输的。
在client端有两个队列dataQueue和ackQueue。dataQueue是待发送的数据包packet临时存放的地方,ackQueue是已经发送还没有收到ack的数据包临时存放的地方。很多操作是需要对这两个队列上锁(synchronized)的。整个写数据的过程是以Block为单位的,整个过程随着BlockConstructionStage的变化而变化。一个Block写完,重新建立新的pipeline,写入新的Block。
- public void run() {
- long lastPacket = Time.now();
- while (!streamerClosed && dfsClient.clientRunning) {
-
- // if the Responder encountered an error, shutdown Responder
- if (hasError && response != null) {
- try {
- response.close();
- response.join();
- response = null;
- } catch (InterruptedException e) {
- DFSClient.LOG.warn("Caught exception ", e);
- }
- }
-
- Packet one = null;
-
- try {
- // process datanode IO errors if any
- boolean doSleep = false;
- if (hasError && errorIndex>=0) {
- doSleep = processDatanodeError();
- }
-
- synchronized (dataQueue) {
- // wait for a packet to be sent.
- long now = Time.now();
- while ((!streamerClosed && !hasError && dfsClient.clientRunning
- && dataQueue.size() == 0 &&
- (stage != BlockConstructionStage.DATA_STREAMING ||
- stage == BlockConstructionStage.DATA_STREAMING &&
- now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
- //dataQueue还没有数据,同时离pipeline中下游节点的读超时还远,那就先休息会吧。
- long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
- timeout = timeout <= 0 ? 1000 : timeout;
- timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
- timeout : 1000;
- try {
- dataQueue.wait(timeout);
- } catch (InterruptedException e) {
- DFSClient.LOG.warn("Caught exception ", e);
- }
- doSleep = false;
- now = Time.now();
- }
- //任何时候发现DataStreamer挂了,或者pipeline中出错了,或者这个client被关闭了,那么后面直接跳过,回到循环开始的时候处理错误。
- if (streamerClosed || hasError || !dfsClient.clientRunning) {
- continue;
- }
- // get packet to be sent.
- if (dataQueue.isEmpty()) {
- one = new Packet(checksum.getChecksumSize()); // heartbeat packet
- } else {
- one = dataQueue.getFirst(); // regular data packet
- }
- }
- assert one != null;
-
- // get new block from namenode.
- if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
- if(DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Allocating new block");
- }
- //如果是create文件,那么先向NameNode发送addBlock RPC,返回LocatedBlock中包含了三个DataNodeInfo;然后通过createBlockOutputStream()这个函数向第一个DN发送写命令,通过DN传导建立起pipeline。
- nodes = nextBlockOutputStream(src);
- //生成一个ResponseProcessor负责处理pipeline的response。一个64M的Block对应一个ResponseProcessor,而且所有Block也是串行传输的。然后pipeline进入DATA_STREAMING阶段。
- initDataStreaming();
- } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
- if(DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Append to block " + block);
- }
- setupPipelineForAppendOrRecovery();
- initDataStreaming();
- }
- //获取current packet的最后一字节在整个Block中的offset
- long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
- if (lastByteOffsetInBlock > blockSize) {
- throw new IOException("BlockSize " + blockSize +
- " is smaller than data size. " +
- " Offset of packet in block " +
- lastByteOffsetInBlock +
- " Aborting file " + src);
- }
-
- if (one.lastPacketInBlock) {
- // wait for all data packets have been successfully acked
- synchronized (dataQueue) {
- while (!streamerClosed && !hasError &&
- ackQueue.size() != 0 && dfsClient.clientRunning) {
- //如果ackQueue中还有待确认的packet,则循环等待直到这个Block中所有的packet都被确认了才退出循环。
- try {
- // wait for acks to arrive from datanodes
- dataQueue.wait(1000);
- } catch (InterruptedException e) {
- DFSClient.LOG.warn("Caught exception ", e);
- }
- }
- }
- if (streamerClosed || hasError || !dfsClient.clientRunning) {
- continue;
- }
- //这个Block中所有的packet都被ack了,那么这个BlockConstructionStage就发生变化了。一个Block对应一个pipeline,所以写Block的过程是从PIPELINE_CREATE到PIPELINE_CLOSE
- stage = BlockConstructionStage.PIPELINE_CLOSE;
- }
-
- // send the packet
- synchronized (dataQueue) {
- // move packet from dataQueue to ackQueue
- if (!one.isHeartbeatPacket()) {
- dataQueue.removeFirst();
- ackQueue.addLast(one);
- dataQueue.notifyAll();
- }
- }
-
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("DataStreamer block " + block +
- " sending packet " + one);
- }
-
- //向刚才建立起来的pipeline中写packet
- try {
- one.writeTo(blockStream);
- blockStream.flush();
- } catch (IOException e) {
- errorIndex = 0;
- throw e;
- }
- lastPacket = Time.now();
-
- if (one.isHeartbeatPacket()) { //heartbeat packet
- }
-
- // update bytesSent
- long tmpBytesSent = one.getLastByteOffsetBlock();
- if (bytesSent < tmpBytesSent) {
- bytesSent = tmpBytesSent;
- }
-
- if (streamerClosed || hasError || !dfsClient.clientRunning) {
- continue;
- }
-
- // Is this block full?
- if (one.lastPacketInBlock) {
- // wait for the close packet has been acked
- synchronized (dataQueue) {
- while (!streamerClosed && !hasError &&
- ackQueue.size() != 0 && dfsClient.clientRunning) {
- dataQueue.wait(1000);// wait for acks to arrive from datanodes
- }
- }
- if (streamerClosed || hasError || !dfsClient.clientRunning) {
- continue;
- }
- //关闭ResponseProcessor,关闭pipeline中的输入输出流
- endBlock();
- }
- if (progress != null) { progress.progress(); }
-
- // This is used by unit test to trigger race conditions.
- if (artificialSlowdown != 0 && dfsClient.clientRunning) {
- Thread.sleep(artificialSlowdown);
- }
- } catch (Throwable e) {
- DFSClient.LOG.warn("DataStreamer Exception", e);
- if (e instanceof IOException) {
- setLastException((IOException)e);
- }
- hasError = true;
- if (errorIndex == -1) { // not a datanode error
- streamerClosed = true;
- }
- }
- }
- closeInternal();
4,处在pipeline中的DataNode处理来自upstream的写操作。
在DataNode中,块数据的接收和发送主要是通过DataXceiver,这部分的网络传输没有采用RPC,而是传输的TCP连接。每个DataNode节点都会启动一个DataXceiverServer和多个DataXceiver,DataXceiverServer相当于TCP socket编程中的Listen线程,而DataXceiver相当于handle线程。
在DataXceiver的run()方法中,从pipeline的上游socket中读取操作内容(函数readOp()),然后处理这个操作(processOp())。对于WRITE_BLOCK这种操作,会调用DataXceiver.writeBlock()函数。这个函数做了如下几件大事:
(1)建立起与pipeline下游的TCP socket连接;
(2)给pipeline上游发送ack;
(3)生成BlockReceiver并通过BlockReceiver.receiveBlock()接收来自pipeline上游的数据,把数据发送给下游并存储在本地磁盘上。BlockReceiver内部包含有到downstream DN和本地磁盘的输出流。
这个函数首先生成PacketResponder接收并处理下游返回的response,同时将下游的ack转发给上游;然后通过循环调用receivePacket()这个函数接收数据包,发送到downstream DN并把packet持久化到本地磁盘(生成Block文件和meta文件)。
关于这三个步骤执行的并发性和先后顺序在HDFS-append之后变得异常复杂,以后有机会再单独阐述吧。
当没有数据传输时,pipeline中各datanode通过heartbeat对连接keepalive。在整个pipeline(包括client和多个DataNode)的各台机器中用于互相通信的socket都配置了两个timeout值,分别是confTime和socketTimeout。confTime的值是WRITE_TIMEOUT,而socketTimeout的值是READ_TIMEOUT,即同一个socket的读写timeout是不同的。这样在整个pipeline的timeout值都是一致的了。
- intgetDatanodeWriteTimeout(intnumNodes) {
-
- return(dfsClientConf.confTime > 0) ?
-
- (dfsClientConf.confTime + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
-
- }
-
- intgetDatanodeReadTimeout(intnumNodes) {
-
- returndfsClientConf.socketTimeout > 0?
-
- (HdfsServerConstants.READ_TIMEOUT_EXTENSION * numNodes +
-
- dfsClientConf.socketTimeout) : 0;
-
- }
与write相关还有个重要的问题就是flush/sync问题,而且这个问题随着HDFS对append的支持而越发复杂。DFSOutputStream继承自FSOutputSummer,DFSOutputStream接收数据是通过FSOutputSummer的write方法,这个方法是synchronized。而sync方法的flushBuffer()和enqueuePacket()也是synchronized。也就是说对一个DFSOutputStream线程,如果sync和write同时调用将发生同步等待。常见的情况就是在HBase中写HLog时,每次有数据写入就要sync。(HLog中有个logSyncer,默认配置是每秒钟调用一次sync,不管有没有数据写入)sync发生的频率非常高,sync抢到锁的可能性很大。这样在不断的sync,不断的flushBuffer,但write写数据却被blocked了。这时就需要发送heartbeat给downstream的DataNode也表示该pipeline还是健康的。
参考资料: