
2021-12-23 Hadoop 3 Write Path (Part 3): DataStreamer


1 Overview

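As the name suggests, this class handles data transfer. It is a daemon thread that is started while the DFSOutputStream is being created. Once started, its run method loops until the stream or the client is closed. The core logic revolves around a queue called dataQueue: the streamer waits for the main (writer) thread to add packets to it, and whenever packets are available it sends them to the DataNodes of the corresponding pipeline, which completes the data transfer.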
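To make the pattern concrete, here is a heavily simplified sketch of the producer/consumer loop described above. It is not Hadoop code: MiniStreamer, queuePacket and the String "packets" are hypothetical, and the list of sent packets merely stands in for the DataNode pipeline. It only shows a daemon thread waiting on dataQueue and draining it until it is closed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the DataStreamer loop: a daemon thread
// that waits on dataQueue and "sends" packets until it is closed.
class MiniStreamer extends Thread {
  private final Deque<String> dataQueue = new ArrayDeque<>();
  private final List<String> sent = new ArrayList<>(); // stands in for the pipeline
  private volatile boolean streamerClosed = false;

  MiniStreamer() {
    setDaemon(true); // a daemon thread does not keep the JVM alive
  }

  // Called by the writer thread (DFSOutputStream in HDFS) to enqueue a packet.
  void queuePacket(String packet) {
    synchronized (dataQueue) {
      dataQueue.addLast(packet);
      dataQueue.notifyAll(); // wake up the streamer thread
    }
  }

  void close() {
    streamerClosed = true;
    synchronized (dataQueue) {
      dataQueue.notifyAll();
    }
  }

  List<String> sentPackets() {
    synchronized (sent) {
      return new ArrayList<>(sent);
    }
  }

  @Override
  public void run() {
    while (!streamerClosed) {
      String one;
      synchronized (dataQueue) {
        while (!streamerClosed && dataQueue.isEmpty()) {
          try {
            dataQueue.wait(1000); // bounded wait, like the real loop
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
        if (streamerClosed) {
          break;
        }
        one = dataQueue.removeFirst();
      }
      synchronized (sent) {
        sent.add(one); // the real streamer writes the packet to blockStream here
      }
    }
  }
}
```

In this sketch packets still queued at close() are simply dropped; the real DataStreamer goes through closeInternal() and error handling instead.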

2 Source Code Analysis

2.1 How DataStreamer Gets Invoked

As covered in the previous article, DFSOutputStream#newStreamForCreate creates the corresponding DFSOutputStream:

  final DFSOutputStream out;
  // Create a different DFSOutputStream depending on whether an
  // ErasureCodingPolicy is configured. This uses the HdfsFileStatus
  // returned by the RPC that created the INodeFile on the NameNode
  if(stat.getErasureCodingPolicy() != null) {
    out = new DFSStripedOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
  } else {
    out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes, true);
  }
  // Start the DataStreamer that sends packets to the DataNode pipeline
  out.start();

Here we focus on the non-erasure-coding branch, i.e. creating a DFSOutputStream:

  /** Construct a new output stream for creating a file. */
  protected DFSOutputStream(DFSClient dfsClient, String src,
      HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
      DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
    // Mostly field initialization (see the private constructor below)
    this(dfsClient, src, flag, progress, stat, checksum);
    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
    // Compute the chunk and packet sizes; this method is explained below
    computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
        bytesPerChecksum);
    // Create the DataStreamer right here
    if (createStreamer) {
      streamer = new DataStreamer(stat, null, dfsClient, src, progress,
          checksum, cachingStrategy, byteArrayManager, favoredNodes,
          addBlockFlags);
    }
  }

  private DFSOutputStream(DFSClient dfsClient, String src,
      EnumSet<CreateFlag> flag,
      Progressable progress, HdfsFileStatus stat, DataChecksum checksum) {
    super(getChecksum4Compute(checksum, stat));
    this.dfsClient = dfsClient;
    this.src = src;
    this.fileId = stat.getFileId();
    this.blockSize = stat.getBlockSize();
    this.blockReplication = stat.getReplication();
    this.fileEncryptionInfo = stat.getFileEncryptionInfo();
    this.cachingStrategy = new AtomicReference<>(
        dfsClient.getDefaultWriteCachingStrategy());
    this.addBlockFlags = EnumSet.noneOf(AddBlockFlag.class);
    if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) {
      this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE);
    }
    if (flag.contains(CreateFlag.NO_LOCAL_RACK)) {
      this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_RACK);
    }
    if (flag.contains(CreateFlag.IGNORE_CLIENT_LOCALITY)) {
      this.addBlockFlags.add(AddBlockFlag.IGNORE_CLIENT_LOCALITY);
    }
    if (progress != null) {
      DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
          +"{}", src);
    }
    initWritePacketSize();
    this.bytesPerChecksum = checksum.getBytesPerChecksum();
    if (bytesPerChecksum <= 0) {
      throw new HadoopIllegalArgumentException(
          "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
    }
    if (blockSize % bytesPerChecksum != 0) {
      throw new HadoopIllegalArgumentException("Invalid values: "
          + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY
          + " (=" + bytesPerChecksum + ") must divide block size (=" +
          blockSize + ").");
    }
    this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
  }
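The two checks at the end of the private constructor are easy to try in isolation. The sketch below is a hypothetical standalone helper, not part of Hadoop; it throws IllegalArgumentException instead of HadoopIllegalArgumentException so that it stays self-contained.

```java
// Hypothetical helper mirroring the two validation checks at the end of the
// DFSOutputStream constructor: bytesPerChecksum must be positive and must
// divide the block size evenly.
class ChecksumConfigCheck {
  static void validate(long blockSize, int bytesPerChecksum) {
    if (bytesPerChecksum <= 0) {
      throw new IllegalArgumentException(
          "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
    }
    if (blockSize % bytesPerChecksum != 0) {
      throw new IllegalArgumentException("bytesPerChecksum (=" + bytesPerChecksum
          + ") must divide block size (=" + blockSize + ").");
    }
  }

  // Convenience wrapper: true if validate() accepts the pair.
  static boolean isValid(long blockSize, int bytesPerChecksum) {
    try {
      validate(blockSize, bytesPerChecksum);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }
}
```

For example, a 128 MB block (134217728 bytes) with 512-byte checksum chunks passes, while a chunk size of 500 bytes is rejected because it does not divide the block size.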

Next, a look at computePacketChunkSize:

  protected void computePacketChunkSize(int psize, int csize) {
    // Body size = write packet size (64 KB by default) minus the
    // maximum packet header length (33 bytes)
    final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
    // With the default checksum type (CRC32C), getChecksumSize() is 4 bytes,
    // and csize defaults to 512 bytes, so chunkSize = 516 bytes
    final int chunkSize = csize + getChecksumSize();
    chunksPerPacket = Math.max(bodySize/chunkSize, 1);
    // The actual packet (body) size
    packetSize = chunkSize*chunksPerPacket;
    DFSClient.LOG.debug("computePacketChunkSize: src={}, chunkSize={}, "
        + "chunksPerPacket={}, packetSize={}",
        src, chunkSize, chunksPerPacket, packetSize);
  }
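Plugging the default values into the same formula gives concrete numbers. The sketch below copies the arithmetic of computePacketChunkSize into a standalone class; PacketSizing is a hypothetical name, and the 33-byte header length is the value quoted in the comment above rather than read from PacketHeader.

```java
// Re-computes computePacketChunkSize with the defaults assumed above:
// 64 KB write packet size, 33-byte maximum header, 512-byte chunks and a
// 4-byte checksum per chunk.
class PacketSizing {
  static final int PKT_MAX_HEADER_LEN = 33; // assumed value, as quoted above

  // Returns {chunkSize, chunksPerPacket, packetSize}.
  static int[] compute(int psize, int csize, int checksumSize) {
    final int bodySize = psize - PKT_MAX_HEADER_LEN;
    final int chunkSize = csize + checksumSize;
    final int chunksPerPacket = Math.max(bodySize / chunkSize, 1);
    final int packetSize = chunkSize * chunksPerPacket;
    return new int[] {chunkSize, chunksPerPacket, packetSize};
  }

  public static void main(String[] args) {
    int[] r = compute(64 * 1024, 512, 4);
    // prints: chunkSize=516, chunksPerPacket=126, packetSize=65016
    System.out.println("chunkSize=" + r[0] + ", chunksPerPacket=" + r[1]
        + ", packetSize=" + r[2]);
  }
}
```

So the body is 65536 - 33 = 65503 bytes, which fits 126 chunks of 516 bytes (127 would need 65532 bytes). A full packet therefore carries 126 × 512 = 64512 bytes of file data plus 126 × 4 = 504 bytes of checksums.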

2.2 The DataStreamer Constructor

There is not much to explain here: the constructors just initialize fields.

  /**
   * construction with tracing info
   */
  DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
      String src, Progressable progress, DataChecksum checksum,
      AtomicReference<CachingStrategy> cachingStrategy,
      ByteArrayManager byteArrayManage, String[] favoredNodes,
      EnumSet<AddBlockFlag> flags) {
    this(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
        byteArrayManage, false, favoredNodes, flags);
    stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
  }

  private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
      DFSClient dfsClient, String src,
      Progressable progress, DataChecksum checksum,
      AtomicReference<CachingStrategy> cachingStrategy,
      ByteArrayManager byteArrayManage,
      boolean isAppend, String[] favoredNodes,
      EnumSet<AddBlockFlag> flags) {
    this.block = new BlockToWrite(block);
    this.dfsClient = dfsClient;
    this.src = src;
    this.progress = progress;
    this.stat = stat;
    this.checksum4WriteBlock = checksum;
    this.cachingStrategy = cachingStrategy;
    this.byteArrayManager = byteArrayManage;
    this.isLazyPersistFile = isLazyPersist(stat);
    this.isAppend = isAppend;
    this.favoredNodes = favoredNodes;
    final DfsClientConf conf = dfsClient.getConf();
    this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
    this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
    this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
    this.addBlockFlags = flags;
  }

2.3 The run Method

Since this object is a thread, the out.start() call at the end of DFSOutputStream#newStreamForCreate starts it, and the work is done in its run method (which is rather long...):

  /*
   * streamer thread is the only thread that opens streams to datanode,
   * and closes them. Any error recovery is also done by this thread.
   */
  @Override
  public void run() {
    long lastPacket = Time.monotonicNow();
    TraceScope scope = null;
    // Loop until the client or the stream is closed
    while (!streamerClosed && dfsClient.clientRunning) {
      // if the Responder encountered an error, shutdown Responder
      if (errorState.hasError()) {
        closeResponder();
      }
      DFSPacket one;
      try {
        // process datanode IO errors if any
        boolean doSleep = processDatanodeOrExternalError();
        final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
        synchronized (dataQueue) {
          // wait for a packet to be sent.
          long now = Time.monotonicNow();
          /**
           * Keep waiting on dataQueue while all of the following hold:
           * - shouldStop() is false (it becomes true when the stream is
           *   closed, an error occurred, or the client stopped running);
           * - dataQueue.size() == 0, i.e. no packet is waiting to be sent;
           * - the block is not yet in the DATA_STREAMING stage, or less than
           *   half of the client socket timeout (60 s by default) has passed
           *   since the last packet was sent;
           * or while doSleep is true (an error is pending and the thread
           * should wait once before handling it).
           */
          while ((!shouldStop() && dataQueue.size() == 0 &&
              (stage != BlockConstructionStage.DATA_STREAMING ||
                  now - lastPacket < halfSocketTimeout)) || doSleep) {
            long timeout = halfSocketTimeout - (now-lastPacket);
            timeout = timeout <= 0 ? 1000 : timeout;
            timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                timeout : 1000;
            try {
              dataQueue.wait(timeout);
            } catch (InterruptedException e) {
              LOG.debug("Thread interrupted", e);
            }
            doSleep = false;
            now = Time.monotonicNow();
          }
          if (shouldStop()) {
            continue;
          }
          // get packet to be sent.
          // If dataQueue is empty, create a heartbeat packet (it tells the
          // DataNodes that the client is still alive); otherwise take the
          // next regular data packet
          if (dataQueue.isEmpty()) {
            one = createHeartbeatPacket();
          } else {
            try {
              // Back off (sleep for a while) if the write pipeline is
              // congested, i.e. DataNodes flagged congestion in their acks
              backOffIfNecessary();
            } catch (InterruptedException e) {
              LOG.debug("Thread interrupted", e);
            }
            one = dataQueue.getFirst(); // regular data packet
            SpanId[] parents = one.getTraceParents();
            if (parents.length > 0) {
              scope = dfsClient.getTracer().
                  newScope("dataStreamer", parents[0]);
              scope.getSpan().setParents(parents);
            }
          }
        }
        // get new block from namenode.
        LOG.debug("stage={}, {}", stage, this);
        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
          // A new block is needed (new file, or the previous block is full)
          LOG.debug("Allocating new block: {}", this);
          // nextBlockOutputStream() asks the NameNode for a new block and
          // returns a LocatedBlock that includes the DatanodeInfo of the
          // nodes in the pipeline
          setPipeline(nextBlockOutputStream());
          // Initialize data streaming. This starts a ResponseProcessor thread
          // that handles the responses (acks) from the DataNodes: every
          // packet we send must be acknowledged by the DataNodes. There is
          // one responder per block, shut down when the block is finished
          initDataStreaming();
        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
          // Appending data to an existing file
          LOG.debug("Append to block {}", block);
          // Set up the pipeline for the existing last block
          setupPipelineForAppendOrRecovery();
          if (streamerClosed) {
            continue;
          }
          // Initialize data streaming and start a ResponseProcessor thread
          // that handles the acks from the DataNodes
          initDataStreaming();
        }
        // Offset within the block of the last byte of this packet
        long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
        if (lastByteOffsetInBlock > stat.getBlockSize()) {
          throw new IOException("BlockSize " + stat.getBlockSize() +
              " < lastByteOffsetInBlock, " + this + ", " + one);
        }
        // Is this the last packet of the block?
        // If so, wait until every packet before it has been acked, then mark
        // the pipeline as closing. The last packet itself has not been
        // written to the pipeline yet at this point.
        if (one.isLastPacketInBlock()) {
          // wait for all data packets have been successfully acked
          synchronized (dataQueue) {
            while (!shouldStop() && ackQueue.size() != 0) {
              try {
                // wait for acks to arrive from datanodes
                dataQueue.wait(1000);
              } catch (InterruptedException e) {
                LOG.debug("Thread interrupted", e);
              }
            }
          }
          if (shouldStop()) {
            continue;
          }
          // Mark the pipeline as closing
          stage = BlockConstructionStage.PIPELINE_CLOSE;
        }
        // send the packet
        SpanId spanId = SpanId.INVALID;
        synchronized (dataQueue) {
          // move packet from dataQueue to ackQueue
          if (!one.isHeartbeatPacket()) {
            if (scope != null) {
              spanId = scope.getSpanId();
              scope.detach();
              one.setTraceScope(scope);
            }
            scope = null;
            // Move the packet to ackQueue: it is now waiting to be acked
            dataQueue.removeFirst();
            ackQueue.addLast(one);
            packetSendTime.put(one.getSeqno(), Time.monotonicNow());
            dataQueue.notifyAll();
          }
        }
        LOG.debug("{} sending {}", this, one);
        // write out data to remote datanode
        try (TraceScope ignored = dfsClient.getTracer().
            newScope("DataStreamer#writeTo", spanId)) {
          // Write the packet to the pipeline
          one.writeTo(blockStream);
          blockStream.flush();
        } catch (IOException e) {
          // HDFS-3398 treat primary DN is down since client is unable to
          // write to primary DN. If a failed or restarting node has already
          // been recorded by the responder, the following call will have no
          // effect. Pipeline recovery can handle only one node error at a
          // time. If the primary node fails again during the recovery, it
          // will be taken out then.
          // Unless some node has already been marked as failed, mark the
          // first DataNode in the pipeline as the bad node
          errorState.markFirstNodeIfNotMarked();
          throw e;
        }
        lastPacket = Time.monotonicNow();
        // update bytesSent
        long tmpBytesSent = one.getLastByteOffsetBlock();
        if (bytesSent < tmpBytesSent) {
          bytesSent = tmpBytesSent;
        }
        if (shouldStop()) {
          continue;
        }
        // Is this block full?
        // If the last packet of the block was just sent, wait for its ack
        if (one.isLastPacketInBlock()) {
          // wait for the close packet has been acked
          synchronized (dataQueue) {
            while (!shouldStop() && ackQueue.size() != 0) {
              dataQueue.wait(1000);// wait for acks to arrive from datanodes
            }
          }
          if (shouldStop()) {
            continue;
          }
          // The block is complete. endBlock() sets stage back to
          // PIPELINE_SETUP_CREATE, so on the next iteration a pipeline is
          // set up for a new block; this is how new blocks get added.
          endBlock();
        }
        if (progress != null) { progress.progress(); }
        // This is used by unit test to trigger race conditions.
        if (artificialSlowdown != 0 && dfsClient.clientRunning) {
          Thread.sleep(artificialSlowdown);
        }
      } catch (Throwable e) {
        // Log warning if there was a real error.
        if (!errorState.isRestartingNode()) {
          // Since their messages are descriptive enough, do not always
          // log a verbose stack-trace WARN for quota exceptions.
          if (e instanceof QuotaExceededException) {
            LOG.debug("DataStreamer Quota Exception", e);
          } else {
            LOG.warn("DataStreamer Exception", e);
          }
        }
        lastException.set(e);
        assert !(e instanceof NullPointerException);
        errorState.setInternalError();
        if (!errorState.isNodeMarked()) {
          // Not a datanode issue
          streamerClosed = true;
        }
      } finally {
        if (scope != null) {
          scope.close();
          scope = null;
        }
      }
    }
    closeInternal();
  }
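The wait condition at the top of run() decides when the streamer sends a heartbeat. The sketch below lifts that condition and the timeout arithmetic into two hypothetical static methods (StreamerWait is not a Hadoop class); the boolean dataStreaming stands for stage == BlockConstructionStage.DATA_STREAMING.

```java
// Hypothetical, standalone version of the condition and timeout arithmetic
// of the dataQueue.wait(...) loop in DataStreamer#run.
class StreamerWait {
  // True while the streamer should keep waiting instead of sending something.
  static boolean shouldWait(boolean shouldStop, int queueSize,
      boolean dataStreaming, long now, long lastPacket,
      long halfSocketTimeout, boolean doSleep) {
    return (!shouldStop && queueSize == 0
        && (!dataStreaming || now - lastPacket < halfSocketTimeout))
        || doSleep;
  }

  // How long one dataQueue.wait(...) call blocks, in milliseconds.
  static long waitTimeout(boolean dataStreaming, long now, long lastPacket,
      long halfSocketTimeout) {
    long timeout = halfSocketTimeout - (now - lastPacket);
    timeout = timeout <= 0 ? 1000 : timeout;
    return dataStreaming ? timeout : 1000;
  }
}
```

With the default 60 s socket timeout, halfSocketTimeout is 30000 ms. If the last packet went out at t = 0 and the block is in DATA_STREAMING, at t = 10000 the streamer waits up to another 20000 ms; at t = 30000 shouldWait becomes false with an empty queue, so run() creates and sends a heartbeat packet. Outside DATA_STREAMING the streamer simply rechecks every second and never sends heartbeats.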

2.4 nextBlockOutputStream

This method is called whenever a new block is created:

  if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
    // A new block is needed (new file, or the previous block is full)
    LOG.debug("Allocating new block: {}", this);
    // nextBlockOutputStream() asks the NameNode for a new block and returns
    // a LocatedBlock that includes the DatanodeInfo of the pipeline nodes
    setPipeline(nextBlockOutputStream());
    // Initialize data streaming. This starts a ResponseProcessor thread that
    // handles the responses (acks) from the DataNodes: every packet we send
    // must be acknowledged. There is one responder per block, and it is
    // shut down when the block is finished
    initDataStreaming();
  }

The method returns a LocatedBlock, which describes one block: the locations of its replicas, the block size, the block's generation stamp (GS) and its block ID.

  /**
   * Open a DataStreamer to a DataNode so that it can be written to.
   * This happens when a file is created and each time a new block is allocated.
   * Must get block ID and the IDs of the destinations from the namenode.
   * Returns the list of target datanodes.
   */
  protected LocatedBlock nextBlockOutputStream() throws IOException {
    LocatedBlock lb;
    DatanodeInfo[] nodes;
    StorageType[] nextStorageTypes;
    String[] nextStorageIDs;
    int count = dfsClient.getConf().getNumBlockWriteRetry();
    boolean success;
    final ExtendedBlock oldBlock = block.getCurrentBlock();
    // Keep trying to allocate a new block until it succeeds or the
    // block-write retry count is used up
    do {
      // A new block is being created, so clear errors left over from the old one
      errorState.resetInternalError();
      lastException.clear();
      // DataNodes that should not receive replicas of the new block
      DatanodeInfo[] excluded = getExcludedNodes();
      // Allocate a new block via the NameNode's addBlock RPC
      lb = locateFollowingBlock(
          excluded.length > 0 ? excluded : null, oldBlock);
      // Record basic information: current block, bytes sent, access token, etc.
      block.setCurrentBlock(lb.getBlock());
      block.setNumBytes(0);
      bytesSent = 0;
      accessToken = lb.getBlockToken();
      nodes = lb.getLocations();
      nextStorageTypes = lb.getStorageTypes();
      nextStorageIDs = lb.getStorageIDs();
      // Connect to first DataNode in the list.
      // This first opens a socket to the head of the pipeline, then calls
      // Sender#writeBlock to notify the DataNodes in the pipeline, and
      // finally reads the DataNode's reply to decide whether setup succeeded
      success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
          0L, false);
      if (!success) {
        LOG.warn("Abandoning " + block);
        dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
            stat.getFileId(), src, dfsClient.clientName);
        block.setCurrentBlock(null);
        final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
        LOG.warn("Excluding datanode " + badNode);
        excludedNodes.put(badNode, badNode);
      }
    } while (!success && --count >= 0);
    if (!success) {
      throw new IOException("Unable to create new block.");
    }
    return lb;
  }
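Note that the `do { ... } while (!success && --count >= 0)` form allows up to numBlockWriteRetry + 1 attempts. The sketch below models that retry-with-exclusion loop with plain strings as node names; BlockAllocRetry, chooseTargets (standing in for the addBlock RPC) and connectPipeline (standing in for createBlockOutputStream, returning -1 on success or the bad node's index) are all hypothetical, and abandonBlock is not modeled.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model of the retry loop in nextBlockOutputStream: each attempt
// asks for targets while excluding known-bad nodes; on failure the bad node
// is added to the excluded set and the loop retries.
class BlockAllocRetry {
  final Set<String> excludedNodes = new LinkedHashSet<>();
  int attempts = 0;

  List<String> allocate(int numBlockWriteRetry,
      Function<Set<String>, List<String>> chooseTargets,
      Function<List<String>, Integer> connectPipeline) {
    int count = numBlockWriteRetry;
    boolean success;
    List<String> nodes;
    do {
      attempts++;
      nodes = chooseTargets.apply(excludedNodes);   // like locateFollowingBlock
      int badIndex = connectPipeline.apply(nodes);  // -1 means success
      success = badIndex < 0;
      if (!success) {
        excludedNodes.add(nodes.get(badIndex));     // like excludedNodes.put(badNode, badNode)
      }
    } while (!success && --count >= 0);
    if (!success) {
      throw new IllegalStateException("Unable to create new block.");
    }
    return nodes;
  }
}
```

For instance, if "dn1" is always picked first and always fails, the first attempt excludes it and the second attempt succeeds with the next three nodes; if every attempt fails, a retry count of 3 yields exactly 4 attempts before the exception.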

2.5 The ResponseProcessor Thread

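This is a daemon thread that handles acks from the DataNodes. After receiving a packet, a DataNode must reply to the client with an ACK saying it has received the packet, and ResponseProcessor is the class that receives and processes these ACKs.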

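A new ResponseProcessor is created for every block transferred. When a block is finished, the client destroys the current ResponseProcessor indirectly through endBlock; when the next block starts, initDataStreaming, which sets up the transfer, indirectly creates a new ResponseProcessor.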
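The per-block lifecycle can be summarized as a small state machine over the stage field. The sketch below is a hypothetical simplification (StageLifecycle is not a Hadoop class): it only models the create path, assumes that initDataStreaming moves the stage to DATA_STREAMING, and ignores append, pipeline recovery and the actual waiting for acks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of how the stage field and the per-block
// ResponseProcessor change while a file is written (create path only).
class StageLifecycle {
  enum Stage { PIPELINE_SETUP_CREATE, DATA_STREAMING, PIPELINE_CLOSE }

  Stage stage = Stage.PIPELINE_SETUP_CREATE; // set by the DataStreamer constructor
  boolean responderRunning = false;
  final List<String> log = new ArrayList<>();

  // Roughly what one iteration of DataStreamer#run does for a packet.
  void sendPacket(boolean lastPacketInBlock) {
    if (stage == Stage.PIPELINE_SETUP_CREATE) {
      log.add("nextBlockOutputStream");     // ask the NameNode for a new block
      responderRunning = true;              // initDataStreaming starts a ResponseProcessor
      stage = Stage.DATA_STREAMING;         // assumed to be set by initDataStreaming
      log.add("initDataStreaming");
    }
    if (lastPacketInBlock) {
      stage = Stage.PIPELINE_CLOSE;         // after all earlier packets are acked
    }
    log.add(lastPacketInBlock ? "send last packet" : "send packet");
    if (lastPacketInBlock) {
      responderRunning = false;             // endBlock closes the ResponseProcessor
      stage = Stage.PIPELINE_SETUP_CREATE;  // ...and resets the stage for the next block
      log.add("endBlock");
    }
  }
}
```

Sending a normal packet, then a block's last packet, then another packet would allocate two blocks and start two responders, one per block.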

Once it is started, the interesting part is again its run() method:

  @Override
  public void run() {
    setName("ResponseProcessor for block " + block);
    // Reusable object that holds each ack as it is read
    PipelineAck ack = new PipelineAck();
    TraceScope scope = null;
    // Keep reading acks until the responder is closed, the client stops
    // running, or the last packet of the block has been acked
    while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
      // process responses from datanodes.
      try {
        // read an ack from the pipeline
        ack.readFields(blockReplyStream);
        if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
          Long begin = packetSendTime.get(ack.getSeqno());
          if (begin != null) {
            long duration = Time.monotonicNow() - begin;
            if (duration > dfsclientSlowLogThresholdMs) {
              LOG.info("Slow ReadProcessor read fields for block " + block
                  + " took " + duration + "ms (threshold="
                  + dfsclientSlowLogThresholdMs + "ms); ack: " + ack
                  + ", targets: " + Arrays.asList(targets));
            }
          }
        }
        LOG.debug("DFSClient {}", ack);
        // Sequence number of the packet this ack refers to. Client and
        // DataNodes exchange data in units of packets, each with a unique
        // seqno, so the seqno tells which packet the ack belongs to.
        // Data packets are numbered from 0; seqno -1 is a heartbeat packet,
        // which the client uses to tell the DataNodes it is still alive;
        // seqno -2 is an unknown packet, which must be treated as an error.
        long seqno = ack.getSeqno();
        // processes response status from datanodes.
        ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
        for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
          // Status of the i-th DataNode, taken from the ack header
          final Status reply = PipelineAck.getStatusFromHeader(ack
              .getHeaderFlag(i));
          // Does this DataNode report congestion?
          if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
              PipelineAck.ECN.CONGESTED) {
            congestedNodesFromAck.add(targets[i]);
          }
          // Restart will not be treated differently unless it is
          // the local node or the only one in the pipeline.
          // Is this DataNode restarting?
          if (PipelineAck.isRestartOOBStatus(reply)) {
            final String message = "Datanode " + i + " is restarting: "
                + targets[i];
            // Record node i as restarting. If we should wait for it, set a
            // restart deadline and clear the bad-node record (the bad node,
            // i.e. a DataNode that failed or could not be reached, is
            // usually the first DataNode in the pipeline)
            errorState.initRestartingNode(i, message,
                shouldWaitForRestart(i));
            throw new IOException(message);
          }
          // node error
          // A reply other than SUCCESS means this DataNode did not receive
          // the packet correctly, so mark it as the bad node
          if (reply != SUCCESS) {
            errorState.setBadNodeIndex(i); // mark bad datanode
            throw new IOException("Bad response " + reply +
                " for " + block + " from datanode " + targets[i]);
          }
        }
        // Store the congested nodes in DataStreamer's congestedNodes field,
        // which backOffIfNecessary() in DataStreamer uses to back off and to
        // log which nodes are congested
        if (!congestedNodesFromAck.isEmpty()) {
          synchronized (congestedNodes) {
            congestedNodes.clear();
            congestedNodes.addAll(congestedNodesFromAck);
          }
        } else {
          synchronized (congestedNodes) {
            congestedNodes.clear();
            lastCongestionBackoffTime = 0;
          }
        }
        assert seqno != PipelineAck.UNKOWN_SEQNO :
            "Ack for unknown seqno should be a failed ack: " + ack;
        if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
          continue;
        }
        // a success ack for a data packet
        DFSPacket one;
        // ackQueue holds packets that were sent but not yet acked. If the
        // pipeline fails after a packet was sent, the packet can be
        // recovered from ackQueue, so earlier packets are not lost.
        synchronized (dataQueue) {
          one = ackQueue.getFirst();
        }
        // The ack's seqno must match the packet at the head of ackQueue;
        // a mismatch means acks and packets got out of order
        if (one.getSeqno() != seqno) {
          throw new IOException("ResponseProcessor: Expecting seqno " +
              one.getSeqno() + " for block " + block +
              " but received " + seqno);
        }
        isLastPacketInBlock = one.isLastPacketInBlock();
        // Fail the packet write for testing in order to force a
        // pipeline recovery.
        if (DFSClientFaultInjector.get().failPacket() &&
            isLastPacketInBlock) {
          failPacket = true;
          throw new IOException(
              "Failing the last packet for testing.");
        }
        // update bytesAcked
        // getLastByteOffsetBlock() is the offset of the end of this packet
        // relative to the start of the block, i.e. how much data has been
        // written so far: offsetInBlock + dataPos - dataStart
        block.setNumBytes(one.getLastByteOffsetBlock());
        synchronized (dataQueue) {
          scope = one.getTraceScope();
          if (scope != null) {
            scope.reattach();
            one.setTraceScope(null);
          }
          lastAckedSeqno = seqno;
          pipelineRecoveryCount = 0;
          // Remove the acked packet
          ackQueue.removeFirst();
          packetSendTime.remove(seqno);
          dataQueue.notifyAll();
          one.releaseBuffer(byteArrayManager);
        }
      } catch (Throwable e) {
        if (!responderClosed) {
          lastException.set(e);
          errorState.setInternalError();
          // Unless a node is already marked, mark the first DataNode as bad:
          // the client connects to it directly, so it is the prime suspect
          errorState.markFirstNodeIfNotMarked();
          synchronized (dataQueue) {
            dataQueue.notifyAll();
          }
          if (!errorState.isRestartingNode()) {
            LOG.warn("Exception for " + block, e);
          }
          responderClosed = true;
        }
      } finally {
        if (scope != null) {
          scope.close();
        }
        scope = null;
      }
    }
  }
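The hand-off between the two queues is the heart of the streamer/responder cooperation. The sketch below is a hypothetical model (AckQueues is not a Hadoop class): packets are reduced to their seqnos, the whole object is guarded by synchronized methods instead of the dataQueue monitor, and a seqno mismatch raises IllegalStateException where the real code throws IOException and triggers pipeline recovery. It assumes every reply in the ack was SUCCESS.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of dataQueue/ackQueue: the streamer moves a packet from
// dataQueue to ackQueue when it sends it, and the responder removes it from
// ackQueue when a matching ack arrives. Seqno -1 marks a heartbeat.
class AckQueues {
  static final long HEART_BEAT_SEQNO = -1L;

  final Deque<Long> dataQueue = new ArrayDeque<>();
  final Deque<Long> ackQueue = new ArrayDeque<>();
  long lastAckedSeqno = -1L;

  // Streamer side: "send" the head of dataQueue.
  synchronized void send() {
    ackQueue.addLast(dataQueue.removeFirst());
  }

  // Responder side: handle an ack whose replies all reported SUCCESS.
  synchronized void onAck(long seqno) {
    if (seqno == HEART_BEAT_SEQNO) {
      return; // heartbeat acks do not correspond to a queued packet
    }
    Long head = ackQueue.peekFirst();
    if (head == null || head != seqno) {
      throw new IllegalStateException(
          "Expecting seqno " + head + " but received " + seqno);
    }
    ackQueue.removeFirst();
    lastAckedSeqno = seqno;
  }
}
```

Because packets are acked strictly in order, checking only the head of ackQueue is enough; anything still in ackQueue when the pipeline fails is exactly the data that has to be resent during recovery.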

 

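Disclaimer: this article was contributed by a user and does not represent the position of the wpsshop blog; copyright belongs to the original author, and the site assumes no legal liability. If you find infringing content, please contact us. When reposting, please credit the source: https://www.wpsshop.cn/w/笔触狂放9/article/detail/592798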