On the one hand, the cluster has accumulated so many small tables that they have become a real management burden, so pushing users to delete unused tables is an urgent need. At the same time, users report that deleting and creating tables on the cluster is slow and sometimes fails with timeout exceptions. What could be making these operations slow? To find out, we first walked through the relevant HBase source, starting from how the master's RPC server handles a request:
/**
 * Starts a HRegionServer at the default location
 * @param conf
 * @param csm implementation of CoordinatedStateManager to be used
 * @throws IOException
 * @throws InterruptedException
 */
public HRegionServer(Configuration conf, CoordinatedStateManager csm)
    throws IOException, InterruptedException {
  this.fsOk = true;
  this.conf = conf;
  checkCodecs(this.conf);
  this.userProvider = UserProvider.instantiate(conf);
  FSUtils.setupShortCircuitRead(this.conf);
  // Disable usage of meta replicas in the regionserver
  this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
  // Config'ed params
  this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
  this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
  this.sleeper = new Sleeper(this.msgInterval, this);
  boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
  this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
  this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10);
  this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
  this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
  this.abortRequested = false;
  this.stopped = false;

  rpcServices = createRpcServices();
  this.startcode = System.currentTimeMillis();
  if (this instanceof HMaster) {
    useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
  } else {
    useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
  }
  String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead
      : rpcServices.isa.getHostName();
  serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);

  rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
  rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

  // login the zookeeper client principal (if using security)
  ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
      HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
  // login the server principal (if using secure Hadoop)
  login(userProvider, hostName);
  // init superusers and add the server principal (if using security)
  // or process owner as default super user.
  Superusers.initialize(conf);

  regionServerAccounting = new RegionServerAccounting();
  cacheConfig = new CacheConfig(conf);
  mobCacheConfig = new MobCacheConfig(conf);
  uncaughtExceptionHandler = new UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      abort("Uncaught exception in service thread " + t.getName(), e);
    }
  };

  useZKForAssignment = ConfigUtil.useZKForAssignment(conf);

  initializeFileSystem();

  service = new ExecutorService(getServerName().toShortString());
  spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

  // Some unit tests don't need a cluster, so no zookeeper at all
  if (!conf.getBoolean("hbase.testing.nocluster", false)) {
    // Open connection to zookeeper and set primary watcher
    zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(),
        this, canCreateBaseZNode());

    this.csm = (BaseCoordinatedStateManager) csm;
    this.csm.initialize(this);
    this.csm.start();

    tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName);

    masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
    masterAddressTracker.start();

    clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
    clusterStatusTracker.start();
  }
  this.configurationManager = new ConfigurationManager();

  rpcServices.start();
  putUpWebUI();
  this.walRoller = new LogRoller(this, this);
  this.choreService = new ChoreService(getServerName().toString(), true);

  if (!SystemUtils.IS_OS_WINDOWS) {
    Signal.handle(new Signal("HUP"), new SignalHandler() {
      @Override
      public void handle(Signal signal) {
        getConfiguration().reloadConfiguration();
        configurationManager.notifyAllObservers(getConfiguration());
      }
    });
  }
}
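The part that matters for our problem is near the end: the master (HMaster extends HRegionServer, hence the `this instanceof HMaster` branch above) creates its RPC services with createRpcServices() and starts them with rpcServices.start(), so every client DDL request first passes through the generic RPC server code shown next.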
private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
  if (connectionHeaderRead) {
    processRequest(buf);
  } else {
    processConnectionHeader(buf);
    this.connectionHeaderRead = true;
    if (!authorizeConnection()) {
      // Throw FatalConnectionException wrapping ACE so client does right thing and closes
      // down the connection instead of trying to read non-existent retun.
      throw new AccessDeniedException("Connection from " + this + " for service " +
          connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
    }
    this.user = userProvider.create(this.ugi);
  }
}
/**
 * @param buf Has the request header and the request param and optionally encoded data buffer
 * all in this one array.
 * @throws IOException
 * @throws InterruptedException
 */
protected void processRequest(byte[] buf) throws IOException, InterruptedException {
  long totalRequestSize = buf.length;
  int offset = 0;
  // Here we read in the header. We avoid having pb
  // do its default 4k allocation for CodedInputStream. We force it to use backing array.
  CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
  int headerSize = cis.readRawVarint32();
  offset = cis.getTotalBytesRead();
  Message.Builder builder = RequestHeader.newBuilder();
  ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
  RequestHeader header = (RequestHeader) builder.build();
  offset += headerSize;
  int id = header.getCallId();
  if (LOG.isTraceEnabled()) {
    LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
        " totalRequestSize: " + totalRequestSize + " bytes");
  }
  // Enforcing the call queue size, this triggers a retry in the client
  // This is a bit late to be doing this check - we have already read in the total request.
  if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
    final Call callTooBig = new Call(id, this.service, null, null, null, null, this,
        responder, totalRequestSize, null, null);
    ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
    metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
    InetSocketAddress address = getListenerAddress();
    setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
        "Call queue is full on " + (address != null ? address : "(channel closed)") +
        ", is hbase.ipc.server.max.callqueue.size too small?");
    responder.doRespond(callTooBig);
    return;
  }
  MethodDescriptor md = null;
  Message param = null;
  CellScanner cellScanner = null;
  try {
    if (header.hasRequestParam() && header.getRequestParam()) {
      md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
      if (md == null) throw new UnsupportedOperationException(header.getMethodName());
      builder = this.service.getRequestPrototype(md).newBuilderForType();
      // To read the varint, I need an inputstream; might as well be a CIS.
      cis = CodedInputStream.newInstance(buf, offset, buf.length);
      int paramSize = cis.readRawVarint32();
      offset += cis.getTotalBytesRead();
      if (builder != null) {
        ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
        param = builder.build();
      }
      offset += paramSize;
    }
    if (header.hasCellBlockMeta()) {
      cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf, offset,
          buf.length);
    }
  } catch (Throwable t) {
    InetSocketAddress address = getListenerAddress();
    String msg = (address != null ? address : "(channel closed)") +
        " is unable to read call parameter from client " + getHostAddress();
    LOG.warn(msg, t);
    metrics.exception(t);
    // probably the hbase hadoop version does not match the running hadoop version
    if (t instanceof LinkageError) {
      t = new DoNotRetryIOException(t);
    }
    // If the method is not present on the server, do not retry.
    if (t instanceof UnsupportedOperationException) {
      t = new DoNotRetryIOException(t);
    }
    final Call readParamsFailedCall = new Call(id, this.service, null, null, null, null, this,
        responder, totalRequestSize, null, null);
    ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
    setupResponse(responseBuffer, readParamsFailedCall, t, msg + "; " + t.getMessage());
    responder.doRespond(readParamsFailedCall);
    return;
  }
  TraceInfo traceInfo = header.hasTraceInfo()
      ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
      : null;
  Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
      totalRequestSize, traceInfo, this.addr);
  if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
    callQueueSize.add(-1 * call.getSize());
    ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
    metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
    InetSocketAddress address = getListenerAddress();
    setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
        "Call queue is full on " + (address != null ? address : "(channel closed)") +
        ", too many items queued ?");
    responder.doRespond(call);
  }
}
public void run() {
  try {
    if (!call.connection.channel.isOpen()) {
      if (RpcServer.LOG.isDebugEnabled()) {
        RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
      }
      return;
    }
    this.status.setStatus("Setting up call");
    this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
    if (RpcServer.LOG.isTraceEnabled()) {
      UserGroupInformation remoteUser = call.connection.ugi;
      RpcServer.LOG.trace(call.toShortString() + " executing as " +
          ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
    }
    Throwable errorThrowable = null;
    String error = null;
    Pair<Message, CellScanner> resultPair = null;
    RpcServer.CurCall.set(call);
    TraceScope traceScope = null;
    try {
      if (!this.rpcServer.isStarted()) {
        InetSocketAddress address = rpcServer.getListenerAddress();
        throw new ServerNotRunningYetException("Server " +
            (address != null ? address : "(channel closed)") + " is not running yet");
      }
      if (call.tinfo != null) {
        traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
      }
      // make the call
      resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
          call.timestamp, this.status);
    } catch (Throwable e) {
      RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
      errorThrowable = e;
      error = StringUtils.stringifyException(e);
      if (e instanceof Error) {
        throw (Error) e;
      }
    } finally {
      if (traceScope != null) {
        traceScope.close();
      }
      RpcServer.CurCall.set(null);
      if (resultPair != null) {
        this.rpcServer.addCallSize(call.getSize() * -1);
        sucessful = true;
      }
    }
    // Set the response
    Message param = resultPair != null ? resultPair.getFirst() : null;
    CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
    call.setResponse(param, cells, errorThrowable, error);
    call.sendResponseIfReady();
    this.status.markComplete("Sent response");
    this.status.pause("Waiting for a call");
  } catch (OutOfMemoryError e) {
    if (this.rpcServer.getErrorHandler() != null) {
      if (this.rpcServer.getErrorHandler().checkOOME(e)) {
        RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
        return;
      }
    } else {
      // rethrow if no handler
      throw e;
    }
  } catch (ClosedChannelException cce) {
    InetSocketAddress address = rpcServer.getListenerAddress();
    RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
        "this means that the server " + (address != null ? address : "(channel closed)") +
        " was processing a request but the client went away. The error message was: " +
        cce.getMessage());
  } catch (Exception e) {
    RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught: " +
        StringUtils.stringifyException(e));
  } finally {
    if (!sucessful) {
      this.rpcServer.addCallSize(call.getSize() * -1);
    }
    cleanup();
  }
}
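Putting these pieces together: the server reads the request bytes, processRequest() builds a Call and hands it to the scheduler's call queue, and a handler thread eventually executes it in CallRunner.run(), which invokes the actual service method. Two details are worth noting. First, if the call queue is full, processRequest() answers immediately with CALL_QUEUE_TOO_BIG_EXCEPTION and the client retries, which is one way slow handling surfaces to users as retries and timeouts. Second, the handler thread stays occupied for the entire duration of the service method. For a drop-table request, that service method is HMaster.deleteTable():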
@Override
public long deleteTable(
    final TableName tableName,
    final long nonceGroup,
    final long nonce) throws IOException {
  checkInitialized();
  if (cpHost != null) {
    cpHost.preDeleteTable(tableName);
  }
  LOG.info(getClientIdAuditPrefix() + " delete " + tableName);

  // TODO: We can handle/merge duplicate request
  ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
  long procId = this.procedureExecutor.submitProcedure(
      new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch),
      nonceGroup,
      nonce);
  latch.await();

  if (cpHost != null) {
    cpHost.postDeleteTable(tableName);
  }
  return procId;
}
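Note that deleteTable() is synchronous from the caller's point of view: submitProcedure() persists the procedure to the procedure WAL store and enqueues it, and latch.await() then blocks until a procedure worker has picked the DeleteTableProcedure up and pushed it through its first steps. Both steps sit on the critical path of the client's RPC, and both are bounded by the resources given to the ProcedureExecutor (the tracing below points at submitProcedure() in our case). The following is a minimal, self-contained sketch of the queueing effect using plain JDK concurrency (not HBase code; the pool sizes and the 1-second "procedure" duration are made-up numbers for illustration):

import java.util.concurrent.*;

public class LatchQueueDemo {
  public static void main(String[] args) throws Exception {
    // Stand-in for the procedure worker pool: deliberately small.
    ExecutorService workers = Executors.newFixedThreadPool(2);
    // Ten concurrent "RPC handlers", each submitting one DDL "procedure" and blocking on it.
    ExecutorService handlers = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 10; i++) {
      final int id = i;
      handlers.submit(() -> {
        long start = System.currentTimeMillis();
        Future<?> proc = workers.submit(() -> sleep(1000)); // the "procedure" itself takes ~1s
        try {
          proc.get(); // analogous to latch.await() in deleteTable()
        } catch (Exception ignored) {
        }
        System.out.println("call " + id + " waited " + (System.currentTimeMillis() - start) + " ms");
      });
    }
    handlers.shutdown();
    handlers.awaitTermination(1, TimeUnit.MINUTES);
    workers.shutdown();
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ignored) {
      Thread.currentThread().interrupt();
    }
  }
}

With only 2 workers, the last of the 10 one-second "procedures" reports a wait of roughly 5 seconds even though its own work takes 1 second; this is the same shape as the slow deleteTable/createTable calls users were reporting, which is what we set out to confirm with tracing.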
How can we pinpoint which method is taking the time without impacting the running cluster? Dynamic tracing was the first thing that came to mind. DTrace is the classic tool, but it is not available on Linux, so we went with Arthas's dynamic tracing instead. We installed Arthas, attached it to the HMaster process (pid 61890 in our case) as the hbase user, and traced the three table-management entry points:
curl -L https://arthas.aliyun.com/install.sh | sh
sudo -u hbase -EH ./as.sh 61890
trace -n 1000 org.apache.hadoop.hbase.master.HMaster disableTable >> /tmp/diable-t &
trace -n 1000 org.apache.hadoop.hbase.master.HMaster deleteTable >> /tmp/delete-t &
trace -n 1000 org.apache.hadoop.hbase.master.HMaster createTable >> /tmp/create-t &
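A note on the commands: Arthas's trace instruments the matched method on the fly and, for every captured invocation, prints the call tree beneath it with the time spent in each node, so the slowest internal call can be read straight off the output; -n 1000 caps the number of invocations captured before Arthas stops enhancing the method, and running each command as a background job with its output appended to a file under /tmp lets the traces run unattended over a long period.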
`---ts=2021-05-31 19:31:14;thread_name=RpcServer.FifoWFPBQ.default.handler=115,queue=11,port=60000;id=ab;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@764c12b6
`---ts=2021-05-31 19:35:44;thread_name=RpcServer.FifoWFPBQ.default.handler=89,queue=11,port=60000;id=91;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@764c12b6
We traced table deletion and table creation separately for a full day. The trace results show that org.apache.hadoop.hbase.procedure2.ProcedureExecutor#submitProcedure() accounts for the longest processing time (the call at code line 95). Having established above that a delete-table RPC is picked up by a handler thread and ultimately runs HMaster.deleteTable(), which submits a DeleteTableProcedure and waits on it, we dug further into the ProcedureExecutor class and found that procedures are executed by a dedicated thread pool:
private void startProcedureExecutor() throws IOException {
  final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
  final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
      MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);

  procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir,
      new MasterProcedureEnv.WALStoreLeaseRecovery(this));
  procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
  procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
      procEnv.getProcedureQueue());

  final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
      Math.max(Runtime.getRuntime().availableProcessors(),
          MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
  final boolean abortOnCorruption = conf.getBoolean(
      MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
      MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
  procedureStore.start(numThreads);
  procedureExecutor.start(numThreads, abortOnCorruption);
}
/** Number of threads used by the procedure executor */
public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4;
It turned out that none of our HBase clusters had set hbase.master.procedure.threads, so each master was running with the default of max(number of CPU cores, 4) procedure worker threads, shared by every create/delete/disable-table procedure in the cluster. We raised the parameter to 64 on the affected cluster.
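For completeness, here is a minimal sketch (the class name ProcedureThreadsCheck is made up for illustration; it simply mirrors the numThreads computation in startProcedureExecutor() with the constants shown above) of how the effective worker count changes once the property is set. In practice the value goes into hbase-site.xml on the master nodes and only takes effect after an HMaster restart, since startProcedureExecutor() reads it once at startup:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Illustrative only: mirrors the numThreads computation in startProcedureExecutor().
public class ProcedureThreadsCheck {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();

    // Default: max(number of cores, DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4)
    int before = conf.getInt("hbase.master.procedure.threads",
        Math.max(Runtime.getRuntime().availableProcessors(), 4));
    System.out.println("procedure worker threads (before): " + before);

    // After the change we rolled out
    conf.setInt("hbase.master.procedure.threads", 64);
    int after = conf.getInt("hbase.master.procedure.threads",
        Math.max(Runtime.getRuntime().availableProcessors(), 4));
    System.out.println("procedure worker threads (after setting 64): " + after);
  }
}

Whether 64 is the right number depends on the master's CPU headroom and the volume of DDL, but the default of max(number of cores, 4) is easy to outgrow on a cluster with many tables and many users issuing create and delete requests at the same time.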