赞
踩
见名知意,此类主要用于数据传输,是一个守护线程,在创建DFSOutputStream的过程中被启动,启动之后再run方法之中使用一个while死循环(直到流或者客户端关闭)才停止运行,里面主要的逻辑是通过维护一个dataQueue队列,等待主线程往其中添加packet数据,等到添加了packet数据之后,会触发数据的发送,将数据发送到对应pipeline对应的dn之中,从而完成数据的传输。
根据前文,在DFSOutputStream#newStreamForCreate中会创建对应的DFSOutputStream:
- final DFSOutputStream out;
- // 判断是否配置了ErasureCodingPolicy从而创建不同的DFSOutputStream对象
- // 过程中会使用通过rpc远程创建INodeFile返回的HDFSFileStatus对象
- if(stat.getErasureCodingPolicy() != null) {
- out = new DFSStripedOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes);
- } else {
- out = new DFSOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes, true);
- }
- // 启动往dn pipeline发送packet数据的的DataStreamer
- out.start();
这里主要看非ErasureCoding一节,即创建DFSOutputStreamer
- /** Construct a new output stream for creating a file. */
- protected DFSOutputStream(DFSClient dfsClient, String src,
- HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
- DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
- // 这里主要进行一些初始化的操作
- this(dfsClient, src, flag, progress, stat, checksum);
- this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
-
- // 参见下文关于此方法的解释,因为涉及到一张图,哈哈
- computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
- bytesPerChecksum);
-
- // 在这里直接创建DataStreamer
- if (createStreamer) {
- streamer = new DataStreamer(stat, null, dfsClient, src, progress,
- checksum, cachingStrategy, byteArrayManager, favoredNodes,
- addBlockFlags);
- }
- }
-
- private DFSOutputStream(DFSClient dfsClient, String src,
- EnumSet<CreateFlag> flag,
- Progressable progress, HdfsFileStatus stat, DataChecksum checksum) {
- super(getChecksum4Compute(checksum, stat));
- this.dfsClient = dfsClient;
- this.src = src;
- this.fileId = stat.getFileId();
- this.blockSize = stat.getBlockSize();
- this.blockReplication = stat.getReplication();
- this.fileEncryptionInfo = stat.getFileEncryptionInfo();
- this.cachingStrategy = new AtomicReference<>(
- dfsClient.getDefaultWriteCachingStrategy());
- this.addBlockFlags = EnumSet.noneOf(AddBlockFlag.class);
- if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) {
- this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE);
- }
- if (flag.contains(CreateFlag.NO_LOCAL_RACK)) {
- this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_RACK);
- }
- if (flag.contains(CreateFlag.IGNORE_CLIENT_LOCALITY)) {
- this.addBlockFlags.add(AddBlockFlag.IGNORE_CLIENT_LOCALITY);
- }
- if (progress != null) {
- DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
- +"{}", src);
- }
-
- initWritePacketSize();
-
- this.bytesPerChecksum = checksum.getBytesPerChecksum();
- if (bytesPerChecksum <= 0) {
- throw new HadoopIllegalArgumentException(
- "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
- }
- if (blockSize % bytesPerChecksum != 0) {
- throw new HadoopIllegalArgumentException("Invalid values: "
- + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY
- + " (=" + bytesPerChecksum + ") must divide block size (=" +
- blockSize + ").");
- }
- this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
- }
下面解释下computePacketChunkSize
- protected void computePacketChunkSize(int psize, int csize) {
- // 64Kb - packetHeader长度(33b,如下图)
- final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
- // getChecksumSize默认是使用crc32,即4b,csize默认是512b,因此chunkSize=516b
- final int chunkSize = csize + getChecksumSize();
- chunksPerPacket = Math.max(bodySize/chunkSize, 1);
- // packet的真实大小
- packetSize = chunkSize*chunksPerPacket;
- DFSClient.LOG.debug("computePacketChunkSize: src={}, chunkSize={}, "
- + "chunksPerPacket={}, packetSize={}",
- src, chunkSize, chunksPerPacket, packetSize);
- }
- 没啥太多可介绍的,就是参数初始化
-
- /**
- * construction with tracing info
- */
- DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
- String src, Progressable progress, DataChecksum checksum,
- AtomicReference<CachingStrategy> cachingStrategy,
- ByteArrayManager byteArrayManage, String[] favoredNodes,
- EnumSet<AddBlockFlag> flags) {
- this(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
- byteArrayManage, false, favoredNodes, flags);
- stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
- }
-
- private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
- DFSClient dfsClient, String src,
- Progressable progress, DataChecksum checksum,
- AtomicReference<CachingStrategy> cachingStrategy,
- ByteArrayManager byteArrayManage,
- boolean isAppend, String[] favoredNodes,
- EnumSet<AddBlockFlag> flags) {
- this.block = new BlockToWrite(block);
- this.dfsClient = dfsClient;
- this.src = src;
- this.progress = progress;
- this.stat = stat;
- this.checksum4WriteBlock = checksum;
- this.cachingStrategy = cachingStrategy;
- this.byteArrayManager = byteArrayManage;
- this.isLazyPersistFile = isLazyPersist(stat);
- this.isAppend = isAppend;
- this.favoredNodes = favoredNodes;
- final DfsClientConf conf = dfsClient.getConf();
- this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
- this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
- this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
- this.addBlockFlags = flags;
- }
由于这个对象实际上是一个线程,因此在DFSOutputStream#newStreamForCreate方法中最后启动start()方法时,就是调用线程的run方法执行操作(实在有点长。。。。)。
- /*
- * streamer thread is the only thread that opens streams to datanode,
- * and closes them. Any error recovery is also done by this thread.
- */
- @Override
- public void run() {
- long lastPacket = Time.monotonicNow();
- TraceScope scope = null;
- // 死循环知道客户端或者流关闭
- while (!streamerClosed && dfsClient.clientRunning) {
- // if the Responder encountered an error, shutdown Responder
- if (errorState.hasError()) {
- closeResponder();
- }
-
- DFSPacket one;
- try {
- // process datanode IO errors if any
- boolean doSleep = processDatanodeOrExternalError();
-
- final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
- synchronized (dataQueue) {
- // wait for a packet to be sent.
- long now = Time.monotonicNow();
- /**
- * shouldRun:是否应该停止,根据流是否关闭、是否发生异常、是否客户端停止运行决定
- * dataQueue:最重要的一个变量,== 0表示还未开始写数据
- * stage:block的阶段
- * 现在距离上一个packet是否过去了指定客户端socket(60s)的一半
- * doSleep:数据流突然出现故障
- * 如果这些条件满足了,则让dataQueue休眠等待数据写入
- */
- while ((!shouldStop() && dataQueue.size() == 0 &&
- (stage != BlockConstructionStage.DATA_STREAMING ||
- now - lastPacket < halfSocketTimeout)) || doSleep) {
- long timeout = halfSocketTimeout - (now-lastPacket);
- timeout = timeout <= 0 ? 1000 : timeout;
- timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
- timeout : 1000;
- try {
- dataQueue.wait(timeout);
- } catch (InterruptedException e) {
- LOG.debug("Thread interrupted", e);
- }
- doSleep = false;
- now = Time.monotonicNow();
- }
- if (shouldStop()) {
- continue;
- }
- // get packet to be sent.
- // 获取需要发送的packet
- // 如果数据队列为空,那么先创建一个心跳packet(此心跳用于告知dn客户端仍存活),否则获取正常的数据packet
- if (dataQueue.isEmpty()) {
- one = createHeartbeatPacket();
- } else {
- try {
- // 写入管道拥挤(客户端请求过于频繁)时,会进行一定的休眠
- backOffIfNecessary();
- } catch (InterruptedException e) {
- LOG.debug("Thread interrupted", e);
- }
- one = dataQueue.getFirst(); // regular data packet
- SpanId[] parents = one.getTraceParents();
- if (parents.length > 0) {
- scope = dfsClient.getTracer().
- newScope("dataStreamer", parents[0]);
- scope.getSpan().setParents(parents);
- }
- }
- }
-
- // get new block from namenode.
- LOG.debug("stage={}, {}", stage, this);
-
- if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
- // 此逻辑用于创建新文件
- LOG.debug("Allocating new block: {}", this);
- //nextBlockOutputStream()方法用来向Namenode 申请块信息,返回LocatedBlock 对象,
- // 其包含了 数据流pipeline 数据流节点信息 DatanodeInfo
- setPipeline(nextBlockOutputStream());
- // 初始化数据流,在其中会启动一个ResponseProcessor线程,此线程用来处理来自dn的响应
- // 所谓响应即ack,每当我们发出一个数据Packet,DataNode都需要发送ACK回复我们表示他收到了
- // 因此这样可以看出是每一个block对应一个响应线程,当此block写完关闭时,则会关闭此线程
- initDataStreaming();
- } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
- // 此逻辑用于往文件添加数据
- LOG.debug("Append to block {}", block);
- // 这里也是创建一个dataStreamer
- setupPipelineForAppendOrRecovery();
- if (streamerClosed) {
- continue;
- }
- // 初始化dataStream,在其中会启动一个ResponseProcessor线程,此线程用来处理来自dn的响应
- // 所谓响应即ack,每当我们发出一个数据Packet,DataNode都需要发送ACK回复我们表示他收到了
- initDataStreaming();
- }
-
- // 获取packet数据在block中的最后偏移量
- long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
- if (lastByteOffsetInBlock > stat.getBlockSize()) {
- throw new IOException("BlockSize " + stat.getBlockSize() +
- " < lastByteOffsetInBlock, " + this + ", " + one);
- }
-
- // 判断是否是最后一个packet
- // 里面会等待所有lastPacket之前的Packet被确认。然后把流水线状态设置为关闭,
- // 但是此时还没有把lastPacket写到流水线上。
- if (one.isLastPacketInBlock()) {
- // wait for all data packets have been successfully acked
- synchronized (dataQueue) {
- while (!shouldStop() && ackQueue.size() != 0) {
- try {
- // wait for acks to arrive from datanodes
- // 等待从dn返回的ack
- dataQueue.wait(1000);
- } catch (InterruptedException e) {
- LOG.debug("Thread interrupted", e);
- }
- }
- }
- if (shouldStop()) {
- continue;
- }
- // 指示pipeline关闭
- stage = BlockConstructionStage.PIPELINE_CLOSE;
- }
-
- // send the packet
- SpanId spanId = SpanId.INVALID;
- synchronized (dataQueue) {
- // move packet from dataQueue to ackQueue
- if (!one.isHeartbeatPacket()) {
- if (scope != null) {
- spanId = scope.getSpanId();
- scope.detach();
- one.setTraceScope(scope);
- }
- scope = null;
- // 将此处理的packet移到ack队列中,指示这些packet处于等待被确认的过程中
- dataQueue.removeFirst();
- ackQueue.addLast(one);
- packetSendTime.put(one.getSeqno(), Time.monotonicNow());
- dataQueue.notifyAll();
- }
- }
-
- LOG.debug("{} sending {}", this, one);
-
- // write out data to remote datanode
- try (TraceScope ignored = dfsClient.getTracer().
- newScope("DataStreamer#writeTo", spanId)) {
- // 将packet写入流水线中
- one.writeTo(blockStream);
- blockStream.flush();
- } catch (IOException e) {
- // HDFS-3398 treat primary DN is down since client is unable to
- // write to primary DN. If a failed or restarting node has already
- // been recorded by the responder, the following call will have no
- // effect. Pipeline recovery can handle only one node error at a
- // time. If the primary node fails again during the recovery, it
- // will be taken out then.
- // 用于标识当没有明显异常收到时,标记第一个dn为挂起而停止传输
- errorState.markFirstNodeIfNotMarked();
- throw e;
- }
- lastPacket = Time.monotonicNow();
-
- // update bytesSent
- long tmpBytesSent = one.getLastByteOffsetBlock();
- if (bytesSent < tmpBytesSent) {
- bytesSent = tmpBytesSent;
- }
-
- if (shouldStop()) {
- continue;
- }
-
- // Is this block full?
- // 通知当前block已经写完,从而等待acks
- if (one.isLastPacketInBlock()) {
- // wait for the close packet has been acked
- synchronized (dataQueue) {
- while (!shouldStop() && ackQueue.size() != 0) {
- dataQueue.wait(1000);// wait for acks to arrive from datanodes
- }
- }
- if (shouldStop()) {
- continue;
- }
-
- // 当一个块写完之后,需要添加新的块,会在上一个块end掉的时候(调用endBlock),
- // 把stage设置成PIPELINE_SETUP_CREATE,这样一来下次流水线也是被建立来创建新的块,达到添加块的目的。
- endBlock();
- }
- if (progress != null) { progress.progress(); }
-
- // This is used by unit test to trigger race conditions.
- if (artificialSlowdown != 0 && dfsClient.clientRunning) {
- Thread.sleep(artificialSlowdown);
- }
- } catch (Throwable e) {
- // Log warning if there was a real error.
- if (!errorState.isRestartingNode()) {
- // Since their messages are descriptive enough, do not always
- // log a verbose stack-trace WARN for quota exceptions.
- if (e instanceof QuotaExceededException) {
- LOG.debug("DataStreamer Quota Exception", e);
- } else {
- LOG.warn("DataStreamer Exception", e);
- }
- }
- lastException.set(e);
- assert !(e instanceof NullPointerException);
- errorState.setInternalError();
- if (!errorState.isNodeMarked()) {
- // Not a datanode issue
- streamerClosed = true;
- }
- } finally {
- if (scope != null) {
- scope.close();
- scope = null;
- }
- }
- }
- closeInternal();
- }
此方法再创建一个新块时被调用:
- if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
- // 此逻辑用于创建新文件
- LOG.debug("Allocating new block: {}", this);
- //nextBlockOutputStream()方法用来向Namenode 申请块信息,返回LocatedBlock 对象,
- // 其包含了 数据流pipeline 数据流节点信息 DatanodeInfo
- setPipeline(nextBlockOutputStream());
- // 初始化数据流,在其中会启动一个nextBlockOutputStream线程,此线程用来处理来自dn的响应
- // 所谓响应即ack,每当我们发出一个数据Packet,DataNode都需要发送ACK回复我们表示他收到了
- // 因此这样可以看出是每一个block对应一个响应线程,当此block写完关闭时,则会关闭此线程
- initDataStreaming();
- }
这个方法返回的是一个LocatedBlock,包含了一个块的信息。包括Block的备份存储位置,块的大小,块的BGS和BlockId。
- /**
- * Open a DataStreamer to a DataNode so that it can be written to.
- * This happens when a file is created and each time a new block is allocated.
- * Must get block ID and the IDs of the destinations from the namenode.
- * Returns the list of target datanodes.
- */
- protected LocatedBlock nextBlockOutputStream() throws IOException {
- LocatedBlock lb;
- DatanodeInfo[] nodes;
- StorageType[] nextStorageTypes;
- String[] nextStorageIDs;
- int count = dfsClient.getConf().getNumBlockWriteRetry();
- boolean success;
- final ExtendedBlock oldBlock = block.getCurrentBlock();
- // 循环创建一个新块,知道成功或者到达block写入的重试次数
- do {
- // 由于是创建新块,老块的异常就直接清除了
- errorState.resetInternalError();
- lastException.clear();
-
- // 不想将块副本保存到那些dn节点
- DatanodeInfo[] excluded = getExcludedNodes();
- // 创建一个新块,rpc调用namenode的addBlock操作
- lb = locateFollowingBlock(
- excluded.length > 0 ? excluded : null, oldBlock);
- // 设置一些基础信息,如当前块、传输数据量、密钥等
- block.setCurrentBlock(lb.getBlock());
- block.setNumBytes(0);
- bytesSent = 0;
- accessToken = lb.getBlockToken();
- nodes = lb.getLocations();
- nextStorageTypes = lb.getStorageTypes();
- nextStorageIDs = lb.getStorageIDs();
-
- // Connect to first DataNode in the list.
- // 建立和流水线上的第一个dn的连接
- // 这里会先建立一个pipeline的socket连接
- // 而后调用Sender#writeBlock方法通知那些包含在pipeline中的dn
- // 最后接受来自dn的回复,做后续的判断
- success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
- 0L, false);
-
- if (!success) {
- LOG.warn("Abandoning " + block);
- dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
- stat.getFileId(), src, dfsClient.clientName);
- block.setCurrentBlock(null);
- final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
- LOG.warn("Excluding datanode " + badNode);
- excludedNodes.put(badNode, badNode);
- }
- } while (!success && --count >= 0);
-
- if (!success) {
- throw new IOException("Unable to create new block.");
- }
- return lb;
- }
这是一个守护线程,用来处理来自dn的ack。DataNode接收到Packet后需要向客户端回复ACK,表示自己已经收到Packet了,而接收处理ACK的线程类就是ResponseProcessor。
对每一个块的传输都需要新建一个ResponseProcessor,当块传输完,客户端会通过endBlock方法间接地把当前ResponseProcessor销毁掉。下次传输新的Block的时候通过初始化传输环境方法initDataStreaming来间接地创建ResponseProcessor。
启动之后同样主要看run()方法呀:
- @Override
- public void run() {
-
- setName("ResponseProcessor for block " + block);
- // 创建一个代表ack的对象
- PipelineAck ack = new PipelineAck();
-
- TraceScope scope = null;
- // 循环接受ack,除非线程关闭、客户端停止运行、最后一个packet
- while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
- // process responses from datanodes.
- try {
- // read an ack from the pipeline
- // 从管道中读取ack
- ack.readFields(blockReplyStream);
- if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
- Long begin = packetSendTime.get(ack.getSeqno());
- if (begin != null) {
- long duration = Time.monotonicNow() - begin;
- if (duration > dfsclientSlowLogThresholdMs) {
- LOG.info("Slow ReadProcessor read fields for block " + block
- + " took " + duration + "ms (threshold="
- + dfsclientSlowLogThresholdMs + "ms); ack: " + ack
- + ", targets: " + Arrays.asList(targets));
- }
- }
- }
-
- LOG.debug("DFSClient {}", ack);
-
- // 获取packet序号,在客户端和DataNode的通信中,数据是以Packet为单位进行传输的,每个packet的序号独一无二
- // 根据这个序号可以获知此ack对应那个packet
- // 序号是从0开始计数的,序号为-1的Packet是心跳包,客户端用他来告诉DataNode客户端还活着。
- // 序号为-2的包为未知包,收到这个包需要抛出异常
- long seqno = ack.getSeqno();
- // processes response status from datanodes.
- ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
- for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
- // 从ack的header信息中获取对应的dn的状态
- final Status reply = PipelineAck.getStatusFromHeader(ack
- .getHeaderFlag(i));
- // 根据状态查看dn是否处于繁忙
- if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
- PipelineAck.ECN.CONGESTED) {
- congestedNodesFromAck.add(targets[i]);
- }
- // Restart will not be treated differently unless it is
- // the local node or the only one in the pipeline.
- // 根据状态判断是否有dn处于重启过程中
- if (PipelineAck.isRestartOOBStatus(reply)) {
- final String message = "Datanode " + i + " is restarting: "
- + targets[i];
- // 根据是否等待,如果等待将会把将当前传进来的节点标记为正在重启的节点
- // 并且为他设置重启时限,把BadNode记录清除掉(这时的BadNode一般是流水线上第一个DataNode,
- // BadNode指的是工作过程发生错误或者无法联系上的DataNode)
- errorState.initRestartingNode(i, message,
- shouldWaitForRestart(i));
- throw new IOException(message);
- }
- // node error
- // 检查ACK的回应是否是SUCCESS,如果不是,表示对应的DataNode没有
- // 正常接收Packet,那么将把该DataNode标记为BadNode。
- if (reply != SUCCESS) {
- errorState.setBadNodeIndex(i); // mark bad datanode
- throw new IOException("Bad response " + reply +
- " for " + block + " from datanode " + targets[i]);
- }
- }
-
- // 将上面得到的繁忙节点加入到DataStreamer的成员变量congestedNodes中,
- // 这个变量用来标记所有繁忙节点,以便输出日志(DataStreamer的backIfNecessary)的时候观察哪些节点繁忙。
- if (!congestedNodesFromAck.isEmpty()) {
- synchronized (congestedNodes) {
- congestedNodes.clear();
- congestedNodes.addAll(congestedNodesFromAck);
- }
- } else {
- synchronized (congestedNodes) {
- congestedNodes.clear();
- lastCongestionBackoffTime = 0;
- }
- }
-
- assert seqno != PipelineAck.UNKOWN_SEQNO :
- "Ack for unknown seqno should be a failed ack: " + ack;
- if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
- continue;
- }
-
- // a success ack for a data packet
- DFSPacket one;
- // ackQueue中存储的都是待确认的packet,如果数据包发出去之后流水线失败,
- // 得不到确认。数据包可以从ackQueue恢复,不至于以前的Packet丢失。
- synchronized (dataQueue) {
- one = ackQueue.getFirst();
- }
- // 收到的ACK的序号和ackQueue队头元素的序号一不一样,如果不一样,说明可能收发乱序了
- if (one.getSeqno() != seqno) {
- throw new IOException("ResponseProcessor: Expecting seqno " +
- one.getSeqno() + " for block " + block +
- " but received " + seqno);
- }
- isLastPacketInBlock = one.isLastPacketInBlock();
-
- // Fail the packet write for testing in order to force a
- // pipeline recovery.
- if (DFSClientFaultInjector.get().failPacket() &&
- isLastPacketInBlock) {
- failPacket = true;
- throw new IOException(
- "Failing the last packet for testing.");
- }
-
- // update bytesAcked
- // getLastByteOffsetBlock其实就是最后一个包的结尾相对Block起始位置的偏移量。也就是现在写了的数据量。
- // offsetInBlock + dataPos - dataStart
- block.setNumBytes(one.getLastByteOffsetBlock());
-
- synchronized (dataQueue) {
- scope = one.getTraceScope();
- if (scope != null) {
- scope.reattach();
- one.setTraceScope(null);
- }
- lastAckedSeqno = seqno;
- pipelineRecoveryCount = 0;
- // 移除已经被确认的packet
- ackQueue.removeFirst();
- packetSendTime.remove(seqno);
- dataQueue.notifyAll();
-
- one.releaseBuffer(byteArrayManager);
- }
- } catch (Throwable e) {
- if (!responderClosed) {
- lastException.set(e);
- errorState.setInternalError();
- // 标记第一个dn为badNode,因为第一个建立连接,嫌疑最大
- errorState.markFirstNodeIfNotMarked();
- synchronized (dataQueue) {
- dataQueue.notifyAll();
- }
- if (!errorState.isRestartingNode()) {
- LOG.warn("Exception for " + block, e);
- }
- responderClosed = true;
- }
- } finally {
- if (scope != null) {
- scope.close();
- }
- scope = null;
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。