当前位置:   article > 正文

HDFS write流程与代码分析(Hadoop 2.0)_writecontent.write2hdfs

writecontent.write2hdfs

1 Reply

Write操作是HDFS最基本的操作之一,一般是先create一个文件然后write内容。这篇文章主要讲用户调用FileSystem APT向HDFS写入数据时,HDFS是怎么工作的。
1,client发起create请求。
通过DistributedFileSystem.create()方法创建文件,其实就是通过DistributedFileSystem内部wrapper的DFSClient向NameNode发送RPC请求。因为所有的元数据相关的FileSystem API都是转嫁到DFSClient上然后通过与NameNode的RPC完成的。
create操作生成了一个DFSOutputStream对象,而构造这个对象时首先是向NameNode发送了create RPC,然后构造并启动了DataStreamer线程,最后开启Lease定时Renew机制LeaseRenewer。
2,NameNode处理create请求,然后给client发送response。
Create RPC到达NameNode之后依次走过NameNodeRpcServer.create()->FSNamesystem.startFile()->startFileInt()->startFileInternal()
3,client向NameNode请求分配Block,并向多个DataNode建立pipeline传输数据。

HDFS文件以一个一个Block的形式写入HDFS,Client首先向NameNode发送addBlock RPC请求,NameNode返回一个LocatedBlock对象包含该Block的元数据信息和对应的三副本所在的DataNode。然后client向pipelien中的第一个DataNode发起TCP连接。


NameNode接到client端发送的addBlock RPC之后主要做了一下几件事:
(1) 确认该client是否持有该文件的lease。
(2) Commit上一个Block,同时把这个Block的元数据持久化到EditLog,Renew lease。
(3) 通过BlockManage分配相应副本数目的DataNode用以存放该块Block。
(4) 给client端返回LocatedBlock。

HDFS的每个Block的construction是由write pipeline完成的。HDFS的pipeline有不同的阶段,通过BlockConstructionStage来表示。开始的时候是BlockConstructionStage.PIPELINE_SETUP_CREATE阶段。数据是以packet为单位被push到pipeline中。在没有出错的情况下Block construction经历了三个阶段(如下图所示):setup,streaming,close。注意到在streaming阶段(也就是t1-t2这段时间内)packet的发送并不是收到上一个packet的ack之后才发送下一个packet,而是以一种滑动窗口的形式发送的。在最后的close阶段就要等到这个Block内所有packet的ack全部收到才能close这个pipeline了。

Write主要是通过DFSOutputStream完成的。DFSOutputStream是write操作的核心类,该类里面有两个内部类Packet和DataStreamer,同时该类继承自FSOutputSummer。

FSOutputSummer这个抽象类包装了与checksum相关操作的方法,所以DFSOutputStream这个输出流在写数据的同时会写checksum。DFSOutputStream接收数据是通过FSOutputSummer的write方法。

数据传输的单位是由内部类Packet定义的,一个Packet大小是64K,是由512bytes的chunk组成的,一个chunk对应一个checksum。而另一个内部类DataStreamer是向外写数据的操作逻辑,是整个write操作的核心。


下面重点介绍DataStreamer类。DataStreamer.run()方法:
数据以默认64M的block为单位传输,也就是说 只支持client端串行写,这个和GFS不一样,GFS支持atomic record append,也就是多个client对同一个文件进行追加写。因为GFS是通过其中的一个ChunkServer(DataNode)来coordinate各个record有序的。HDFS是通过建立pipeline来完成数据传输的。

