nextBlockOutputStream() 调用 createBlockOutputStream() 创建到 pipeline 中第一个 DataNode 的输出流。如果创建失败,nextBlockOutputStream() 会调用 abandonBlock() 放弃这个块,并将出错的 DataNode 加入排除节点列表(excludedNodes),然后重新向 NameNode 申请新的数据块。重试达到一定次数仍不成功,就抛出异常。
private LocatedBlock nextBlockOutputStream() throws IOException {
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
addBlock() 会将故障节点列表(excludedNodes)上报给 NameNode,使 NameNode 在分配新块时避开这些节点。
private LocatedBlock nextBlockOutputStream() throws IOException
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS,
checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
// write out data to remote datanode
try {
} catch (IOException e) {
throw e;
synchronized void tryMarkPrimaryDatanodeFailed() {
// There should be no existing error and no ongoing restart.
if ((errorIndex == -1) && (restartingNodeIndex == -1)) {
errorIndex = 0;
DataStreamer 的内部线程类 ResponseProcessor 负责处理 ack 中携带的故障信息。
以上三种异常情况都会更新 hasError、errorIndex、restartingNodeIndex 这三个值。
DataStreamer 的 run() 方法在每次循环中都会检查 hasError、errorIndex、restartingNodeIndex 这三个值,并在满足条件时触发 processDatanodeError() 方法:
if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
doSleep = processDatanodeError();
private boolean processDatanodeError() throws IOException {
if (response != null) {
DFSClient.LOG.info("Error Recovery for " + block +
" waiting for responder to exit. ");
return true;
// 第一步:关闭数据流
// 第二步:将 ackQueue 中的数据包移回 dataQueue
// move packets from ack queue to front of the data queue
synchronized (dataQueue) {
dataQueue.addAll(0, ackQueue);
// Record the new pipeline failure recovery.
if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
lastAckedSeqnoBeforeFailure = lastAckedSeqno;
pipelineRecoveryCount = 1;
} else {
// If we had to recover the pipeline five times in a row for the
// same packet, this client likely has corrupt data or corrupting
// during transmission.
if (++pipelineRecoveryCount > 5) {
DFSClient.LOG.warn("Error recovering pipeline for writing " +
block + ". Already retried 5 times for the same packet.");
lastException.set(new IOException("Failing write. Tried pipeline " +
"recovery 5 times without success."));
streamerClosed = true;
return false;
boolean doSleep = setupPipelineForAppendOrRecovery();
if (!streamerClosed && dfsClient.clientRunning) {
if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
synchronized (dataQueue) {
Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
assert endOfBlockPacket.lastPacketInBlock;
assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
lastAckedSeqno = endOfBlockPacket.seqno;
} else {
return doSleep;

下面重点看一下 setupPipelineForAppendOrRecovery() 方法:
private boolean setupPipelineForAppendOrRecovery() throws IOException {
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
arraycopy(nodes, newnodes, errorIndex);
final StorageType[] newStorageTypes = new StorageType[newnodes.length];
arraycopy(storageTypes, newStorageTypes, errorIndex);
final String[] newStorageIDs = new String[newnodes.length];
arraycopy(storageIDs, newStorageIDs, errorIndex);
setPipeline(newnodes, newStorageTypes, newStorageIDs);
if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
nodes, isAppend, isHflushed)) {
try {
// 向 NameNode 更新该 block 的 generation stamp(版本戳,即下面的 newGS),这样故障 DataNode 上的过期副本会被识别并删除
if (success) {
// update pipeline at the namenode
ExtendedBlock newBlock = new ExtendedBlock(
block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
nodes, storageIDs);
// update client side generation stamp
block = newBlock;

private void addDatanode2ExistingPipeline() throws IOException {
// 非 append 且尚未写入任何数据的 PIPELINE_SETUP_CREATE 阶段,以及 PIPELINE_CLOSE / PIPELINE_CLOSE_RECOVERY(pipeline 正在关闭)阶段,都不需要复制数据,后续由 NameNode 负责补齐副本
if (!isAppend && lastAckedSeqno < 0
&& stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
//no data have been written
} else if (stage == BlockConstructionStage.PIPELINE_CLOSE
|| stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
//pipeline is closing
transfer(src, targets, targetStorageTypes, lb.getBlockToken());

DataNode 端则通过调用 BlockReceiver 的内部线程类 PacketResponder 的 sendAckUpstreamUnprotected() 方法,向上游发送 ack,从而实现异常处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。