
Hadoop 3 Write Data Flow (4): Writing Data 1 (storeAllocatedBlock)

Based on the hadoop-3.3.0 source code. Frankly, the quality of this part of the code is not great; quite a few methods run to several hundred lines.

1 Overview

HDFS writes data through a data pipeline (see part 3 of this series for how the pipeline is built). The DFSClient writes through the output stream `out` created in part 1: the write() calls slice the data into packets, which are stored in dataQueue; after a packet is enqueued, notifyAll() wakes up the DataStreamer thread blocked in wait(). Inside DataStreamer#run(), DataStreamer#nextBlockOutputStream is called, which in turn calls Sender.writeBlock() (from DataStreamer#createBlockOutputStream) to issue a write-block request. This request is relayed to every datanode in the pipeline: when it arrives at a datanode over the streaming interface, the DataXceiverServer listening on that interface accepts it, constructs a DataXceiver object, and DataXceiver.writeBlock() is invoked on that object to handle the request. The current datanode's DataXceiver.writeBlock() then cascades the write-block request to the next datanode in the pipeline, and the request keeps being forwarded until it reaches the last datanode. The last datanode replies with an acknowledgment, which travels back through the pipeline to the DFSClient.

Once all packets of a block have been sent successfully and acknowledged, the DFSClient sends an empty packet to mark the end of the block. At that point the block transfer is complete.
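To make the hand-off between the client-facing output stream and the DataStreamer thread concrete, here is a minimal, self-contained sketch of the same producer/consumer pattern. It is not Hadoop code: the class name, the byte[] "packet" and the MAX_PACKETS constant are illustrative stand-ins for DFSPacket, dataQueue and the dfs.client.write.max-packets-in-flight setting.

  import java.util.LinkedList;
  import java.util.Queue;

  // Minimal sketch of the producer/consumer hand-off that DFSOutputStream and
  // DataStreamer use: the writer thread enqueues packets into dataQueue and
  // calls notifyAll(); the streamer thread waits on the same lock, dequeues a
  // packet and sends it. Names are illustrative, not Hadoop's real API.
  public class QueueHandoffSketch {
    private static final int MAX_PACKETS = 80;         // stand-in for the max-packets-in-flight limit
    private final Queue<byte[]> dataQueue = new LinkedList<>();

    // Called by the writer (the role DFSOutputStream#writeChunk plays).
    public void enqueuePacket(byte[] packet) throws InterruptedException {
      synchronized (dataQueue) {
        while (dataQueue.size() >= MAX_PACKETS) {
          dataQueue.wait();                             // queue full: wait for the streamer
        }
        dataQueue.add(packet);
        dataQueue.notifyAll();                          // wake the streamer thread
      }
    }

    // Loop run by the streamer thread (the role DataStreamer#run plays).
    public void streamLoop() throws InterruptedException {
      while (!Thread.currentThread().isInterrupted()) {
        byte[] packet;
        synchronized (dataQueue) {
          while (dataQueue.isEmpty()) {
            dataQueue.wait();                           // nothing to send yet
          }
          packet = dataQueue.poll();
          dataQueue.notifyAll();                        // wake a writer blocked on a full queue
        }
        send(packet);                                   // network I/O happens outside the lock
      }
    }

    private void send(byte[] packet) {
      System.out.println("sending packet of " + packet.length + " bytes");
    }
  }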

2 Source Code Analysis

2.1 Writing data into the dataQueue

After the DFSClient receives the setup acknowledgment, it slices the block into packets and sends them into the pipeline one by one. A packet first goes from the DFSClient to the first datanode in the pipeline (dn1 here); once dn1 has received it successfully, it writes the packet to disk and forwards it to the second node (dn2), and so on. When the packet reaches the last node (dn3), dn3 verifies its checksum; if verification succeeds, dn3 sends a packet acknowledgment, which travels back through the pipeline to the DFSClient.

Let's start from the write() call that eventually leads into DFSOutputStream#write:

  import java.io.IOException;
  import java.nio.charset.StandardCharsets;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class DataWriteTest {
    public static void main(String[] args) throws IOException {
      Configuration conf = new Configuration();
      FileSystem fileSystem = FileSystem.newInstance(conf);
      // Create the file and write to it
      FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/user/clare/test.txt"));
      fsDataOutputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8));
      fsDataOutputStream.close(); // close so the buffered data is flushed and the file is completed
    }
  }

This write() goes through several layers of wrapping and ends up in the write method of FSOutputSummer, the parent class of DFSOutputStream:

  /** Write one byte */
  @Override
  public synchronized void write(int b) throws IOException {
    buf[count++] = (byte)b;
    if(count == buf.length) {
      // Force any buffered output bytes to be checksummed and written
      // out to the underlying output stream
      flushBuffer();
    }
  }

  /* Forces buffered output bytes to be checksummed and written out to
   * the underlying output stream. If there is a trailing partial chunk in the
   * buffer,
   * 1) flushPartial tells us whether to flush that chunk
   * 2) if flushPartial is true, keep tells us whether to keep that chunk in the
   * buffer (if flushPartial is false, it is always kept in the buffer)
   *
   * Returns the number of bytes that were flushed but are still left in the
   * buffer (can only be non-zero if keep is true).
   */
  protected synchronized int flushBuffer(boolean keep,
      boolean flushPartial) throws IOException {
    int bufLen = count;
    int partialLen = bufLen % sum.getBytesPerChecksum();
    int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
    if (lenToFlush != 0) {
      // Generate checksums for the given data chunks and write the chunks
      // and checksums to the underlying output stream
      writeChecksumChunks(buf, 0, lenToFlush);
      if (!flushPartial || keep) {
        count = partialLen;
        System.arraycopy(buf, bufLen - count, buf, 0, count);
      } else {
        count = 0;
      }
    }
    // total bytes left minus unflushed bytes left
    return count - (bufLen - lenToFlush);
  }

  /** Generate checksums for the given data chunks and output chunks & checksums
   * to the underlying output stream.
   */
  private void writeChecksumChunks(byte b[], int off, int len)
      throws IOException {
    // Compute the checksums for the data about to be flushed
    sum.calculateChunkedSums(b, off, len, checksum, 0);
    TraceScope scope = createWriteTraceScope();
    try {
      // getBytesPerChecksum() is really the chunk size (the number of bytes
      // covered by one checksum), so this loop writes out one chunk at a time
      for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
        int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
        int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
        // Write out one chunk together with its checksum
        writeChunk(b, off + i, chunkLen, checksum, ckOffset,
            getChecksumSize());
      }
    } finally {
      if (scope != null) {
        scope.close();
      }
    }
  }
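The loop above walks the buffer one chunk (getBytesPerChecksum() bytes) at a time and computes a matching offset into the checksum array. The throwaway snippet below just replays that offset arithmetic with assumed defaults (512-byte chunks, 4-byte CRC32C checksums); the numbers are illustrative, not something the Hadoop code prints.

  // Replays the offset arithmetic of writeChecksumChunks() with assumed
  // defaults: bytesPerChecksum = 512, checksum size = 4 (CRC32C).
  public class ChunkOffsetDemo {
    public static void main(String[] args) {
      final int bytesPerChecksum = 512;   // assumed dfs.bytes-per-checksum default
      final int checksumSize = 4;         // a CRC32C checksum is 4 bytes
      final int len = 1300;               // pretend 1300 bytes are being flushed

      for (int i = 0; i < len; i += bytesPerChecksum) {
        int chunkLen = Math.min(bytesPerChecksum, len - i);
        int ckOffset = i / bytesPerChecksum * checksumSize;
        System.out.printf("chunk at data offset %4d, length %3d, checksum offset %2d%n",
            i, chunkLen, ckOffset);
      }
      // Prints three chunks of 512, 512 and 276 bytes with checksum offsets 0, 4, 8.
    }
  }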

Each iteration of that loop then calls DFSOutputStream#writeChunk:

  @Override
  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    // Sanity checks, e.g. that the chunk and checksum lengths are valid
    writeChunkPrepare(len, ckoff, cklen);
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.incNumChunks();
    getStreamer().incBytesCurBlock(len);

    // If packet is full, enqueue it for transmission
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        getStreamer().getBytesCurBlock() == blockSize) {
      // The packet is full (or the block boundary was reached):
      // put it on dataQueue so the streamer can send it
      enqueueCurrentPacketFull();
    }
  }

  synchronized void enqueueCurrentPacketFull() throws IOException {
    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
        + " appendChunk={}, {}", currentPacket, src, getStreamer()
        .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
        getStreamer());
    // Calls DataStreamer#waitAndQueuePacket, which waits for room in
    // dataQueue and then enqueues the packet
    enqueueCurrentPacket();
    adjustChunkBoundary();
    endBlock();
  }

  void enqueueCurrentPacket() throws IOException {
    getStreamer().waitAndQueuePacket(currentPacket);
    currentPacket = null;
  }

waitAndQueuePacket adds the packet to the queue. When dataQueue is full it waits for space, but even if the wait is interrupted the packet is still queued in the end:

  /**
   * wait for space of dataQueue and queue the packet
   *
   * @param packet  the DFSPacket to be queued
   * @throws IOException
   */
  void waitAndQueuePacket(DFSPacket packet) throws IOException {
    synchronized (dataQueue) {
      try {
        // If queue is full, then wait till we have enough space
        boolean firstWait = true;
        try {
          while (!streamerClosed && dataQueue.size() + ackQueue.size() >
              dfsClient.getConf().getWriteMaxPackets()) {
            if (firstWait) {
              Span span = Tracer.getCurrentSpan();
              if (span != null) {
                span.addTimelineAnnotation("dataQueue.wait");
              }
              firstWait = false;
            }
            try {
              dataQueue.wait();
            } catch (InterruptedException e) {
              // If we get interrupted while waiting to queue data, we still need to get rid
              // of the current packet. This is because we have an invariant that if
              // currentPacket gets full, it will get queued before the next writeChunk.
              //
              // Rather than wait around for space in the queue, we should instead try to
              // return to the caller as soon as possible, even though we slightly overrun
              // the MAX_PACKETS length.
              Thread.currentThread().interrupt();
              break;
            }
          }
        } finally {
          Span span = Tracer.getCurrentSpan();
          if ((span != null) && (!firstWait)) {
            span.addTimelineAnnotation("end.wait");
          }
        }
        checkClosed();
        queuePacket(packet);
      } catch (ClosedChannelException cce) {
        LOG.debug("Closed channel exception", cce);
      }
    }
  }

2.2 writeBlock: the write-block request

The write-block request is triggered mainly in DataStreamer#run():

  if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
    // This branch is taken when a new file is being created
    LOG.debug("Allocating new block: {}", this);
    // nextBlockOutputStream() asks the NameNode for a new block and returns a
    // LocatedBlock, which carries the DatanodeInfo of the nodes forming the pipeline
    setPipeline(nextBlockOutputStream());
    // Initialize the data stream. This starts a ResponseProcessor thread that handles
    // responses (acks) from the datanodes: every packet we send must be acknowledged.
    // There is one ResponseProcessor per block; it is closed when the block is finished.
    initDataStreaming();
  } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
    // This branch is taken when appending to an existing file
    LOG.debug("Append to block {}", block);
    // This sets up the pipeline for the append (or recovery) case
    setupPipelineForAppendOrRecovery();
    if (streamerClosed) {
      continue;
    }
    // Initialize the data stream and start the ResponseProcessor thread that
    // handles the acks from the datanodes, as above
    initDataStreaming();
  }

DataStreamer#nextBlockOutputStream calls DataStreamer#locateFollowingBlock to ask the NameNode to allocate a block, and then triggers the write-block request:

  // In DataStreamer#nextBlockOutputStream:
  lb = locateFollowingBlock(
      excluded.length > 0 ? excluded : null, oldBlock);

  private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
      ExtendedBlock oldBlock) throws IOException {
    // This just assembles the parameters; the dfsClient then calls
    // NameNodeRpcServer#addBlock over RPC
    return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
        stat.getFileId(), favoredNodes, addBlockFlags);
  }

  // NameNodeRpcServer#addBlock
  // This simply delegates to FSNamesystem to do the actual block allocation
  @Override
  public LocatedBlock addBlock(String src, String clientName,
      ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
      String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
      throws IOException {
    checkNNStartup();
    LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
        clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
    if (locatedBlock != null) {
      metrics.incrAddBlockOps();
    }
    return locatedBlock;
  }

Now look at FSNamesystem#getAdditionalBlock. It obtains an additional block for the given file (which is being written to) and returns the block plus a set of machines: the first machine in the list is where the client writes the data, and the remaining entries must be passed along in the connection to that first datanode. It also makes sure the previous blocks have been reported by datanodes and are replicated. In short, it returns the new block's location and the machines that will form the pipeline.

The method proceeds in two parts:

  1. It first calls validateAddBlock() to check whether the NameNode can serve the write: whether writes are allowed, whether it is in safe mode, and whether the filesystem already holds too many objects. It then compares the last block of the file recorded on the NameNode with the last block reported by the client, to decide whether this is a retransmitted RPC request or an abnormal request.
  2. It then calls FSDirWriteFileOp.storeAllocatedBlock to create the new block. This re-runs analyzeFileState to re-check the file state; if nothing has changed, the allocation proceeds: the file's current last block is committed first, and only then is the new block allocated and added to the file. (A generic sketch of this validate-then-re-validate locking pattern follows the code below.)
  /**
   * The client would like to obtain an additional block for the indicated
   * filename (which is being written-to). Return an array that consists
   * of the block, plus a set of machines. The first on this list should
   * be where the client writes data. Subsequent items in the list must
   * be provided in the connection to the first datanode.
   *
   * Make sure the previous blocks have been reported by datanodes and
   * are replicated. Will return an empty 2-elt array if we want the
   * client to "try again later".
   */
  LocatedBlock getAdditionalBlock(
      String src, long fileId, String clientName, ExtendedBlock previous,
      DatanodeInfo[] excludedNodes, String[] favoredNodes,
      EnumSet<AddBlockFlag> flags) throws IOException {
    final String operationName = "getAdditionalBlock";
    NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" +
        " for {}", src, fileId, clientName);

    LocatedBlock[] onRetryBlock = new LocatedBlock[1];
    FSDirWriteFileOp.ValidateAddBlockResult r;
    checkOperation(OperationCategory.READ);
    final FSPermissionChecker pc = getPermissionChecker();
    FSPermissionChecker.setOperationType(operationName);
    readLock();
    try {
      checkOperation(OperationCategory.READ);
      // Analyze the state of the file under the read lock to decide whether the
      // client may add a new block, and to detect potential retries, lease
      // mismatches and insufficient replication of the penultimate block.
      // Target DataNode locations are generated later; no block is created here.
      r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
                                            previous, onRetryBlock);
    } finally {
      readUnlock(operationName);
    }

    if (r == null) {
      assert onRetryBlock[0] != null : "Retry block is null";
      // This is a retry. Just return the last block.
      return onRetryBlock[0];
    }

    // Choose the storages (datanodes) that will hold the newly requested block
    DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
        blockManager, src, excludedNodes, favoredNodes, flags, r);

    checkOperation(OperationCategory.WRITE);
    writeLock();
    LocatedBlock lb;
    try {
      checkOperation(OperationCategory.WRITE);
      // Allocate the new block; this also adds it to the corresponding file
      lb = FSDirWriteFileOp.storeAllocatedBlock(
          this, src, fileId, clientName, previous, targets);
    } finally {
      writeUnlock(operationName);
    }
    getEditLog().logSync();
    return lb;
  }
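Notice the overall shape: a cheap analysis under the read lock, expensive target selection with no lock held, then the same analysis repeated under the write lock before anything is mutated. The sketch below shows that validate-then-re-validate pattern in isolation, using a plain ReentrantReadWriteLock; none of it is Hadoop API (FSNamesystem uses its own FSNamesystemLock), it only illustrates why the second analyzeFileState call exists.

  import java.util.concurrent.locks.ReadWriteLock;
  import java.util.concurrent.locks.ReentrantReadWriteLock;

  // Generic sketch of the "analyze under read lock, re-analyze under write lock"
  // pattern used by getAdditionalBlock(): expensive target selection happens
  // between the two phases without holding any lock, so the state must be
  // re-validated before it is mutated. Purely illustrative, not Hadoop code.
  public class TwoPhaseAllocationSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private long lastAllocatedId = 0;
    private long expectedPrevious = 0;

    public long allocate(long previousSeenByClient) {
      // Phase 1: cheap validation under the read lock.
      lock.readLock().lock();
      try {
        if (previousSeenByClient != expectedPrevious) {
          return lastAllocatedId;             // looks like a retry: return the last allocation
        }
      } finally {
        lock.readLock().unlock();
      }

      chooseTargets();                        // slow work (cf. chooseTargetForNewBlock), no lock held

      // Phase 2: repeat the analysis under the write lock, then mutate.
      lock.writeLock().lock();
      try {
        if (previousSeenByClient != expectedPrevious) {
          return lastAllocatedId;             // state changed while targets were being chosen
        }
        expectedPrevious = ++lastAllocatedId; // commit the new allocation
        return lastAllocatedId;
      } finally {
        lock.writeLock().unlock();
      }
    }

    private void chooseTargets() { /* placeholder for expensive placement logic */ }
  }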

Let's look at the two parts mentioned above in turn:

2.2.1 Part I: validation checks

This part relies on FSDirWriteFileOp.validateAddBlock: it analyzes the state of the file under the read lock to determine whether the client can add a new block, and detects potential retries, lease mismatches, and insufficient replication of the penultimate block. It gathers what is needed to generate target DataNode locations for the new block, but does not create the block yet.

  /**
   * Part I of getAdditionalBlock().
   * Analyze the state of the file under read lock to determine if the client
   * can add a new block, detect potential retries, lease mismatches,
   * and minimal replication of the penultimate block.
   *
   * Generate target DataNode locations for the new block,
   * but do not create the new block yet.
   */
  static ValidateAddBlockResult validateAddBlock(
      FSNamesystem fsn, FSPermissionChecker pc,
      String src, long fileId, String clientName,
      ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
    final long blockSize;
    final short numTargets;
    final byte storagePolicyID;
    String clientMachine;
    final BlockType blockType;

    // Resolve the file path
    INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
    // Analyze the file state: compare the last block recorded on the NameNode
    // with the last block reported by the client, and decide whether this is a
    // retransmitted RPC request or an abnormal request
    FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
                                           previous, onRetryBlock);
    if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
      // This is a retry. No need to generate new locations.
      // Use the last block if it has locations.
      return null;
    }

    // Get the in-memory INodeFile object from the file state
    final INodeFile pendingFile = fileState.inode;
    if (!fsn.checkFileProgress(src, pendingFile, false)) {
      throw new NotReplicatedYetException("Not replicated yet: " + src);
    }
    if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
      throw new IOException("File has reached the limit on maximum number of"
          + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
          + "): " + pendingFile.getBlocks().length + " >= "
          + fsn.maxBlocksPerFile);
    }
    blockSize = pendingFile.getPreferredBlockSize();
    clientMachine = pendingFile.getFileUnderConstructionFeature()
        .getClientMachine();
    blockType = pendingFile.getBlockType();
    ErasureCodingPolicy ecPolicy = null;
    if (blockType == BlockType.STRIPED) {
      ecPolicy =
          FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(fsn, iip);
      numTargets = (short) (ecPolicy.getSchema().getNumDataUnits()
          + ecPolicy.getSchema().getNumParityUnits());
    } else {
      numTargets = pendingFile.getFileReplication();
    }
    storagePolicyID = pendingFile.getStoragePolicyID();
    // Package the fields read from the INodeFile into a ValidateAddBlockResult
    return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
                                      clientMachine, blockType, ecPolicy);
  }

2.2.2 Part II: allocating the block

FSDirWriteFileOp.storeAllocatedBlock creates the new block. It calls analyzeFileState again to re-check the file state; if nothing has changed, it proceeds with the allocation: the file's current last block is committed first, and only then is the new block allocated and added to the file.

  /**
   * Part II of getAdditionalBlock().
   * Should repeat the same analysis of the file state as in Part 1,
   * but under the write lock.
   * If the conditions still hold, then allocate a new block with
   * the new targets, add it to the INode and to the BlocksMap.
   */
  static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
      long fileId, String clientName, ExtendedBlock previous,
      DatanodeStorageInfo[] targets) throws IOException {
    long offset;
    // Run the full analysis again, since things could have changed
    // while chooseTarget() was executing.
    LocatedBlock[] onRetryBlock = new LocatedBlock[1];
    INodesInPath iip = fsn.dir.resolvePath(null, src, fileId);
    // analyzeFileState() is called again in case the NameNode state changed while
    // chooseTargetForNewBlock() was running. For example, if a previously triggered
    // allocation request completed in the meantime, there is no need to allocate a
    // new block; the block from that earlier allocation is simply returned.
    FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
                                           previous, onRetryBlock);
    final INodeFile pendingFile = fileState.inode;
    src = fileState.path;

    if (onRetryBlock[0] != null) {
      if (onRetryBlock[0].getLocations().length > 0) {
        // This is a retry. Just return the last block if having locations.
        return onRetryBlock[0];
      } else {
        // add new chosen targets to already allocated block and return
        BlockInfo lastBlockInFile = pendingFile.getLastBlock();
        lastBlockInFile.getUnderConstructionFeature().setExpectedLocations(
            lastBlockInFile, targets, pendingFile.getBlockType());
        offset = pendingFile.computeFileSize();
        return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
      }
    }

    // commit the last block and complete it if it has minimum replicas
    // (the previous block is committed here; completing it requires that the
    // minimum replication has been reached)
    fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip,
        ExtendedBlock.getLocalBlock(previous));

    // allocate new block, record block locations in INode.
    final BlockType blockType = pendingFile.getBlockType();
    Block newBlock = fsn.createNewBlock(blockType);
    INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
    // Attach the newly created block to the corresponding file
    saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, blockType);
    // Record the corresponding metadata (add block) in the edit log
    persistNewBlock(fsn, src, pendingFile);
    offset = pendingFile.computeFileSize();

    // Return located block
    return makeLocatedBlock(fsn, fsn.getStoredBlock(newBlock), targets, offset);
  }

2.3 Setting up the data pipeline

After the block has been allocated as above, createBlockOutputStream is called to set up the pipeline. The method does three things:

  1. Establish a socket connection to the first datanode in the pipeline;
  2. Send the write-block request;
  3. Receive the response (the setup ack).
  // connects to the first datanode in the pipeline
  // Returns true if success, otherwise return failure.
  //
  boolean createBlockOutputStream(DatanodeInfo[] nodes,
      StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
      long newGS, boolean recoveryFlag) {
    if (nodes.length == 0) {
      LOG.info("nodes are empty for write pipeline of " + block);
      return false;
    }
    String firstBadLink = "";
    boolean checkRestart = false;
    if (LOG.isDebugEnabled()) {
      LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this);
    }

    // persist blocks on namenode on next flush
    persistBlocks.set(true);

    int refetchEncryptionKey = 1;
    while (true) {
      boolean result = false;
      DataOutputStream out = null;
      try {
        assert null == s : "Previous socket unclosed";
        assert null == blockReplyStream : "Previous blockReplyStream unclosed";
        // Step 1: open a socket connection to the first datanode in the pipeline
        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
        long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
        long readTimeout = dfsClient.getDatanodeReadTimeout(nodes.length);

        OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
        InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
        unbufOut = saslStreams.out;
        unbufIn = saslStreams.in;
        // Wrap the raw stream; a BufferedOutputStream is used underneath (decorator pattern)
        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
            DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
        blockReplyStream = new DataInputStream(unbufIn);

        //
        // Xmit header info to datanode
        //
        BlockConstructionStage bcs = recoveryFlag ?
            stage.getRecoveryStage() : stage;

        // We cannot change the block length in 'block' as it counts the number
        // of bytes ack'ed.
        ExtendedBlock blockCopy = block.getCurrentBlock();
        blockCopy.setNumBytes(stat.getBlockSize());

        boolean[] targetPinnings = getPinnings(nodes);
        // send the request
        // Step 2: send the write-block request (the op header) to the first datanode
        new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
            dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
            nodes.length, block.getNumBytes(), bytesSent, newGS,
            checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
            (targetPinnings != null && targetPinnings[0]), targetPinnings,
            nodeStorageIDs[0], nodeStorageIDs);

        // receive ack for connect
        // Step 3: receive the setup ack
        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
            PBHelperClient.vintPrefixed(blockReplyStream));
        Status pipelineStatus = resp.getStatus();
        firstBadLink = resp.getFirstBadLink();

        // Got an restart OOB ack.
        // If a node is already restarting, this status is not likely from
        // the same node. If it is from a different node, it is not
        // from the local datanode. Thus it is safe to treat this as a
        // regular node error.
        // Only enter this branch if we are not already tracking a restarting node
        if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
            !errorState.isRestartingNode()) {
          checkRestart = true;
          throw new IOException("A datanode is restarting.");
        }

        String logInfo = "ack with firstBadLink as " + firstBadLink;
        DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);

        assert null == blockStream : "Previous blockStream unclosed";
        blockStream = out;
        result = true; // success
        errorState.resetInternalError();
        lastException.clear();
        // remove all restarting nodes from failed nodes list
        failed.removeAll(restartingNodes);
        restartingNodes.clear();
      } catch (IOException ie) {
        if (!errorState.isRestartingNode()) {
          LOG.warn("Exception in createBlockOutputStream " + this, ie);
        }
        if (ie instanceof InvalidEncryptionKeyException &&
            refetchEncryptionKey > 0) {
          LOG.info("Will fetch a new encryption key and retry, "
              + "encryption key was invalid when connecting to "
              + nodes[0] + " : " + ie);
          // The encryption key used is invalid.
          refetchEncryptionKey--;
          dfsClient.clearDataEncryptionKey();
          // Don't close the socket/exclude this node just yet. Try again with
          // a new encryption key.
          continue;
        }

        // find the datanode that matches
        // Mark the first datanode that failed during connection setup as the bad node;
        // if the response does not tell us which node failed, blame the first node
        if (firstBadLink.length() != 0) {
          for (int i = 0; i < nodes.length; i++) {
            // NB: Unconditionally using the xfer addr w/o hostname
            if (firstBadLink.equals(nodes[i].getXferAddr())) {
              errorState.setBadNodeIndex(i);
              break;
            }
          }
        } else {
          assert !checkRestart;
          errorState.setBadNodeIndex(0);
        }

        final int i = errorState.getBadNodeIndex();
        // Check whether there is a restart worth waiting for.
        if (checkRestart) {
          errorState.initRestartingNode(i,
              "Datanode " + i + " is restarting: " + nodes[i],
              shouldWaitForRestart(i));
        }
        errorState.setInternalError();
        lastException.set(ie);
        result = false; // error
      } finally {
        if (!result) {
          IOUtils.closeSocket(s);
          s = null;
          IOUtils.closeStream(out);
          IOUtils.closeStream(blockReplyStream);
          blockReplyStream = null;
        }
      }
      return result;
    }
  }

2.4 The datanode receives the data

When the write-block request arrives at a datanode over the streaming interface, the DataXceiverServer listening on that interface accepts it, constructs a DataXceiver object, and DataXceiver.writeBlock() is called on that object to handle the request. The current datanode's DataXceiver.writeBlock() then cascades the write-block request to the next datanode in the pipeline, and the request keeps being forwarded until it reaches the last datanode in the pipeline.
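Before diving into writeBlock, it helps to keep the server shape in mind: DataXceiverServer is essentially an accept loop that hands each incoming connection to a fresh DataXceiver thread, which reads the op code and dispatches to writeBlock(), readBlock(), and so on. The stripped-down sketch below shows only that shape, using a plain ServerSocket; the real code goes through a Peer/TcpPeerServer abstraction, a thread group, and xceiver-count limits, none of which are shown here.

  import java.io.IOException;
  import java.net.ServerSocket;
  import java.net.Socket;

  // Stripped-down sketch of the DataXceiverServer/DataXceiver shape: one
  // listener thread accepts connections and hands each one to a fresh worker
  // thread that would parse the op code and dispatch (e.g. to writeBlock()).
  public class XceiverServerSketch implements Runnable {
    private final ServerSocket serverSocket;

    public XceiverServerSketch(int port) throws IOException {
      this.serverSocket = new ServerSocket(port);
    }

    @Override
    public void run() {
      while (!serverSocket.isClosed()) {
        try {
          Socket peer = serverSocket.accept();       // blocking accept, cf. peerServer.accept()
          new Thread(() -> handle(peer)).start();    // one worker per connection, cf. new DataXceiver(...)
        } catch (IOException e) {
          break;                                     // server socket closed, stop listening
        }
      }
    }

    private void handle(Socket peer) {
      try (Socket s = peer) {
        // A real DataXceiver reads the op code from the stream and dispatches:
        // WRITE_BLOCK -> writeBlock(...), READ_BLOCK -> readBlock(...), etc.
        System.out.println("handling connection from " + s.getRemoteSocketAddress());
      } catch (IOException ignored) {
      }
    }
  }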

After Sender(out).writeBlock above issues the write request, the datanode's DataXceiverServer accepts it and hands it to a DataXceiver. Both are thread objects, so you can start from DataXceiverServer#run(), which creates a DataXceiver thread; the actual handling happens in DataXceiver#run(), and the call chain eventually reaches DataXceiver#writeBlock (reading this method end to end takes some patience; it is very long):

  @Override
  public void writeBlock(final ExtendedBlock block,
      final StorageType storageType,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientname,
      final DatanodeInfo[] targets,
      final StorageType[] targetStorageTypes,
      final DatanodeInfo srcDataNode,
      final BlockConstructionStage stage,
      final int pipelineSize,
      final long minBytesRcvd,
      final long maxBytesRcvd,
      final long latestGenerationStamp,
      DataChecksum requestedChecksum,
      CachingStrategy cachingStrategy,
      boolean allowLazyPersist,
      final boolean pinning,
      final boolean[] targetPinnings,
      final String storageId,
      final String[] targetStorageIds) throws IOException {
    previousOpClientName = clientname;
    updateCurrentThreadName("Receiving block " + block);

    // isDatanode indicates whether this write was initiated by another datanode
    // (clientname is empty) rather than a DFSClient: false for a client write
    final boolean isDatanode = clientname.length() == 0;
    // isClient is the opposite: the write was initiated by a DFSClient (true here)
    final boolean isClient = !isDatanode;
    // isTransfer indicates whether this is a block transfer/replication operation,
    // determined from the pipeline stage: false for a normal client write
    final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
        || stage == BlockConstructionStage.TRANSFER_FINALIZED;
    // whether lazyPersist is allowed (false for a normal write)
    allowLazyPersist = allowLazyPersist &&
        (dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
    long size = 0;
    // reply to upstream datanode or client
    final DataOutputStream replyOut = getBufferedOutputStream();

    int nst = targetStorageTypes.length;
    StorageType[] storageTypes = new StorageType[nst + 1];
    storageTypes[0] = storageType;
    if (targetStorageTypes.length > 0) {
      System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
    }

    // To support older clients, we don't pass in empty storageIds
    final int nsi = targetStorageIds.length;
    final String[] storageIds;
    if (nsi > 0) {
      storageIds = new String[nsi + 1];
      storageIds[0] = storageId;
      if (targetStorageTypes.length > 0) {
        System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi);
      }
    } else {
      storageIds = new String[0];
    }
    checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK,
        BlockTokenIdentifier.AccessMode.WRITE,
        storageTypes, storageIds);

    // check single target for transfer-RBW/Finalized
    if (isTransfer && targets.length > 0) {
      throw new IOException(stage + " does not support multiple targets "
          + Arrays.asList(targets));
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("opWriteBlock: stage={}, clientname={}\n " +
          "block ={}, newGs={}, bytesRcvd=[{}, {}]\n " +
          "targets={}; pipelineSize={}, srcDataNode={}, pinning={}",
          stage, clientname, block, latestGenerationStamp, minBytesRcvd,
          maxBytesRcvd, Arrays.asList(targets), pipelineSize, srcDataNode,
          pinning);
      LOG.debug("isDatanode={}, isClient={}, isTransfer={}",
          isDatanode, isClient, isTransfer);
      LOG.debug("writeBlock receive buf size {} tcp no delay {}",
          peer.getReceiveBufferSize(), peer.getTcpNoDelay());
    }

    // We later mutate block's generation stamp and length, but we need to
    // forward the original version of the block to downstream mirrors, so
    // make a copy here.
    final ExtendedBlock originalBlock = new ExtendedBlock(block);
    if (block.getNumBytes() == 0) {
      block.setNumBytes(dataXceiverServer.estimateBlockSize);
    }
    LOG.info("Receiving {} src: {} dest: {}",
        block, remoteAddress, localAddress);

    DataOutputStream mirrorOut = null;  // stream to next target (downstream datanode)
    DataInputStream mirrorIn = null;    // reply from next target (downstream datanode)
    Socket mirrorSock = null;           // socket to next target
    String mirrorNode = null;           // the name:port of next target
    String firstBadLink = "";           // first datanode that failed in connection setup
    Status mirrorInStatus = SUCCESS;
    final String storageUuid;
    final boolean isOnTransientStorage;
    try {
      final Replica replica;
      if (isDatanode ||
          stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
        // open a block receiver
        // Open a BlockReceiver to receive the block from the upstream node
        setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
            peer.getRemoteAddressString(),
            peer.getLocalAddressString(),
            stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
            clientname, srcDataNode, datanode, requestedChecksum,
            cachingStrategy, allowLazyPersist, pinning, storageId));
        replica = blockReceiver.getReplica();
      } else {
        replica = datanode.data.recoverClose(
            block, latestGenerationStamp, minBytesRcvd);
      }
      storageUuid = replica.getStorageUuid();
      isOnTransientStorage = replica.isOnTransientStorage();

      //
      // Connect to downstream machine, if appropriate
      //
      if (targets.length > 0) {
        InetSocketAddress mirrorTarget = null;
        // Connect to backup machine
        mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
        LOG.debug("Connecting to datanode {}", mirrorNode);
        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
        mirrorSock = datanode.newSocket();
        try {
          DataNodeFaultInjector.get().failMirrorConnection();

          int timeoutValue = dnConf.socketTimeout +
              (HdfsConstants.READ_TIMEOUT_EXTENSION * targets.length);
          int writeTimeout = dnConf.socketWriteTimeout +
              (HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
          // Establish the socket connection to the downstream node
          NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
          mirrorSock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
          mirrorSock.setSoTimeout(timeoutValue);
          mirrorSock.setKeepAlive(true);
          if (dnConf.getTransferSocketSendBufferSize() > 0) {
            mirrorSock.setSendBufferSize(
                dnConf.getTransferSocketSendBufferSize());
          }

          OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
              writeTimeout);
          InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
          DataEncryptionKeyFactory keyFactory =
              datanode.getDataEncryptionKeyFactoryForBlock(block);
          SecretKey secretKey = null;
          if (dnConf.overwriteDownstreamDerivedQOP) {
            String bpid = block.getBlockPoolId();
            BlockKey blockKey = datanode.blockPoolTokenSecretManager
                .get(bpid).getCurrentKey();
            secretKey = blockKey.getKey();
          }
          IOStreamPair saslStreams = datanode.saslClient.socketSend(
              mirrorSock, unbufMirrorOut, unbufMirrorIn, keyFactory,
              blockToken, targets[0], secretKey);
          unbufMirrorOut = saslStreams.out;
          unbufMirrorIn = saslStreams.in;
          // Create the input and output stream objects for the downstream node
          mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
              smallBufferSize));
          mirrorIn = new DataInputStream(unbufMirrorIn);

          String targetStorageId = null;
          if (targetStorageIds.length > 0) {
            // Older clients may not have provided any targetStorageIds
            targetStorageId = targetStorageIds[0];
          }
          if (targetPinnings != null && targetPinnings.length > 0) {
            // Forward the write-block request to the downstream node; it will be
            // accepted there and handled by that node's DataXceiver/BlockReceiver
            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                blockToken, clientname, targets, targetStorageTypes,
                srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                latestGenerationStamp, requestedChecksum, cachingStrategy,
                allowLazyPersist, targetPinnings[0], targetPinnings,
                targetStorageId, targetStorageIds);
          } else {
            // Same as above, but no target pinning information was provided
            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                blockToken, clientname, targets, targetStorageTypes,
                srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                latestGenerationStamp, requestedChecksum, cachingStrategy,
                allowLazyPersist, false, targetPinnings,
                targetStorageId, targetStorageIds);
          }

          mirrorOut.flush();

          DataNodeFaultInjector.get().writeBlockAfterFlush();

          // read connect ack (only for clients, not for replication req)
          // Receive the setup ack from the downstream node and record its status
          if (isClient) {
            BlockOpResponseProto connectAck =
                BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
            mirrorInStatus = connectAck.getStatus();
            firstBadLink = connectAck.getFirstBadLink();
            if (mirrorInStatus != SUCCESS) {
              LOG.debug("Datanode {} got response for connect" +
                  "ack from downstream datanode with firstbadlink as {}",
                  targets.length, firstBadLink);
            }
          }
        } catch (IOException e) {
          // On error, send an error response back to the upstream node
          if (isClient) {
            BlockOpResponseProto.newBuilder()
                .setStatus(ERROR)
                // NB: Unconditionally using the xfer addr w/o hostname
                .setFirstBadLink(targets[0].getXferAddr())
                .build()
                .writeDelimitedTo(replyOut);
            replyOut.flush();
          }
          // Close the socket and the input/output streams to the downstream node
          IOUtils.closeStream(mirrorOut);
          mirrorOut = null;
          IOUtils.closeStream(mirrorIn);
          mirrorIn = null;
          IOUtils.closeSocket(mirrorSock);
          mirrorSock = null;
          if (isClient) {
            LOG.error("{}:Exception transfering block {} to mirror {}",
                datanode, block, mirrorNode, e);
            throw e;
          } else {
            LOG.info("{}:Exception transfering {} to mirror {}- continuing " +
                "without the mirror", datanode, block, mirrorNode, e);
            incrDatanodeNetworkErrors();
          }
        }
      }

      // send connect-ack to source for clients and not transfer-RBW/Finalized
      if (isClient && !isTransfer) {
        if (mirrorInStatus != SUCCESS) {
          LOG.debug("Datanode {} forwarding connect ack to upstream " +
              "firstbadlink is {}", targets.length, firstBadLink);
        }
        // Send the setup ack back to the upstream node
        BlockOpResponseProto.newBuilder()
            .setStatus(mirrorInStatus)
            .setFirstBadLink(firstBadLink)
            .build()
            .writeDelimitedTo(replyOut);
        replyOut.flush();
      }

      // receive the block and mirror to the next target
      if (blockReceiver != null) {
        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
        // BlockReceiver.receiveBlock() receives the block from the upstream node
        // and forwards it to the downstream node
        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr,
            dataXceiverServer.getWriteThrottler(), targets, false);

        // send close-ack for transfer-RBW/Finalized
        if (isTransfer) {
          LOG.trace("TRANSFER: send close-ack");
          writeResponse(SUCCESS, null, replyOut);
        }
      }

      // Once BlockReceiver.receiveBlock() has succeeded, writeBlock() updates the
      // generation stamp and length of the replica just written on this datanode.
      // For a pipeline-close recovery or a block replication operation,
      // Datanode.closeBlock() is called to report the new block to the NameNode
      // (via BPOfferService.notifyNamenodeReceivedBlock()). For a client-initiated
      // write, the block has already been closed by Datanode.closeBlock() in the
      // PacketResponder thread.
      // update its generation stamp
      if (isClient &&
          stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
        block.setGenerationStamp(latestGenerationStamp);
        block.setNumBytes(minBytesRcvd);
      }

      // if this write is for a replication request or recovering
      // a failed close for client, then confirm block. For other client-writes,
      // the block is finalized in the PacketResponder.
      if (isDatanode ||
          stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
        datanode.closeBlock(block, null, storageUuid, isOnTransientStorage);
        LOG.info("Received {} src: {} dest: {} of size {}",
            block, remoteAddress, localAddress, block.getNumBytes());
      }

      // Record the block size for the client-write metrics updated below
      if(isClient) {
        size = block.getNumBytes();
      }
    } catch (IOException ioe) {
      LOG.info("opWriteBlock {} received exception {}",
          block, ioe.toString());
      incrDatanodeNetworkErrors();
      throw ioe;
    } finally {
      // close all opened streams
      // Close the streams to the upstream and downstream nodes, and the blockReceiver
      IOUtils.closeStream(mirrorOut);
      IOUtils.closeStream(mirrorIn);
      IOUtils.closeStream(replyOut);
      IOUtils.closeSocket(mirrorSock);
      IOUtils.closeStream(blockReceiver);
      setCurrentBlockReceiver(null);
    }

    //update metrics
    datanode.getMetrics().addWriteBlockOp(elapsed());
    datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
  }

Once the input and output streams to the downstream node have been set up successfully, writeBlock() calls blockReceiver.receiveBlock() to receive the block from the upstream node and forward it to the downstream node. At the same time, the blockReceiver object also receives the per-packet acknowledgments from the downstream node and relays them to the upstream node.
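As a rough mental model of the relay role described above (the real BlockReceiver, with its packet framing, checksum verification and ack handling, is the subject of the next post), the forwarding loop looks roughly like the sketch below; everything in it is a simplified assumption, not BlockReceiver's actual code.

  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import java.io.OutputStream;

  // Rough sketch of the relay role: bytes read from the upstream stream are
  // mirrored to the downstream datanode (if any) and written to local storage.
  // Packet framing, checksum verification and the ack path are omitted.
  public class PacketRelaySketch {
    public static void relay(DataInputStream upstream, DataOutputStream mirrorOut,
                             OutputStream localStorage) throws IOException {
      byte[] buf = new byte[64 * 1024];
      int n;
      while ((n = upstream.read(buf)) != -1) {
        if (mirrorOut != null) {
          mirrorOut.write(buf, 0, n);   // forward to the next datanode in the pipeline
          mirrorOut.flush();
        }
        localStorage.write(buf, 0, n);  // persist the replica locally
      }
    }
  }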

BlockReceiver will be covered in the next post.
