
Hadoop 2.6 Source Code: Exception Handling in the Write Path ("already retried 5 times for the same packet")

Client-Side Exception Handling

Failures while first creating the output stream

nextBlockOutputStream calls createBlockOutputStream to open an output stream to the first DataNode in the pipeline. If this fails, nextBlockOutputStream calls abandonBlock() to give the block back to the NameNode, adds the failed DataNode to the excluded-nodes list, and then requests a fresh block (and pipeline) from the NameNode. This is retried a fixed number of times (dfs.client.block.write.retries, default 3); if it still fails, an exception is thrown:

private LocatedBlock nextBlockOutputStream() throws IOException {
  ...
  if (!success) {
    DFSClient.LOG.info("Abandoning " + block);
    // give the block back to the NameNode
    dfsClient.namenode.abandonBlock(block, fileId, src,
        dfsClient.clientName);
    block = null;
    DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
    // add the failed node to the excluded-nodes list
    excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
  }
  ...
}
addBlock() reports the excluded-nodes list to the NameNode, so the replacement block will not be placed on those DataNodes.
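The excluded list travels to the NameNode as an argument of the ClientProtocol#addBlock RPC. A minimal sketch of that hand-off, assuming the 2.6 RPC signature (the helper class and method are hypothetical):

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

/** Hypothetical helper mirroring what the client's retry loop does. */
class BlockAllocator {
  /**
   * Ask the NameNode for the next block of a file under construction,
   * steering it away from datanodes that already failed for this writer.
   */
  static LocatedBlock allocateAvoiding(ClientProtocol namenode, String src,
      String clientName, ExtendedBlock previous, long fileId,
      DatanodeInfo[] excludedNodes) throws IOException {
    // excludeNodes is an argument of the addBlock() RPC in 2.6; the
    // NameNode skips these datanodes when choosing the new pipeline.
    return namenode.addBlock(src, clientName, previous, excludedNodes,
        fileId, null /* favoredNodes */);
  }
}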

Other points where the client can fail
Failure while issuing the writeBlock request
// called by nextBlockOutputStream(); this is where the writeBlock
// request is actually sent
private boolean createBlockOutputStream(DatanodeInfo[] nodes,
    StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
  ...
  new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
      dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
      nodes.length, block.getNumBytes(), bytesSent, newGS,
      checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
  ...
}

If this request (or the subsequent pipeline-setup ack) fails, the catch block in createBlockOutputStream() maps the firstBadLink reported back by the downstream nodes to an index in nodes[], sets errorIndex accordingly, and sets hasError to true.
Failure while streaming data
// write out data to remote datanode
try {
  one.writeTo(blockStream);
  blockStream.flush();
} catch (IOException e) {
  tryMarkPrimaryDatanodeFailed();
  throw e;
}

Here tryMarkPrimaryDatanodeFailed() is called. The client's own connection is to the first DataNode in the pipeline, so that node (index 0) is the one marked as failed:

synchronized void tryMarkPrimaryDatanodeFailed() {
  // There should be no existing error and no ongoing restart.
  if ((errorIndex == -1) && (restartingNodeIndex == -1)) {
    errorIndex = 0;
  }
}
Failure reported in the ack after a packet has been sent

The ResponseProcessor thread, an inner class of DataStreamer, processes the failure information carried by each ack.
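Conceptually, the check boils down to scanning the per-datanode reply statuses carried in each PipelineAck; the index of the bad reply becomes errorIndex. A hypothetical distillation (findFailedNode is not a real method; the Status enum is the one used by the data-transfer protocol):

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

/**
 * Reply i in a pipeline ack carries the status of the i-th datanode in
 * the pipeline. Scanning from the downstream end, the first non-SUCCESS
 * reply identifies the failed node.
 */
static int findFailedNode(Status[] replies) {
  for (int i = replies.length - 1; i >= 0; i--) {
    if (replies[i] != Status.SUCCESS) {
      return i;   // ResponseProcessor sets errorIndex = i and bails out
    }
  }
  return -1;      // every datanode acked this packet
}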

All three failure cases above update the same three fields: hasError, errorIndex, and restartingNodeIndex.

Client-Side Error Recovery

On every loop iteration, DataStreamer's run() method inspects hasError, errorIndex, and restartingNodeIndex, and triggers processDatanodeError() when they indicate a failure:

if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
  doSleep = processDatanodeError();
}

The processDatanodeError() method:

private boolean processDatanodeError() throws IOException {
  if (response != null) {
    DFSClient.LOG.info("Error Recovery for " + block +
        " waiting for responder to exit. ");
    return true;
  }
  // Step 1: close the streams
  closeStream();
  // Step 2: move packets from ack queue to front of the data queue
  synchronized (dataQueue) {
    dataQueue.addAll(0, ackQueue);
    ackQueue.clear();
  }

  // Record the new pipeline failure recovery.
  if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
    lastAckedSeqnoBeforeFailure = lastAckedSeqno;
    pipelineRecoveryCount = 1;
  } else {
    // If we had to recover the pipeline five times in a row for the
    // same packet, this client likely has corrupt data or corrupting
    // during transmission.
    if (++pipelineRecoveryCount > 5) {
      DFSClient.LOG.warn("Error recovering pipeline for writing " +
          block + ". Already retried 5 times for the same packet.");
      lastException.set(new IOException("Failing write. Tried pipeline " +
          "recovery 5 times without success."));
      streamerClosed = true;
      return false;
    }
  }
  // Step 3: call setupPipelineForAppendOrRecovery() to rebuild the pipeline
  boolean doSleep = setupPipelineForAppendOrRecovery();

  if (!streamerClosed && dfsClient.clientRunning) {
    if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
      synchronized (dataQueue) {
        Packet endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
        assert endOfBlockPacket.lastPacketInBlock;
        assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
        lastAckedSeqno = endOfBlockPacket.seqno;
        dataQueue.notifyAll();
      }
      endBlock();
    } else {
      initDataStreaming();
    }
  }

  return doSleep;
}
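Two details are worth noting. First, recovery replays data: everything that was sent but not yet acknowledged is moved from ackQueue back to the front of dataQueue, so after the pipeline is rebuilt the same packets are simply re-sent. Second, the five-attempt cap only applies to repeated failures on the same packet: as long as lastAckedSeqno keeps advancing between failures, pipelineRecoveryCount is reset to 1 and recovery can continue.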

The key method here is setupPipelineForAppendOrRecovery():

private boolean setupPipelineForAppendOrRecovery() throws IOException {
  ...
  // remove the failed node from the pipeline and reset it
  DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
  arraycopy(nodes, newnodes, errorIndex);

  final StorageType[] newStorageTypes = new StorageType[newnodes.length];
  arraycopy(storageTypes, newStorageTypes, errorIndex);

  final String[] newStorageIDs = new String[newnodes.length];
  arraycopy(storageIDs, newStorageIDs, errorIndex);

  setPipeline(newnodes, newStorageTypes, newStorageIDs);
  ...
  // check whether a replacement datanode is needed; with the default
  // policy one is requested when no more than half of the replicas
  // remain (the policy is sketched below)
  if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
      nodes, isAppend, isHflushed)) {
    try {
      addDatanode2ExistingPipeline();
    }
  ...
  // bump the block's generation stamp at the NameNode so that stale
  // replicas left on the failed DataNode will be discarded
  if (success) {
    // update pipeline at the namenode
    ExtendedBlock newBlock = new ExtendedBlock(
        block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
        nodes, storageIDs);
    // update client side generation stamp
    block = newBlock;
  }
  ...
}
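The satisfy() call above is governed by dfs.client.block.write.replace-datanode-on-failure.policy, implemented in ReplaceDatanodeOnFailure. A condensed restatement of the DEFAULT policy as a sketch (shouldAddReplacement is hypothetical; the thresholds follow my reading of the 2.6 source):

/**
 * DEFAULT replace-datanode-on-failure policy, condensed: replace only
 * when the file is meaningfully replicated (replication >= 3) and either
 * half or more of the pipeline is gone, or data already made visible
 * via append/hflush must keep its replica count.
 */
static boolean shouldAddReplacement(short replication, int aliveNodes,
    boolean isAppend, boolean isHflushed) {
  if (aliveNodes == 0 || aliveNodes >= replication) {
    return false;                  // nothing to do, or pipeline is full
  }
  if (replication < 3) {
    return false;                  // small pipelines: just keep writing
  }
  if (aliveNodes <= replication / 2) {
    return true;                   // lost half or more of the replicas
  }
  return isAppend || isHflushed;   // visible data must stay replicated
}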
Copying data to the newly added node

private void addDatanode2ExistingPipeline() throws IOException {
  // In the PIPELINE_SETUP_CREATE (nothing acked yet) and PIPELINE_CLOSE
  // states no data needs to be copied here; the NameNode will
  // re-replicate the block later if necessary.
  if (!isAppend && lastAckedSeqno < 0
      && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
    //no data have been written
    return;
  } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
      || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
    //pipeline is closing
    return;
  }
  ...
  // a partially written block: the existing replica must be copied
  // to the new datanode
  transfer(src, targets, targetStorageTypes, lb.getBlockToken());
  ...
}
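transfer() is a plain data-transfer-protocol request: the client connects to one surviving datanode and asks it to copy its replica to the new node. A minimal sketch assuming an already-connected DataOutputStream (requestReplicaTransfer is hypothetical; Sender#transferBlock is the actual 2.6 call):

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.StorageType;   // 2.6 package; moved later
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

/**
 * Ask one surviving datanode (already connected via `out`) to stream its
 * partially written replica to the newly added node(s), so the newcomer
 * catches up before the pipeline resumes.
 */
static void requestReplicaTransfer(DataOutputStream out, ExtendedBlock block,
    Token<BlockTokenIdentifier> blockToken, String clientName,
    DatanodeInfo[] targets, StorageType[] targetStorageTypes)
    throws IOException {
  new Sender(out).transferBlock(block, blockToken, clientName,
      targets, targetStorageTypes);
  out.flush();   // the real transfer() then waits for the response ack
}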

Server-Side (DataNode) Exception Handling

When a packet checksum error occurs on a downstream node or on the current node, the error is reported upstream and an exception is thrown. This is implemented in sendAckUpstreamUnprotected(), a method of PacketResponder, the inner thread class of BlockReceiver.
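Conceptually, each PacketResponder prepends its own status to the replies received from downstream before forwarding the ack, which is how a failure anywhere in the pipeline reaches the client. A hypothetical distillation (buildUpstreamReplies is not a real method; the Status enum is):

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

/**
 * The ack sent upstream is this datanode's own verdict followed by the
 * statuses reported by the downstream nodes.
 */
static Status[] buildUpstreamReplies(Status myStatus, Status[] downstream) {
  Status[] replies = new Status[1 + downstream.length];
  replies[0] = myStatus;                  // this node's status comes first
  System.arraycopy(downstream, 0, replies, 1, downstream.length);
  return replies;
}

One special case in the 2.6 source: if the immediate downstream node reports ERROR_CHECKSUM, the responder shuts itself down rather than blaming the mirror, since the corruption must have been introduced at or before the current node.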
