
Optimizing Slow HBase Table Creation and Deletion: A Case Study


1. Problem Statement

Our cluster has accumulated a large number of small tables, which has become a management burden in itself, so pushing users to drop unused tables is an urgent need. At the same time, users reported that both dropping and creating tables were slow and sometimes failed with timeout exceptions. Where might the slowness come from? We started from three guesses (a client-side timing sketch to separate them follows the list):

  • Slow RPC / network?
  • Slow server-side processing? If so, in which method?
  • Slow disks?
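
To separate the first guess from the other two, it helps to time the calls from the client side. The sketch below is illustrative only: the ZooKeeper quorum and the table name are placeholders, and it assumes the standard HBase 1.x client API used throughout this article.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;

  public class DeleteTableTiming {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3"); // placeholder quorum
      try (Connection conn = ConnectionFactory.createConnection(conf);
           Admin admin = conn.getAdmin()) {
        TableName table = TableName.valueOf("test_drop_me"); // placeholder table
        long t0 = System.nanoTime();
        admin.disableTable(table);   // the table must exist and be enabled
        long t1 = System.nanoTime();
        admin.deleteTable(table);
        long t2 = System.nanoTime();
        System.out.printf("disableTable: %.1f ms, deleteTable: %.1f ms%n",
            (t1 - t0) / 1e6, (t2 - t1) / 1e6);
      }
    }
  }

If the latency measured here closely matches what the Master itself reports for the same calls, the network and the client are effectively ruled out, and the time is being spent on the server.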

2. Source Code Analysis

2.1 The HRegionServer constructor creates the RPC service by calling createRpcServices

  /**
   * Starts a HRegionServer at the default location
   * @param conf
   * @param csm implementation of CoordinatedStateManager to be used
   * @throws IOException
   * @throws InterruptedException
   */
  public HRegionServer(Configuration conf, CoordinatedStateManager csm)
      throws IOException, InterruptedException {
    this.fsOk = true;
    this.conf = conf;
    checkCodecs(this.conf);
    this.userProvider = UserProvider.instantiate(conf);
    FSUtils.setupShortCircuitRead(this.conf);
    // Disable usage of meta replicas in the regionserver
    this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

    // Config'ed params
    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

    this.sleeper = new Sleeper(this.msgInterval, this);

    boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
    this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

    this.numRegionsToReport = conf.getInt(
      "hbase.regionserver.numregionstoreport", 10);

    this.operationTimeout = conf.getInt(
      HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

    this.shortOperationTimeout = conf.getInt(
      HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

    this.abortRequested = false;
    this.stopped = false;

    rpcServices = createRpcServices();
    this.startcode = System.currentTimeMillis();
    if (this instanceof HMaster) {
      useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
    } else {
      useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
    }
    String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
      rpcServices.isa.getHostName();
    serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);

    rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
    rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

    // login the zookeeper client principal (if using security)
    ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
      HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
    // login the server principal (if using secure Hadoop)
    login(userProvider, hostName);
    // init superusers and add the server principal (if using security)
    // or process owner as default super user.
    Superusers.initialize(conf);

    regionServerAccounting = new RegionServerAccounting();

    cacheConfig = new CacheConfig(conf);
    mobCacheConfig = new MobCacheConfig(conf);

    uncaughtExceptionHandler = new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread t, Throwable e) {
        abort("Uncaught exception in service thread " + t.getName(), e);
      }
    };

    useZKForAssignment = ConfigUtil.useZKForAssignment(conf);

    initializeFileSystem();

    service = new ExecutorService(getServerName().toShortString());
    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

    // Some unit tests don't need a cluster, so no zookeeper at all
    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
      // Open connection to zookeeper and set primary watcher
      zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
        rpcServices.isa.getPort(), this, canCreateBaseZNode());

      this.csm = (BaseCoordinatedStateManager) csm;
      this.csm.initialize(this);
      this.csm.start();

      tableLockManager = TableLockManager.createTableLockManager(
        conf, zooKeeper, serverName);

      masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
      masterAddressTracker.start();

      clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
      clusterStatusTracker.start();
    }
    this.configurationManager = new ConfigurationManager();

    rpcServices.start();
    putUpWebUI();
    this.walRoller = new LogRoller(this, this);
    this.choreService = new ChoreService(getServerName().toString(), true);

    if (!SystemUtils.IS_OS_WINDOWS) {
      Signal.handle(new Signal("HUP"), new SignalHandler() {
        @Override
        public void handle(Signal signal) {
          getConfiguration().reloadConfiguration();
          configurationManager.notifyAllObservers(getConfiguration());
        }
      });
    }
  }

2.2 RpcServer.processOneRpc handles a single RPC request

    private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
      if (connectionHeaderRead) {
        processRequest(buf);
      } else {
        processConnectionHeader(buf);
        this.connectionHeaderRead = true;
        if (!authorizeConnection()) {
          // Throw FatalConnectionException wrapping ACE so client does right thing and closes
          // down the connection instead of trying to read a non-existent return.
          throw new AccessDeniedException("Connection from " + this + " for service " +
            connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
        }
        this.user = userProvider.create(this.ugi);
      }
    }

2.3 RpcServer.processRequest deserializes the request and queues it for dispatch

  /**
     * @param buf Has the request header and the request param and optionally encoded data buffer
     * all in this one array.
     * @throws IOException
     * @throws InterruptedException
     */
    protected void processRequest(byte[] buf) throws IOException, InterruptedException {
      long totalRequestSize = buf.length;
      int offset = 0;
      // Here we read in the header.  We avoid having pb
      // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
      CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
      int headerSize = cis.readRawVarint32();
      offset = cis.getTotalBytesRead();
      Message.Builder builder = RequestHeader.newBuilder();
      ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
      RequestHeader header = (RequestHeader) builder.build();
      offset += headerSize;
      int id = header.getCallId();
      if (LOG.isTraceEnabled()) {
        LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
          " totalRequestSize: " + totalRequestSize + " bytes");
      }
      // Enforcing the call queue size, this triggers a retry in the client
      // This is a bit late to be doing this check - we have already read in the total request.
      if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
        final Call callTooBig =
          new Call(id, this.service, null, null, null, null, this,
            responder, totalRequestSize, null, null);
        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
        InetSocketAddress address = getListenerAddress();
        setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
            "Call queue is full on " + (address != null ? address : "(channel closed)") +
                ", is hbase.ipc.server.max.callqueue.size too small?");
        responder.doRespond(callTooBig);
        return;
      }
      MethodDescriptor md = null;
      Message param = null;
      CellScanner cellScanner = null;
      try {
        if (header.hasRequestParam() && header.getRequestParam()) {
          md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
          if (md == null) throw new UnsupportedOperationException(header.getMethodName());
          builder = this.service.getRequestPrototype(md).newBuilderForType();
          // To read the varint, I need an inputstream; might as well be a CIS.
          cis = CodedInputStream.newInstance(buf, offset, buf.length);
          int paramSize = cis.readRawVarint32();
          offset += cis.getTotalBytesRead();
          if (builder != null) {
            ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
            param = builder.build();
          }
          offset += paramSize;
        }
        if (header.hasCellBlockMeta()) {
          cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
            buf, offset, buf.length);
        }
      } catch (Throwable t) {
        InetSocketAddress address = getListenerAddress();
        String msg = (address != null ? address : "(channel closed)") +
            " is unable to read call parameter from client " + getHostAddress();
        LOG.warn(msg, t);

        metrics.exception(t);

        // probably the hbase hadoop version does not match the running hadoop version
        if (t instanceof LinkageError) {
          t = new DoNotRetryIOException(t);
        }
        // If the method is not present on the server, do not retry.
        if (t instanceof UnsupportedOperationException) {
          t = new DoNotRetryIOException(t);
        }

        final Call readParamsFailedCall =
          new Call(id, this.service, null, null, null, null, this,
            responder, totalRequestSize, null, null);
        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        setupResponse(responseBuffer, readParamsFailedCall, t,
          msg + "; " + t.getMessage());
        responder.doRespond(readParamsFailedCall);
        return;
      }

      TraceInfo traceInfo = header.hasTraceInfo()
          ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
          : null;
      Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
              totalRequestSize, traceInfo, this.addr);

      if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
        callQueueSize.add(-1 * call.getSize());

        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
        InetSocketAddress address = getListenerAddress();
        setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
            "Call queue is full on " + (address != null ? address : "(channel closed)") +
                ", too many items queued ?");
        responder.doRespond(call);
      }
    }

2.4 CallRunner.run executes the queued call; disableTable, deleteTable, and createTable all enter the Master through this.rpcServer.call

  public void run() {
    try {
      if (!call.connection.channel.isOpen()) {
        if (RpcServer.LOG.isDebugEnabled()) {
          RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
        }
        return;
      }
      this.status.setStatus("Setting up call");
      this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
      if (RpcServer.LOG.isTraceEnabled()) {
        UserGroupInformation remoteUser = call.connection.ugi;
        RpcServer.LOG.trace(call.toShortString() + " executing as " +
            ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
      }
      Throwable errorThrowable = null;
      String error = null;
      Pair<Message, CellScanner> resultPair = null;
      RpcServer.CurCall.set(call);
      TraceScope traceScope = null;
      try {
        if (!this.rpcServer.isStarted()) {
          InetSocketAddress address = rpcServer.getListenerAddress();
          throw new ServerNotRunningYetException("Server " +
              (address != null ? address : "(channel closed)") + " is not running yet");
        }
        if (call.tinfo != null) {
          traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
        }
        // make the call
        resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
          call.timestamp, this.status);
      } catch (Throwable e) {
        RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
        errorThrowable = e;
        error = StringUtils.stringifyException(e);
        if (e instanceof Error) {
          throw (Error)e;
        }
      } finally {
        if (traceScope != null) {
          traceScope.close();
        }
        RpcServer.CurCall.set(null);
        if (resultPair != null) {
          this.rpcServer.addCallSize(call.getSize() * -1);
          sucessful = true;
        }
      }
      // Set the response
      Message param = resultPair != null ? resultPair.getFirst() : null;
      CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
      call.setResponse(param, cells, errorThrowable, error);
      call.sendResponseIfReady();
      this.status.markComplete("Sent response");
      this.status.pause("Waiting for a call");
    } catch (OutOfMemoryError e) {
      if (this.rpcServer.getErrorHandler() != null) {
        if (this.rpcServer.getErrorHandler().checkOOME(e)) {
          RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
          return;
        }
      } else {
        // rethrow if no handler
        throw e;
      }
    } catch (ClosedChannelException cce) {
      InetSocketAddress address = rpcServer.getListenerAddress();
      RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
          "this means that the server " + (address != null ? address : "(channel closed)") +
          " was processing a request but the client went away. The error message was: " +
          cce.getMessage());
    } catch (Exception e) {
      RpcServer.LOG.warn(Thread.currentThread().getName()
          + ": caught: " + StringUtils.stringifyException(e));
    } finally {
      if (!sucessful) {
        this.rpcServer.addCallSize(call.getSize() * -1);
      }
      cleanup();
    }
  }

2.5 HMaster implements MasterServices, providing the deleteTable and createTable services:

  @Override
  public long deleteTable(
      final TableName tableName,
      final long nonceGroup,
      final long nonce) throws IOException {
    checkInitialized();
    if (cpHost != null) {
      cpHost.preDeleteTable(tableName);
    }
    LOG.info(getClientIdAuditPrefix() + " delete " + tableName);

    // TODO: We can handle/merge duplicate request
    ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
    long procId = this.procedureExecutor.submitProcedure(
      new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch),
      nonceGroup,
      nonce);
    latch.await();

    if (cpHost != null) {
      cpHost.postDeleteTable(tableName);
    }

    return procId;
  }
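
Two things in this method decide the latency the client sees: submitProcedure() durably records the procedure (to the Master's procedure WAL, as section 4.1 shows) before returning, and latch.await() then blocks the RPC handler until the procedure's prepare phase has run. The sketch below is conceptual only, not HBase source; persistToProcedureWal, prepareDelete, and executeDelete are made-up stand-ins that illustrate why a slow submit path surfaces directly as client-visible deleteTable time.

  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  public class SubmitAndAwaitSketch {
    public static void main(String[] args) throws InterruptedException {
      ExecutorService procedurePool = Executors.newFixedThreadPool(4);
      CountDownLatch prepared = new CountDownLatch(1); // plays the role of ProcedurePrepareLatch

      long start = System.nanoTime();
      persistToProcedureWal();        // "submitProcedure": paid synchronously by the RPC handler
      procedurePool.submit(() -> {
        prepareDelete();              // prepare phase runs on the procedure pool
        prepared.countDown();         // releases the waiting handler, like latch.await() above
        executeDelete();              // the rest of the procedure runs asynchronously
      });
      prepared.await();               // the RPC handler blocks here
      System.out.printf("client-visible latency: %.1f ms%n",
          (System.nanoTime() - start) / 1e6);
      procedurePool.shutdown();
    }

    private static void persistToProcedureWal() { sleep(150); } // stand-in for the slow step in our traces
    private static void prepareDelete()         { sleep(5); }
    private static void executeDelete()         { sleep(50); }

    private static void sleep(long ms) {
      try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
  }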

3. Dynamically Tracing the Time Spent Deleting and Creating Tables

How can we find out which method is slow without disturbing the running service? Dynamic tracing is the natural fit. DTrace was the first tool that came to mind, but it is not well suited to Linux, so we used Arthas's dynamic tracing instead.

3.1 Installing Arthas

          curl -L https://arthas.aliyun.com/install.sh | sh

3.2 Dynamically tracing table creation and deletion

Attach Arthas to the HMaster process (PID 61890 here), then trace each entry method on HMaster; `-n 1000` makes each trace command exit after capturing 1000 invocations:

         sudo -u hbase -EH ./as.sh 61890

         trace -n 1000 org.apache.hadoop.hbase.master.HMaster disableTable >> /tmp/disable-t &

         trace -n 1000 org.apache.hadoop.hbase.master.HMaster deleteTable >> /tmp/delete-t &

         trace -n 1000 org.apache.hadoop.hbase.master.HMaster createTable >> /tmp/create-t &

3.3 Trace output

In each trace line, the bracketed number is the elapsed time in milliseconds, followed by the class and method invoked; the number after `#` is the source line of the call site:

`---ts=2021-05-31 19:31:14;thread_name=RpcServer.FifoWFPBQ.default.handler=115,queue=11,port=60000;id=ab;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@764c12b6
+---[0.009442ms] org.apache.hadoop.hbase.master.HMaster:checkInitialized() #2063
+---[0.010083ms] org.apache.hadoop.hbase.master.HMaster:getClientIdAuditPrefix() #2067
+---[0.027572ms] org.apache.commons.logging.Log:info() #95
+---[0.007897ms] org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch:createLatch() #2070
+---[0.003904ms] org.apache.hadoop.hbase.procedure2.ProcedureExecutor:getEnvironment() #2072
+---[170.389842ms] org.apache.hadoop.hbase.procedure2.ProcedureExecutor:submitProcedure() #95
+---[0.004545ms] org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch:await() #2080
`---[336.4205ms] org.apache.hadoop.hbase.master.HMaster:deleteTable()
+---[0.0139ms] org.apache.hadoop.hbase.master.HMaster:checkInitialized() #1902
+---[0.02131ms] org.apache.hadoop.hbase.master.MasterCoprocessorHost:preDeleteTable() #1904
+---[0.025369ms] org.apache.hadoop.hbase.master.HMaster:getClientIdAuditPrefix() #1906
+---[0.049835ms] org.apache.commons.logging.Log:info() #95
+---[0.017062ms] org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch:createLatch() #1909
+---[0.006918ms] org.apache.hadoop.hbase.procedure2.ProcedureExecutor:getEnvironment() #1910
+---[0.026239ms] org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure:<init>() #95
+---[163.381152ms] org.apache.hadoop.hbase.procedure2.ProcedureExecutor:submitProcedure() #95
+---[0.007799ms] org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch:await() #1914
`---[33.345892ms] org.apache.hadoop.hbase.master.MasterCoprocessorHost:postDeleteTable() #1917

`---ts=2021-05-31 19:35:44;thread_name=RpcServer.FifoWFPBQ.default.handler=89,queue=11,port=60000;id=91;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@764c12b6
+---[0.013285ms] org.apache.hadoop.hbase.master.HMaster:checkInitialized() #2063
+---[0.02621ms] org.apache.hadoop.hbase.master.HMaster:getClientIdAuditPrefix() #2067
+---[0.077883ms] org.apache.commons.logging.Log:info() #95
+---[0.019404ms] org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch:createLatch() #2070
+---[0.007435ms] org.apache.hadoop.hbase.procedure2.ProcedureExecutor:getEnvironment() #2072
+---[177.803304ms] org.apache.hadoop.hbase.procedure2.ProcedureExecutor:submitProcedure() #95
+---[0.008626ms] org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch:await() #2080
`---[324.734257ms] org.apache.hadoop.hbase.master.HMaster:deleteTable()
+---[0.015295ms] org.apache.hadoop.hbase.master.HMaster:checkInitialized() #1902
+---[0.02818ms] org.apache.hadoop.hbase.master.MasterCoprocessorHost:preDeleteTable() #1904
+---[0.025206ms] org.apache.hadoop.hbase.master.HMaster:getClientIdAuditPrefix() #1906
+---[0.043664ms] org.apache.commons.logging.Log:info() #95
+---[0.019983ms] org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch:createLatch() #1909
+---[0.005777ms] org.apache.hadoop.hbase.procedure2.ProcedureExecutor:getEnvironment() #1910
+---[0.018761ms] org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure:<init>() #95
+---[156.329896ms] org.apache.hadoop.hbase.procedure2.ProcedureExecutor:submitProcedure() #95
+---[0.018028ms] org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch:await() #1914
`---[32.321164ms] org.apache.hadoop.hbase.master.MasterCoprocessorHost:postDeleteTable() #1917

4. Trace Analysis and Optimization

4.1 Locating the slow code

We traced table deletion and creation separately, each for a full day. The traces consistently show that org.apache.hadoop.hbase.procedure2.ProcedureExecutor:submitProcedure() (the call site tagged #95) dominates: roughly 156-178 ms of the ~330 ms total per deleteTable call. Following this into the ProcedureExecutor class, we found that procedure work is handled by a dedicated thread pool, set up as follows:

  private void startProcedureExecutor() throws IOException {
    final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
    final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
        MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);

    procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir,
        new MasterProcedureEnv.WALStoreLeaseRecovery(this));
    procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
    procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
        procEnv.getProcedureQueue());

    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
        Math.max(Runtime.getRuntime().availableProcessors(),
          MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
    final boolean abortOnCorruption = conf.getBoolean(
        MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
    procedureStore.start(numThreads);
    procedureExecutor.start(numThreads, abortOnCorruption);
  }

/** Number of threads used by the procedure executor */
public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4;
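
As a quick sanity check before touching configuration, the default the Master falls back to can be computed locally (a minimal sketch of the max(cores, 4) logic above):

  // Minimal sketch: what the Master falls back to when
  // hbase.master.procedure.threads is not set in hbase-site.xml.
  public class DefaultProcedureThreads {
    public static void main(String[] args) {
      int defaultThreads = Math.max(Runtime.getRuntime().availableProcessors(), 4);
      System.out.println("default procedure threads: " + defaultThreads);
    }
  }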


4.2 Parameter tuning

It turned out that none of our HBase clusters set hbase.master.procedure.threads, so each Master was running with the default of max(availableProcessors, 4) threads. We raised the parameter to 64 across the clusters; the fragment below shows the change.
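
For reference, the change is a single property in hbase-site.xml on the Master hosts. This is a sketch of our setting, not a universally right value, and the Master has to be restarted for startProcedureExecutor to pick it up:

  <!-- hbase-site.xml on the HMaster hosts -->
  <property>
    <name>hbase.master.procedure.threads</name>
    <value>64</value>
  </property>

Note that numThreads is passed to both procedureStore.start() and procedureExecutor.start() in the code above, so this one knob sizes the procedure store and the executor's worker pool together.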

            
