This walkthrough is based on the hadoop-3.3.0 source. Honestly, the code quality in this area is not great; quite a few methods run to several hundred lines.
HDFS writes data through a data-streaming pipeline (see the third post in this write-path series for how the pipeline is established). The DFSClient transfers data by calling write on the output stream out created in part (1) of this series. The data is split into packets, which are placed on dataQueue; notifyAll() is then called to wake the DataStreamer thread waiting on the queue. Inside the DataStreamer's run() method, DataStreamer#nextBlockOutputStream is invoked, and Sender.writeBlock() (called from DataStreamer#createBlockOutputStream) issues a write-block request. This request is relayed to every datanode in the pipeline: when it arrives at a Datanode over the streaming interface, the DataXceiverServer listening on that interface accepts it, constructs a DataXceiver object, and calls DataXceiver.writeBlock() on it to handle the request. The current Datanode's DataXceiver.writeBlock() then cascades the write-block request to the next Datanode in the pipeline, and so on until the request reaches the last Datanode. The last datanode replies with a setup acknowledgement, which travels back through the pipeline in reverse to the DFSClient.
Once the DFSClient receives this acknowledgement, it splits the data of the block into packets and sends them down the pipeline one after another. A packet is first sent from the DFSClient to the first datanode in the pipeline (here dn1); after dn1 receives the packet it writes it to disk and forwards it to the second datanode (dn2), and so on. When the packet reaches the last datanode in the pipeline (dn3), dn3 verifies the checksum, and if the check passes it sends a packet acknowledgement that travels back through the pipeline to the DFSClient.
When all packets of a block have been sent and acknowledged, the DFSClient sends an empty packet to mark the end of the current block. At that point the transfer of the block is complete.
Let's start with DFSOutputStream#write, reached from a simple client program:
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- public class DataWriteTest {
- public static void main(String[] args) throws IOException {
- Configuration conf = new Configuration();
- FileSystem fileSystem = FileSystem.newInstance(conf);
- // Create a file and write to it
- FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/user/clare/test.txt"));
- fsDataOutputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8));
- // Close the stream so the data is flushed and the block is completed
- fsDataOutputStream.close();
- fileSystem.close();
- }
- }
This write call goes through several layers of wrapping and finally ends up in the write method of FSOutputSummer, the parent class of DFSOutputStream:
- /** Write one byte */
- @Override
- public synchronized void write(int b) throws IOException {
- buf[count++] = (byte)b;
- if(count == buf.length) {
- // Force any buffered output bytes to be checksummed and written out to the underlying output stream
- flushBuffer();
- }
- }
-
- /* Forces buffered output bytes to be checksummed and written out to
- * the underlying output stream. If there is a trailing partial chunk in the
- * buffer,
- * 1) flushPartial tells us whether to flush that chunk
- * 2) if flushPartial is true, keep tells us whether to keep that chunk in the
- * buffer (if flushPartial is false, it is always kept in the buffer)
- *
- * Returns the number of bytes that were flushed but are still left in the
- * buffer (can only be non-zero if keep is true).
- * (In other words, the return value is non-zero only when keep is true and the trailing partial chunk stays in buf.)
- */
- protected synchronized int flushBuffer(boolean keep,
- boolean flushPartial) throws IOException {
- int bufLen = count;
- int partialLen = bufLen % sum.getBytesPerChecksum();
- int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
- if (lenToFlush != 0) {
- // Generate checksums for the given data chunks and write the chunks and checksums to the underlying output stream
- writeChecksumChunks(buf, 0, lenToFlush);
- if (!flushPartial || keep) {
- count = partialLen;
- System.arraycopy(buf, bufLen - count, buf, 0, count);
- } else {
- count = 0;
- }
- }
-
- // total bytes left minus unflushed bytes left
- return count - (bufLen - lenToFlush);
- }
-
- /** Generate checksums for the given data chunks and output chunks & checksums
- * to the underlying output stream.
- */
- private void writeChecksumChunks(byte b[], int off, int len)
- throws IOException {
- // Compute the checksums for this range of data
- sum.calculateChunkedSums(b, off, len, checksum, 0);
- TraceScope scope = createWriteTraceScope();
- try {
- // Note: getBytesPerChecksum() is really the chunk size, i.e. the number of data bytes covered by one checksum
- // (the name is a bit misleading), so this loop writes the data out one chunk at a time
- for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
- int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
- int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
- // Write out one chunk together with its checksum
- writeChunk(b, off + i, chunkLen, checksum, ckOffset,
- getChecksumSize());
- }
- } finally {
- if (scope != null) {
- scope.close();
- }
- }
- }
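To make the chunk arithmetic concrete, here is a small standalone sketch (not Hadoop code) that walks the same loop as writeChecksumChunks(), assuming the common defaults of 512 bytes per checksum chunk and a 4-byte CRC32C checksum; the real values come from the DataChecksum configured on the output stream.
- // Standalone illustration of how writeChecksumChunks() maps a run of data bytes onto
- // chunks and checksum offsets. The 512-byte chunk and 4-byte checksum are assumed defaults.
- public class ChunkMathSketch {
-   public static void main(String[] args) {
-     final int bytesPerChecksum = 512; // one checksum per 512-byte chunk (assumed default)
-     final int checksumSize = 4;       // CRC32C produces 4 bytes per chunk (assumed)
-     int len = 1300;                   // bytes handed to writeChecksumChunks() in this example
-     for (int i = 0; i < len; i += bytesPerChecksum) {
-       int chunkLen = Math.min(bytesPerChecksum, len - i);   // last chunk may be partial
-       int ckOffset = i / bytesPerChecksum * checksumSize;   // where this chunk's checksum goes
-       System.out.printf("chunk offset=%d len=%d checksumOffset=%d%n", i, chunkLen, ckOffset);
-     }
-     // Prints three chunks: (0, 512, 0), (512, 512, 4), (1024, 276, 8).
-   }
- }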
The chunk is then written by DFSOutputStream#writeChunk:
- @Override
- protected synchronized void writeChunk(byte[] b, int offset, int len,
- byte[] checksum, int ckoff, int cklen) throws IOException {
- // Perform some sanity checks, e.g. on the chunk and checksum lengths, before writing
- writeChunkPrepare(len, ckoff, cklen);
-
- currentPacket.writeChecksum(checksum, ckoff, cklen);
- currentPacket.writeData(b, offset, len);
- currentPacket.incNumChunks();
- getStreamer().incBytesCurBlock(len);
-
- // If packet is full, enqueue it for transmission
- if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
- getStreamer().getBytesCurBlock() == blockSize) {
- // The packet is full, so enqueue it onto dataQueue for transmission
- enqueueCurrentPacketFull();
- }
- }
-
- synchronized void enqueueCurrentPacketFull() throws IOException {
- LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
- + " appendChunk={}, {}", currentPacket, src, getStreamer()
- .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
- getStreamer());
- // This calls DataStreamer#waitAndQueuePacket, which waits for room in dataQueue before queuing the packet
- enqueueCurrentPacket();
- adjustChunkBoundary();
- endBlock();
- }
-
- void enqueueCurrentPacket() throws IOException {
- getStreamer().waitAndQueuePacket(currentPacket);
- currentPacket = null;
- }
waitAndQueuePacket then adds the packet to the queue. When dataQueue is full the thread waits for space, but even if the wait is interrupted the packet is still queued before the method returns:
- /**
- * wait for space of dataQueue and queue the packet
- *
- * @param packet the DFSPacket to be queued
- * @throws IOException
- */
- void waitAndQueuePacket(DFSPacket packet) throws IOException {
- synchronized (dataQueue) {
- try {
- // If queue is full, then wait till we have enough space
- boolean firstWait = true;
- try {
- while (!streamerClosed && dataQueue.size() + ackQueue.size() >
- dfsClient.getConf().getWriteMaxPackets()) {
- if (firstWait) {
- Span span = Tracer.getCurrentSpan();
- if (span != null) {
- span.addTimelineAnnotation("dataQueue.wait");
- }
- firstWait = false;
- }
- try {
- dataQueue.wait();
- } catch (InterruptedException e) {
- // If we get interrupted while waiting to queue data, we still need to get rid
- // of the current packet. This is because we have an invariant that if
- // currentPacket gets full, it will get queued before the next writeChunk.
- //
- // Rather than wait around for space in the queue, we should instead try to
- // return to the caller as soon as possible, even though we slightly overrun
- // the MAX_PACKETS length.
- Thread.currentThread().interrupt();
- break;
- }
- }
- } finally {
- Span span = Tracer.getCurrentSpan();
- if ((span != null) && (!firstWait)) {
- span.addTimelineAnnotation("end.wait");
- }
- }
- checkClosed();
- queuePacket(packet);
- } catch (ClosedChannelException cce) {
- LOG.debug("Closed channel exception", cce);
- }
- }
- }
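The hand-off around dataQueue is a classic bounded producer/consumer built on wait()/notifyAll(): the writing thread blocks while the queue is full, and the DataStreamer blocks while it is empty. Below is a minimal standalone sketch of that pattern; the class and field names are invented for illustration, and the real code additionally counts ackQueue against the limit and handles stream closure and tracing.
- import java.util.ArrayDeque;
- import java.util.Queue;
-
- // Invented names; only the synchronization pattern mirrors DataStreamer and dataQueue.
- public class BoundedPacketQueueSketch {
-   private final Queue<byte[]> dataQueue = new ArrayDeque<>();
-   private final int maxPackets = 80; // stands in for the client's max-packets-in-flight limit (assumed)
-
-   // Writer side: the role played by waitAndQueuePacket().
-   public void waitAndQueuePacket(byte[] packet) throws InterruptedException {
-     synchronized (dataQueue) {
-       while (dataQueue.size() >= maxPackets) {
-         dataQueue.wait();          // queue full, wait for the streamer to drain it
-       }
-       dataQueue.add(packet);
-       dataQueue.notifyAll();       // wake a streamer blocked on an empty queue
-     }
-   }
-
-   // Streamer side: the role played by DataStreamer#run() when it takes the next packet.
-   public byte[] takePacket() throws InterruptedException {
-     synchronized (dataQueue) {
-       while (dataQueue.isEmpty()) {
-         dataQueue.wait();          // nothing to send yet
-       }
-       byte[] packet = dataQueue.remove();
-       dataQueue.notifyAll();       // wake a writer blocked on a full queue
-       return packet;
-     }
-   }
- }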
The write-block request itself is triggered in DataStreamer#run():
- if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
- // This branch handles writing a newly created file
- LOG.debug("Allocating new block: {}", this);
- // nextBlockOutputStream() asks the Namenode to allocate a block and returns a LocatedBlock object,
- // which contains the DatanodeInfo of the nodes that will form the pipeline
- setPipeline(nextBlockOutputStream());
- // Initialize the data stream; this starts a ResponseProcessor thread that handles responses (acks) from the datanodes:
- // every packet we send must be acknowledged by the DataNodes to confirm receipt.
- // So there is one response thread per block, and it is shut down when the block is finished and closed
- initDataStreaming();
- } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
- // This branch handles appending data to an existing file
- LOG.debug("Append to block {}", block);
- // Set up the pipeline for the append (or recovery) case
- setupPipelineForAppendOrRecovery();
- if (streamerClosed) {
- continue;
- }
- // Initialize the data stream; this starts a ResponseProcessor thread that handles
- // the acks sent back by the datanodes for every packet we send
- initDataStreaming();
- }
DataStreamer#nextBlockOutputStream calls DataStreamer#locateFollowingBlock to ask the NameNode to allocate a block, and then triggers the write-block request:
- // In DataStreamer#nextBlockOutputStream
- lb = locateFollowingBlock(
- excluded.length > 0 ? excluded : null, oldBlock);
-
- private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
- ExtendedBlock oldBlock) throws IOException {
- // This just assembles the parameters; the dfsClient then calls NameNodeRpcServer#addBlock over RPC
- return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
- stat.getFileId(), favoredNodes, addBlockFlags);
- }
-
- // NameNodeRpcServer#addBlock
- // This method delegates to FSNamesystem to do the actual block allocation
- @Override
- public LocatedBlock addBlock(String src, String clientName,
- ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
- String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
- throws IOException {
- checkNNStartup();
- LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
- clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
- if (locatedBlock != null) {
- metrics.incrAddBlockOps();
- }
- return locatedBlock;
- }
Next let's look at FSNamesystem#getAdditionalBlock. It obtains an additional block for the given file (which is being written to) and returns the block together with a set of machines. The first machine in the list is where the client should write the data; the subsequent machines must be provided in the connection to the first datanode. It also makes sure the previous blocks have been reported by the datanodes and are replicated. In short, it returns the new block's location and the machines that will make up the pipeline.
The method is split into two parts:
- /**
- * The client would like to obtain an additional block for the indicated
- * filename (which is being written-to). Return an array that consists
- * of the block, plus a set of machines. The first on this list should
- * be where the client writes data. Subsequent items in the list must
- * be provided in the connection to the first datanode.
- *
- * Make sure the previous blocks have been reported by datanodes and
- * are replicated. Will return an empty 2-elt array if we want the
- * client to "try again later".
- */
- LocatedBlock getAdditionalBlock(
- String src, long fileId, String clientName, ExtendedBlock previous,
- DatanodeInfo[] excludedNodes, String[] favoredNodes,
- EnumSet<AddBlockFlag> flags) throws IOException {
- final String operationName = "getAdditionalBlock";
- NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" +
- " for {}", src, fileId, clientName);
-
- LocatedBlock[] onRetryBlock = new LocatedBlock[1];
- FSDirWriteFileOp.ValidateAddBlockResult r;
- checkOperation(OperationCategory.READ);
- final FSPermissionChecker pc = getPermissionChecker();
- FSPermissionChecker.setOperationType(operationName);
- readLock();
- try {
- checkOperation(OperationCategory.READ);
- // Analyze the file state under the read lock to determine whether the client can add a new block,
- // and to detect potential retries, lease mismatches and insufficient replication of the penultimate block.
- // Target DataNode locations for the new block are generated here, but the block itself is not created yet.
- r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
- previous, onRetryBlock);
- } finally {
- readUnlock(operationName);
- }
-
- if (r == null) {
- assert onRetryBlock[0] != null : "Retry block is null";
- // This is a retry. Just return the last block.
- return onRetryBlock[0];
- }
-
- // Choose the storages (datanodes) that will hold the newly requested block
- DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
- blockManager, src, excludedNodes, favoredNodes, flags, r);
-
- checkOperation(OperationCategory.WRITE);
- writeLock();
- LocatedBlock lb;
- try {
- checkOperation(OperationCategory.WRITE);
- // Allocate the new block; this also adds the block to the corresponding file
- lb = FSDirWriteFileOp.storeAllocatedBlock(
- this, src, fileId, clientName, previous, targets);
- } finally {
- writeUnlock(operationName);
- }
- getEditLog().logSync();
- return lb;
- }
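Before looking at the two parts in detail, note the overall locking shape of getAdditionalBlock(): validate under the read lock, choose targets with no namesystem lock held (placement can be slow), then re-validate and allocate under the write lock. A bare-bones sketch of that discipline, with all names invented for illustration:
- import java.util.concurrent.locks.ReadWriteLock;
- import java.util.concurrent.locks.ReentrantReadWriteLock;
-
- // Illustrative skeleton only; it mirrors the read-lock / unlocked / write-lock phases of
- // getAdditionalBlock() without any of the real HDFS types.
- public class TwoPhaseAllocationSketch {
-   private final ReadWriteLock fsLock = new ReentrantReadWriteLock();
-
-   public String addBlock(String src) {
-     String validation;
-     fsLock.readLock().lock();
-     try {
-       validation = validate(src);              // Part I: may detect a retry and return early
-     } finally {
-       fsLock.readLock().unlock();
-     }
-
-     String targets = chooseTargets(validation); // placement decision, no namesystem lock held
-
-     fsLock.writeLock().lock();
-     try {
-       // Part II: re-run the analysis (state may have changed while choosing targets),
-       // commit the previous block, then allocate the new one.
-       return storeAllocatedBlock(src, targets);
-     } finally {
-       fsLock.writeLock().unlock();
-     }
-   }
-
-   private String validate(String src) { return src; }
-   private String chooseTargets(String v) { return v + ":targets"; }
-   private String storeAllocatedBlock(String src, String targets) { return src + "->" + targets; }
- }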
Now let's look at the two parts mentioned above in turn.
Part I is FSDirWriteFileOp.validateAddBlock, which analyzes the file state while the read lock is held to determine whether the client can add a new block, and to detect potential retries, lease mismatches, and insufficient replication of the penultimate block. It also generates the target DataNode locations for the new block, but does not create the block yet.
- /**
- * Part I of getAdditionalBlock().
- * Analyze the state of the file under read lock to determine if the client
- * can add a new block, detect potential retries, lease mismatches,
- * and minimal replication of the penultimate block.
- *
- * Generate target DataNode locations for the new block,
- * but do not create the new block yet.
- */
- static ValidateAddBlockResult validateAddBlock(
- FSNamesystem fsn, FSPermissionChecker pc,
- String src, long fileId, String clientName,
- ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
- final long blockSize;
- final short numTargets;
- final byte storagePolicyID;
- String clientMachine;
- final BlockType blockType;
-
- // Resolve the file path to its INodes
- INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
- // Analyze the file state: compare the last block the Namenode has recorded for this file with the last
- // block reported by the client, to detect a retransmitted RPC request or an abnormal request
- FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
- previous, onRetryBlock);
- if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
- // This is a retry. No need to generate new locations.
- // Use the last block if it has locations.
- return null;
- }
-
- // Get the in-memory INodeFile object from the file state
- final INodeFile pendingFile = fileState.inode;
- if (!fsn.checkFileProgress(src, pendingFile, false)) {
- throw new NotReplicatedYetException("Not replicated yet: " + src);
- }
- if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
- throw new IOException("File has reached the limit on maximum number of"
- + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
- + "): " + pendingFile.getBlocks().length + " >= "
- + fsn.maxBlocksPerFile);
- }
- blockSize = pendingFile.getPreferredBlockSize();
- clientMachine = pendingFile.getFileUnderConstructionFeature()
- .getClientMachine();
- blockType = pendingFile.getBlockType();
- ErasureCodingPolicy ecPolicy = null;
- if (blockType == BlockType.STRIPED) {
- ecPolicy =
- FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(fsn, iip);
- numTargets = (short) (ecPolicy.getSchema().getNumDataUnits()
- + ecPolicy.getSchema().getNumParityUnits());
- } else {
- numTargets = pendingFile.getFileReplication();
- }
- storagePolicyID = pendingFile.getStoragePolicyID();
- // Package the values read from the INodeFile into a ValidateAddBlockResult object
- return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
- clientMachine, blockType, ecPolicy);
- }
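As a concrete example of the numTargets computation above: for a striped file using the built-in RS-6-3 erasure coding policy, the schema has 6 data units and 3 parity units, so 6 + 3 = 9 targets are requested, whereas a contiguous file simply asks for as many targets as its replication factor (3 by default).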
Part II is FSDirWriteFileOp.storeAllocatedBlock, which creates the new block. It calls analyzeFileState again to re-check the file state; if nothing has changed in the meantime it proceeds with the allocation: it first commits the file's current last block, and only then allocates the new block and adds it to the file:
- /**
- * Part II of getAdditionalBlock().
- * Should repeat the same analysis of the file state as in Part 1,
- * but under the write lock.
- * If the conditions still hold, then allocate a new block with
- * the new targets, add it to the INode and to the BlocksMap.
- */
- static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
- long fileId, String clientName, ExtendedBlock previous,
- DatanodeStorageInfo[] targets) throws IOException {
- long offset;
- // Run the full analysis again, since things could have changed
- // while chooseTarget() was executing.
- LocatedBlock[] onRetryBlock = new LocatedBlock[1];
- INodesInPath iip = fsn.dir.resolvePath(null, src, fileId);
- // Call analyzeFileState() again in case the Namenode state changed while chooseTargetForNewBlock()
- // was executing. For example, a previously triggered allocation request may have completed in the
- // meantime, in which case there is no need to allocate a new block; just return the one already allocated.
- FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
- previous, onRetryBlock);
- final INodeFile pendingFile = fileState.inode;
- src = fileState.path;
-
- if (onRetryBlock[0] != null) {
- if (onRetryBlock[0].getLocations().length > 0) {
- // This is a retry. Just return the last block if having locations.
- return onRetryBlock[0];
- } else {
- // add new chosen targets to already allocated block and return
- BlockInfo lastBlockInFile = pendingFile.getLastBlock();
- lastBlockInFile.getUnderConstructionFeature().setExpectedLocations(
- lastBlockInFile, targets, pendingFile.getBlockType());
- offset = pendingFile.computeFileSize();
- return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
- }
- }
-
- // commit the last block and complete it if it has minimum replicas
- // Commit the previous block; the commit checks that requirements such as the minimum replica count are met
- fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip,
- ExtendedBlock.getLocalBlock(previous));
-
- // allocate new block, record block locations in INode.
- final BlockType blockType = pendingFile.getBlockType();
- Block newBlock = fsn.createNewBlock(blockType);
- INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
- // Add the newly created block to the corresponding file
- saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, blockType);
-
- // Record the corresponding metadata (add block) in the edit log
- persistNewBlock(fsn, src, pendingFile);
- offset = pendingFile.computeFileSize();
-
- // Return located block
- // Build a LocatedBlock object to return
- return makeLocatedBlock(fsn, fsn.getStoredBlock(newBlock), targets, offset);
- }
After the block has been allocated as above, the pipeline is set up through createBlockOutputStream. The method mainly does three things: (1) opens a socket connection to the first datanode in the pipeline, (2) sends the write-block request via Sender.writeBlock(), and (3) receives the acknowledgement for the connection setup:
- // connects to the first datanode in the pipeline
- // Returns true if success, otherwise return failure.
- //
- boolean createBlockOutputStream(DatanodeInfo[] nodes,
- StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
- long newGS, boolean recoveryFlag) {
- if (nodes.length == 0) {
- LOG.info("nodes are empty for write pipeline of " + block);
- return false;
- }
- String firstBadLink = "";
- boolean checkRestart = false;
- if (LOG.isDebugEnabled()) {
- LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this);
- }
-
- // persist blocks on namenode on next flush
- persistBlocks.set(true);
-
- int refetchEncryptionKey = 1;
- while (true) {
- boolean result = false;
- DataOutputStream out = null;
- try {
- assert null == s : "Previous socket unclosed";
- assert null == blockReplyStream : "Previous blockReplyStream unclosed";
- // Step 1: create the socket connection to the pipeline; returns the socket connected to the first dn
- s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
- long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
- long readTimeout = dfsClient.getDatanodeReadTimeout(nodes.length);
-
- OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
- InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
- IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
- unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
- unbufOut = saslStreams.out;
- unbufIn = saslStreams.in;
- // Create the output stream object; decorator pattern, the actual buffering is done by BufferedOutputStream
- out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
- blockReplyStream = new DataInputStream(unbufIn);
-
- //
- // Xmit header info to datanode
- //
-
- BlockConstructionStage bcs = recoveryFlag ?
- stage.getRecoveryStage() : stage;
-
- // We cannot change the block length in 'block' as it counts the number
- // of bytes ack'ed.
- ExtendedBlock blockCopy = block.getCurrentBlock();
- blockCopy.setNumBytes(stat.getBlockSize());
-
- boolean[] targetPinnings = getPinnings(nodes);
- // send the request
- // Step 2: send the write-block request to the first datanode; it will be cascaded down the pipeline
- new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
- dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
- nodes.length, block.getNumBytes(), bytesSent, newGS,
- checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
- (targetPinnings != null && targetPinnings[0]), targetPinnings,
- nodeStorageIDs[0], nodeStorageIDs);
-
- // receive ack for connect
- // Step 3: receive the acknowledgement for the connection setup
- BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
- PBHelperClient.vintPrefixed(blockReplyStream));
- Status pipelineStatus = resp.getStatus();
- firstBadLink = resp.getFirstBadLink();
-
- // Got an restart OOB ack.
- // If a node is already restarting, this status is not likely from
- // the same node. If it is from a different node, it is not
- // from the local datanode. Thus it is safe to treat this as a
- // regular node error.
- // Check whether a datanode in the pipeline is restarting; only enter this branch if the client has not already marked that dn as restarting
- if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
- !errorState.isRestartingNode()) {
- checkRestart = true;
- throw new IOException("A datanode is restarting.");
- }
-
- String logInfo = "ack with firstBadLink as " + firstBadLink;
- DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
-
- assert null == blockStream : "Previous blockStream unclosed";
- blockStream = out;
- result = true; // success
- errorState.resetInternalError();
- lastException.clear();
- // remove all restarting nodes from failed nodes list
- failed.removeAll(restartingNodes);
- restartingNodes.clear();
- } catch (IOException ie) {
- if (!errorState.isRestartingNode()) {
- LOG.warn("Exception in createBlockOutputStream " + this, ie);
- }
- if (ie instanceof InvalidEncryptionKeyException &&
- refetchEncryptionKey > 0) {
- LOG.info("Will fetch a new encryption key and retry, "
- + "encryption key was invalid when connecting to "
- + nodes[0] + " : " + ie);
- // The encryption key used is invalid.
- refetchEncryptionKey--;
- dfsClient.clearDataEncryptionKey();
- // Don't close the socket/exclude this node just yet. Try again with
- // a new encryption key.
- continue;
- }
-
- // find the datanode that matches
- // Mark the bad node: the first DataNode that could not be connected to is marked as bad;
- // if the response does not tell us which node failed, blame the first node in the pipeline
- if (firstBadLink.length() != 0) {
- for (int i = 0; i < nodes.length; i++) {
- // NB: Unconditionally using the xfer addr w/o hostname
- if (firstBadLink.equals(nodes[i].getXferAddr())) {
- errorState.setBadNodeIndex(i);
- break;
- }
- }
- } else {
- assert !checkRestart;
- errorState.setBadNodeIndex(0);
- }
-
- final int i = errorState.getBadNodeIndex();
- // Check whether there is a restart worth waiting for.
- if (checkRestart) {
- errorState.initRestartingNode(i,
- "Datanode " + i + " is restarting: " + nodes[i],
- shouldWaitForRestart(i));
- }
- errorState.setInternalError();
- lastException.set(ie);
- result = false; // error
- } finally {
- if (!result) {
- IOUtils.closeSocket(s);
- s = null;
- IOUtils.closeStream(out);
- IOUtils.closeStream(blockReplyStream);
- blockReplyStream = null;
- }
- }
- return result;
- }
- }
After the write-block request reaches a Datanode over the streaming interface, the DataXceiverServer listening for streaming requests on that Datanode accepts it, constructs a DataXceiver object, and calls DataXceiver.writeBlock() on it to handle the request. The current Datanode's DataXceiver.writeBlock() then cascades the write-block request to the next Datanode in the pipeline, and the request keeps being passed along until it reaches the last Datanode in the pipeline.
After Sender(out).writeBlock() issues the write request as shown above, the datanode's DataXceiverServer receives it and hands it over to a DataXceiver. Both are thread objects, so it helps to start from DataXceiverServer#run(), which creates the DataXceiver thread for the subsequent work; that work is done in DataXceiver#run(), and after a few more calls we end up in DataXceiver#writeBlock() (reading this method all the way through takes some patience... it is very long):
- @Override
- public void writeBlock(final ExtendedBlock block,
- final StorageType storageType,
- final Token<BlockTokenIdentifier> blockToken,
- final String clientname,
- final DatanodeInfo[] targets,
- final StorageType[] targetStorageTypes,
- final DatanodeInfo srcDataNode,
- final BlockConstructionStage stage,
- final int pipelineSize,
- final long minBytesRcvd,
- final long maxBytesRcvd,
- final long latestGenerationStamp,
- DataChecksum requestedChecksum,
- CachingStrategy cachingStrategy,
- boolean allowLazyPersist,
- final boolean pinning,
- final boolean[] targetPinnings,
- final String storageId,
- final String[] targetStorageIds) throws IOException {
- previousOpClientName = clientname;
- updateCurrentThreadName("Receiving block " + block);
- // isDatanode indicates whether this write was initiated by another Datanode (empty client name); false for a client write
- final boolean isDatanode = clientname.length() == 0;
- // isClient is the opposite of isDatanode, i.e. the write was triggered by a client; true here
- final boolean isClient = !isDatanode;
- // isTransfer indicates whether this write is a block transfer (replication), judged from the pipeline stage; false here
- final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
- || stage == BlockConstructionStage.TRANSFER_FINALIZED;
-
- // whether lazyPersist is allowed; false here
- allowLazyPersist = allowLazyPersist &&
- (dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
- long size = 0;
- // reply to upstream datanode or client
- final DataOutputStream replyOut = getBufferedOutputStream();
-
- int nst = targetStorageTypes.length;
- StorageType[] storageTypes = new StorageType[nst + 1];
- storageTypes[0] = storageType;
- if (targetStorageTypes.length > 0) {
- System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
- }
-
- // To support older clients, we don't pass in empty storageIds
- final int nsi = targetStorageIds.length;
- final String[] storageIds;
- if (nsi > 0) {
- storageIds = new String[nsi + 1];
- storageIds[0] = storageId;
- if (targetStorageTypes.length > 0) {
- System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi);
- }
- } else {
- storageIds = new String[0];
- }
- checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK,
- BlockTokenIdentifier.AccessMode.WRITE,
- storageTypes, storageIds);
-
- // check single target for transfer-RBW/Finalized
- if (isTransfer && targets.length > 0) {
- throw new IOException(stage + " does not support multiple targets "
- + Arrays.asList(targets));
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("opWriteBlock: stage={}, clientname={}\n " +
- "block ={}, newGs={}, bytesRcvd=[{}, {}]\n " +
- "targets={}; pipelineSize={}, srcDataNode={}, pinning={}",
- stage, clientname, block, latestGenerationStamp, minBytesRcvd,
- maxBytesRcvd, Arrays.asList(targets), pipelineSize, srcDataNode,
- pinning);
- LOG.debug("isDatanode={}, isClient={}, isTransfer={}",
- isDatanode, isClient, isTransfer);
- LOG.debug("writeBlock receive buf size {} tcp no delay {}",
- peer.getReceiveBufferSize(), peer.getTcpNoDelay());
- }
-
- // We later mutate block's generation stamp and length, but we need to
- // forward the original version of the block to downstream mirrors, so
- // make a copy here.
- final ExtendedBlock originalBlock = new ExtendedBlock(block);
- if (block.getNumBytes() == 0) {
- block.setNumBytes(dataXceiverServer.estimateBlockSize);
- }
- LOG.info("Receiving {} src: {} dest: {}",
- block, remoteAddress, localAddress);
-
- DataOutputStream mirrorOut = null; // stream to next target
- DataInputStream mirrorIn = null; // reply from next target
- Socket mirrorSock = null; // socket to next target
- String mirrorNode = null; // the name:port of next target
- String firstBadLink = ""; // first datanode that failed in connection setup
- Status mirrorInStatus = SUCCESS;
- final String storageUuid;
- final boolean isOnTransientStorage;
- try {
- final Replica replica;
- if (isDatanode ||
- stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
- // open a block receiver
- // Open a BlockReceiver that will receive the block from the upstream node
- setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
- peer.getRemoteAddressString(),
- peer.getLocalAddressString(),
- stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
- clientname, srcDataNode, datanode, requestedChecksum,
- cachingStrategy, allowLazyPersist, pinning, storageId));
- replica = blockReceiver.getReplica();
- } else {
- replica = datanode.data.recoverClose(
- block, latestGenerationStamp, minBytesRcvd);
- }
- storageUuid = replica.getStorageUuid();
- isOnTransientStorage = replica.isOnTransientStorage();
-
- //
- // Connect to downstream machine, if appropriate
- //
- if (targets.length > 0) {
- InetSocketAddress mirrorTarget = null;
- // Connect to backup machine
- mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
- LOG.debug("Connecting to datanode {}", mirrorNode);
- mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
- mirrorSock = datanode.newSocket();
- try {
-
- DataNodeFaultInjector.get().failMirrorConnection();
-
- int timeoutValue = dnConf.socketTimeout +
- (HdfsConstants.READ_TIMEOUT_EXTENSION * targets.length);
- int writeTimeout = dnConf.socketWriteTimeout +
- (HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
- // Establish the socket connection to the downstream node
- NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
- mirrorSock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
- mirrorSock.setSoTimeout(timeoutValue);
- mirrorSock.setKeepAlive(true);
- if (dnConf.getTransferSocketSendBufferSize() > 0) {
- mirrorSock.setSendBufferSize(
- dnConf.getTransferSocketSendBufferSize());
- }
-
- OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
- writeTimeout);
- InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
- DataEncryptionKeyFactory keyFactory =
- datanode.getDataEncryptionKeyFactoryForBlock(block);
- SecretKey secretKey = null;
- if (dnConf.overwriteDownstreamDerivedQOP) {
- String bpid = block.getBlockPoolId();
- BlockKey blockKey = datanode.blockPoolTokenSecretManager
- .get(bpid).getCurrentKey();
- secretKey = blockKey.getKey();
- }
- IOStreamPair saslStreams = datanode.saslClient.socketSend(
- mirrorSock, unbufMirrorOut, unbufMirrorIn, keyFactory,
- blockToken, targets[0], secretKey);
- unbufMirrorOut = saslStreams.out;
- unbufMirrorIn = saslStreams.in;
- // Create the input/output stream objects for the downstream node
- mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
- smallBufferSize));
- mirrorIn = new DataInputStream(unbufMirrorIn);
-
- String targetStorageId = null;
- if (targetStorageIds.length > 0) {
- // Older clients may not have provided any targetStorageIds
- targetStorageId = targetStorageIds[0];
- }
- if (targetPinnings != null && targetPinnings.length > 0) {
- // Forward the write-block request to the downstream node, whose DataXceiver will handle it in the same way
- new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
- blockToken, clientname, targets, targetStorageTypes,
- srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
- latestGenerationStamp, requestedChecksum, cachingStrategy,
- allowLazyPersist, targetPinnings[0], targetPinnings,
- targetStorageId, targetStorageIds);
- } else {
- // Same forwarding as above, but without target pinnings
- new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
- blockToken, clientname, targets, targetStorageTypes,
- srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
- latestGenerationStamp, requestedChecksum, cachingStrategy,
- allowLazyPersist, false, targetPinnings,
- targetStorageId, targetStorageIds);
- }
-
- mirrorOut.flush();
-
- DataNodeFaultInjector.get().writeBlockAfterFlush();
-
- // read connect ack (only for clients, not for replication req)
- if (isClient) {
- BlockOpResponseProto connectAck =
- BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
- mirrorInStatus = connectAck.getStatus();
- firstBadLink = connectAck.getFirstBadLink();
- if (mirrorInStatus != SUCCESS) {
- LOG.debug("Datanode {} got response for connect" +
- "ack from downstream datanode with firstbadlink as {}",
- targets.length, firstBadLink);
- }
- }
-
- } catch (IOException e) {
- // An exception occurred: send an error response back to the upstream node
- if (isClient) {
- BlockOpResponseProto.newBuilder()
- .setStatus(ERROR)
- // NB: Unconditionally using the xfer addr w/o hostname
- .setFirstBadLink(targets[0].getXferAddr())
- .build()
- .writeDelimitedTo(replyOut);
- replyOut.flush();
- }
-
- // Close the socket, input stream and output stream to the downstream node
- IOUtils.closeStream(mirrorOut);
- mirrorOut = null;
- IOUtils.closeStream(mirrorIn);
- mirrorIn = null;
- IOUtils.closeSocket(mirrorSock);
- mirrorSock = null;
- if (isClient) {
- LOG.error("{}:Exception transfering block {} to mirror {}",
- datanode, block, mirrorNode, e);
- throw e;
- } else {
- LOG.info("{}:Exception transfering {} to mirror {}- continuing " +
- "without the mirror", datanode, block, mirrorNode, e);
- incrDatanodeNetworkErrors();
- }
- }
- }
-
- // send connect-ack to source for clients and not transfer-RBW/Finalized
- if (isClient && !isTransfer) {
- if (mirrorInStatus != SUCCESS) {
- LOG.debug("Datanode {} forwarding connect ack to upstream " +
- "firstbadlink is {}", targets.length, firstBadLink);
- }
- // Return the connection-setup acknowledgement to the upstream node
- BlockOpResponseProto.newBuilder()
- .setStatus(mirrorInStatus)
- .setFirstBadLink(firstBadLink)
- .build()
- .writeDelimitedTo(replyOut);
- replyOut.flush();
- }
-
- // receive the block and mirror to the next target
- if (blockReceiver != null) {
- String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
- // Call BlockReceiver.receiveBlock() to receive the block from the upstream node and forward it to the downstream node
- blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr,
- dataXceiverServer.getWriteThrottler(), targets, false);
-
- // send close-ack for transfer-RBW/Finalized
- if (isTransfer) {
- LOG.trace("TRANSFER: send close-ack");
- writeResponse(SUCCESS, null, replyOut);
- }
- }
-
- // After BlockReceiver.receiveBlock() succeeds, writeBlock() updates the generation stamp,
- // replica file length and other information of the newly written replica on this datanode.
- // If this is a pipeline-close recovery or a block replication, Datanode.closeBlock() is called to
- // report to the Namenode that this Datanode has received the new block; closeBlock() notifies the
- // Namenode through BPOfferService.notifyNamenodeReceivedBlock().
- // For a client-initiated write, Datanode.closeBlock() has already been called in the
- // PacketResponder thread.
- // update its generation stamp
- if (isClient &&
- stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
- block.setGenerationStamp(latestGenerationStamp);
- block.setNumBytes(minBytesRcvd);
- }
-
- // if this write is for a replication request or recovering
- // a failed close for client, then confirm block. For other client-writes,
- // the block is finalized in the PacketResponder.
- if (isDatanode ||
- stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
- datanode.closeBlock(block, null, storageUuid, isOnTransientStorage);
- LOG.info("Received {} src: {} dest: {} of size {}",
- block, remoteAddress, localAddress, block.getNumBytes());
- }
-
- // For a replication request there is no downstream node to forward to and no downstream ack to wait for,
- // so once the block has been received successfully this node returns the confirmation directly
- if(isClient) {
- size = block.getNumBytes();
- }
- } catch (IOException ioe) {
- LOG.info("opWriteBlock {} received exception {}",
- block, ioe.toString());
- incrDatanodeNetworkErrors();
- throw ioe;
- } finally {
- // close all opened streams
- // Close the input/output streams to the upstream and downstream nodes, and close the blockReceiver object
- IOUtils.closeStream(mirrorOut);
- IOUtils.closeStream(mirrorIn);
- IOUtils.closeStream(replyOut);
- IOUtils.closeSocket(mirrorSock);
- IOUtils.closeStream(blockReceiver);
- setCurrentBlockReceiver(null);
- }
-
- //update metrics
- datanode.getMetrics().addWriteBlockOp(elapsed());
- datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
- }
Once the input/output streams to the downstream node have been set up successfully, writeBlock() calls blockReceiver.receiveBlock() to receive the block from the upstream node and forward it to the downstream node. At the same time, the blockReceiver object receives the per-packet acknowledgements coming back from the downstream node and relays them to the upstream node.
BlockReceiver is left for the next post.