在client端有两个队列dataQueue和ackQueue。dataQueue是待发送的数据包packet临时存放的地方,ackQueue是已经发送还没有收到ack的数据包临时存放的地方。很多操作是需要对这两个队列上锁(synchronized)的。整个写数据的过程是以Block为单位的,整个过程随着BlockConstructionStage的变化而变化。一个Block写完,重新建立新的pipeline,写入新的Block。

  1. public void run() {
  2. long lastPacket = Time.now();
  3. while (!streamerClosed && dfsClient.clientRunning) {
  4. // if the Responder encountered an error, shutdown Responder
  5. if (hasError && response != null) {
  6. try {
  7. response.close();
  8. response.join();
  9. response = null;
  10. } catch (InterruptedException e) {
  11. DFSClient.LOG.warn("Caught exception ", e);
  12. }
  13. }
  14. Packet one = null;
  15. try {
  16. // process datanode IO errors if any
  17. boolean doSleep = false;
  18. if (hasError && errorIndex>=0) {
  19. doSleep = processDatanodeError();
  20. }
  21. synchronized (dataQueue) {
  22. // wait for a packet to be sent.
  23. long now = Time.now();
  24. while ((!streamerClosed && !hasError && dfsClient.clientRunning
  25. && dataQueue.size() == 0 &&
  26. (stage != BlockConstructionStage.DATA_STREAMING ||
  27. stage == BlockConstructionStage.DATA_STREAMING &&
  28. now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
  29. //dataQueue还没有数据,同时离pipeline中下游节点的读超时还远,那就先休息会吧。
  30. long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
  31. timeout = timeout <= 0 ? 1000 : timeout;
  32. timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
  33. timeout : 1000;
  34. try {
  35. dataQueue.wait(timeout);
  36. } catch (InterruptedException e) {
  37. DFSClient.LOG.warn("Caught exception ", e);
  38. }
  39. doSleep = false;
  40. now = Time.now();
  41. }
  42. //任何时候发现DataStreamer挂了,或者pipeline中出错了,或者这个client被关闭了,那么后面直接跳过,回到循环开始的时候处理错误。
  43. if (streamerClosed || hasError || !dfsClient.clientRunning) {
  44. continue;
  45. }
  46. // get packet to be sent.
  47. if (dataQueue.isEmpty()) {
  48. one = new Packet(checksum.getChecksumSize()); // heartbeat packet
  49. } else {
  50. one = dataQueue.getFirst(); // regular data packet
  51. }
  52. }
  53. assert one != null;
  54. // get new block from namenode.
  55. if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
  56. if(DFSClient.LOG.isDebugEnabled()) {
  57. DFSClient.LOG.debug("Allocating new block");
  58. }
  59. //如果是create文件,那么先向NameNode发送addBlock RPC,返回LocatedBlock中包含了三个DataNodeInfo;然后通过createBlockOutputStream()这个函数向第一个DN发送写命令,通过DN传导建立起pipeline。
  60. nodes = nextBlockOutputStream(src);
  61. //生成一个ResponseProcessor负责处理pipeline的response。一个64M的Block对应一个ResponseProcessor,而且所有Block也是串行传输的。然后pipeline进入DATA_STREAMING阶段。
  62. initDataStreaming();
  63. } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
  64. if(DFSClient.LOG.isDebugEnabled()) {
  65. DFSClient.LOG.debug("Append to block " + block);
  66. }
  67. setupPipelineForAppendOrRecovery();
  68. initDataStreaming();
  69. }
  70. //获取current packet的最后一字节在整个Block中的offset
  71. long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
  72. if (lastByteOffsetInBlock > blockSize) {
  73. throw new IOException("BlockSize " + blockSize +
  74. " is smaller than data size. " +
  75. " Offset of packet in block " +
  76. lastByteOffsetInBlock +
  77. " Aborting file " + src);
  78. }
  79. if (one.lastPacketInBlock) {
  80. // wait for all data packets have been successfully acked
  81. synchronized (dataQueue) {
  82. while (!streamerClosed && !hasError &&
  83. ackQueue.size() != 0 && dfsClient.clientRunning) {
  84. //如果ackQueue中还有待确认的packet,则循环等待直到这个Block中所有的packet都被确认了才退出循环。
  85. try {
  86. // wait for acks to arrive from datanodes
  87. dataQueue.wait(1000);
  88. } catch (InterruptedException e) {
  89. DFSClient.LOG.warn("Caught exception ", e);
  90. }
  91. }
  92. }
  93. if (streamerClosed || hasError || !dfsClient.clientRunning) {
  94. continue;
  95. }
  96. //这个Block中所有的packet都被ack了,那么这个BlockConstructionStage就发生变化了。一个Block对应一个pipeline,所以写Block的过程是从PIPELINE_CREATE到PIPELINE_CLOSE
  97. stage = BlockConstructionStage.PIPELINE_CLOSE;
  98. }
  99. // send the packet
  100. synchronized (dataQueue) {
  101. // move packet from dataQueue to ackQueue
  102. if (!one.isHeartbeatPacket()) {
  103. dataQueue.removeFirst();
  104. ackQueue.addLast(one);
  105. dataQueue.notifyAll();
  106. }
  107. }
  108. if (DFSClient.LOG.isDebugEnabled()) {
  109. DFSClient.LOG.debug("DataStreamer block " + block +
  110. " sending packet " + one);
  111. }
  112. //向刚才建立起来的pipeline中写packet
  113. try {
  114. one.writeTo(blockStream);
  115. blockStream.flush();
  116. } catch (IOException e) {
  117. errorIndex = 0;
  118. throw e;
  119. }
  120. lastPacket = Time.now();
  121. if (one.isHeartbeatPacket()) { //heartbeat packet
  122. }
  123. // update bytesSent
  124. long tmpBytesSent = one.getLastByteOffsetBlock();
  125. if (bytesSent < tmpBytesSent) {
  126. bytesSent = tmpBytesSent;
  127. }
  128. if (streamerClosed || hasError || !dfsClient.clientRunning) {
  129. continue;
  130. }
  131. // Is this block full?
  132. if (one.lastPacketInBlock) {
  133. // wait for the close packet has been acked
  134. synchronized (dataQueue) {
  135. while (!streamerClosed && !hasError &&
  136. ackQueue.size() != 0 && dfsClient.clientRunning) {
  137. dataQueue.wait(1000);// wait for acks to arrive from datanodes
  138. }
  139. }
  140. if (streamerClosed || hasError || !dfsClient.clientRunning) {
  141. continue;
  142. }
  143. //关闭ResponseProcessor,关闭pipeline中的输入输出流
  144. endBlock();
  145. }
  146. if (progress != null) { progress.progress(); }
  147. // This is used by unit test to trigger race conditions.
  148. if (artificialSlowdown != 0 && dfsClient.clientRunning) {
  149. Thread.sleep(artificialSlowdown);
  150. }
  151. } catch (Throwable e) {
  152. DFSClient.LOG.warn("DataStreamer Exception", e);
  153. if (e instanceof IOException) {
  154. setLastException((IOException)e);
  155. }
  156. hasError = true;
  157. if (errorIndex == -1) { // not a datanode error
  158. streamerClosed = true;
  159. }
  160. }
  161. }
  162. closeInternal();


4,处在pipeline中的DataNode处理来自upstream的写操作。

在DataNode中,块数据的接收和发送主要是通过DataXceiver,这部分的网络传输没有采用RPC,而是传输的TCP连接。每个DataNode节点都会启动一个DataXceiverServer和多个DataXceiver,DataXceiverServer相当于TCP socket编程中的Listen线程,而DataXceiver相当于handle线程。

在DataXceiver的run()方法中,从pipeline的上游socket中读取操作内容(函数readOp()),然后处理这个操作(processOp())。对于WRITE_BLOCK这种操作,会调用DataXceiver.writeBlock()函数。这个函数做了如下几件大事:

(1)建立起与pipeline下游的TCP socket连接;

(2)给pipeline上游发送ack;

(3)生成BlockReceiver并通过BlockReceiver.receiveBlock()接收来自pipeline上游的数据,把数据发送给下游并存储在本地磁盘上。BlockReceiver内部包含有到downstream DN和本地磁盘的输出流。

这个函数首先生成PacketResponder接收并处理下游返回的response,同时将下游的ack转发给上游;然后通过循环调用receivePacket()这个函数接收数据包,发送到downstream DN并把packet持久化到本地磁盘(生成Block文件和meta文件)。

关于这三个步骤执行的并发性和先后顺序在HDFS-append之后变得异常复杂,以后有机会再单独阐述吧。

当没有数据传输时,pipeline中各datanode通过heartbeat对连接keepalive。在整个pipeline(包括client和多个DataNode)的各台机器中用于互相通信的socket都配置了两个timeout值,分别是confTime和socketTimeout。confTime的值是WRITE_TIMEOUT,而socketTimeout的值是READ_TIMEOUT,即同一个socket的读写timeout是不同的。这样在整个pipeline的timeout值都是一致的了。

  1. intgetDatanodeWriteTimeout(intnumNodes) {
  2. return(dfsClientConf.confTime > 0) ?
  3. (dfsClientConf.confTime + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
  4. }
  5. intgetDatanodeReadTimeout(intnumNodes) {
  6. returndfsClientConf.socketTimeout > 0?
  7. (HdfsServerConstants.READ_TIMEOUT_EXTENSION * numNodes +
  8. dfsClientConf.socketTimeout) : 0;
  9. }


与write相关还有个重要的问题就是flush/sync问题,而且这个问题随着HDFS对append的支持而越发复杂。DFSOutputStream继承自FSOutputSummer,DFSOutputStream接收数据是通过FSOutputSummer的write方法,这个方法是synchronized。而sync方法的flushBuffer()和enqueuePacket()也是synchronized。也就是说对一个DFSOutputStream线程,如果sync和write同时调用将发生同步等待。常见的情况就是在HBase中写HLog时,每次有数据写入就要sync。(HLog中有个logSyncer,默认配置是每秒钟调用一次sync,不管有没有数据写入)sync发生的频率非常高,sync抢到锁的可能性很大。这样在不断的sync,不断的flushBuffer,但write写数据却被blocked了。这时就需要发送heartbeat给downstream的DataNode也表示该pipeline还是健康的。

参考资料:

http://kazman.shidler.hawaii.edu/ArchDoc.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/592775
推荐阅读
相关标签
  

闽ICP备14008679号