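Circuit breaker state changes:
When the hystrix thread pool queue starts to back up, the elapsed time recorded for a command does not include the time it spent waiting in the queue. Hystrix decides whether a command has timed out by registering it with a timer task when the command executes; the check is scheduled by HystrixObservableTimeoutOperator, as the following source shows: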
Observable execution;
if ((Boolean)this.properties.executionTimeoutEnabled().get()) {
    execution = this.executeCommandWithSpecifiedIsolation(_cmd).lift(new AbstractCommand.HystrixObservableTimeoutOperator(_cmd));
} else {
    execution = this.executeCommandWithSpecifiedIsolation(_cmd);
}
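To make the difference between queue wait and execution time concrete, here is a minimal sketch that uses only java.util.concurrent and is not Hystrix code. The class name QueueWaitDemo and its helpers are hypothetical; it measures, for a task submitted behind a busy single worker, how long it sat in the queue and how long it actually ran.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class QueueWaitDemo {

    // Sleeps without leaking the checked exception into the lambdas below.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns {queueWaitMillis, runMillis} for a task queued behind one busy worker. */
    static long[] measure(long taskMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // The first task occupies the only worker thread.
            pool.execute(() -> sleep(taskMillis));
            long submitted = System.nanoTime();
            // The second task has to wait in the queue until the first one finishes.
            Future<long[]> result = pool.submit(() -> {
                long started = System.nanoTime();
                sleep(taskMillis);
                long finished = System.nanoTime();
                return new long[] {
                    (started - submitted) / 1_000_000,  // time spent queued
                    (finished - started) / 1_000_000    // time spent executing
                };
            });
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] times = measure(200);
        System.out.println("queue wait ms = " + times[0] + ", run ms = " + times[1]);
    }
}
```

A timer that starts only when the task begins to run would see just the second number, which is why a growing queue can go unnoticed if only execution time is watched.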
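Several thread pool settings are relevant here; they are listed below. When the backlog in the pool's queue reaches queueSizeRejectionThreshold and queueSizeRejectionThreshold < maxQueueSize, the underlying queue is never actually full, so the pool does not grow toward maximumSize threads. The check that runs when work is scheduled onto the pool, shown after the settings, returns false in that case. If the pool needs to grow, set queueSizeRejectionThreshold > maxQueueSize.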
hystrix.threadpool.default.coreSize
hystrix.threadpool.default.maximumSize
hystrix.threadpool.default.maxQueueSize
hystrix.threadpool.default.queueSizeRejectionThreshold
public boolean isQueueSpaceAvailable() {
    if (this.queueSize <= 0) {
        return true;
    } else {
        return this.threadPool.getQueue().size() < (Integer)this.properties.queueSizeRejectionThreshold().get();
    }
}
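The interaction between the threshold and pool growth can be sketched with a plain java.util.concurrent.ThreadPoolExecutor, which only creates threads beyond its core size once its queue refuses a task. This is a simplified model under stated assumptions, not Hystrix's own pool: the class QueueThresholdDemo, the method largestPoolSize, and the parameter values are all hypothetical, and the `continue` stands in for a rejected command.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueThresholdDemo {

    /**
     * Submits blocking tasks, skipping any submission made while the queue has
     * reached rejectionThreshold (same shape as the check above), and returns
     * the largest number of threads the pool ever created.
     */
    static int largestPoolSize(int coreSize, int maximumSize, int maxQueueSize,
                               int rejectionThreshold, int attempts) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, maximumSize, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(maxQueueSize));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < attempts; i++) {
                if (pool.getQueue().size() >= rejectionThreshold) {
                    continue; // treated as rejected; the task never reaches the pool
                }
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy so the backlog builds up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // let all blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Threshold (5) below maxQueueSize (10): the queue never fills, no extra threads.
        System.out.println(largestPoolSize(2, 4, 10, 5, 20));   // prints 2
        // Threshold (20) above maxQueueSize (10): the queue fills, the pool grows.
        System.out.println(largestPoolSize(2, 4, 10, 20, 14));  // prints 4
    }
}
```

In the first call the threshold cuts submissions off at five queued tasks, so the executor never sees a full queue and stays at two threads. In the second call the queue fills to ten and the next two submissions each create an extra thread.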
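Version 1.5.12 has a bug: on the OPEN -> HALF_OPEN transition a single trial request is let through, and if that request is unsubscribed, the status is never updated and stays HALF_OPEN. Every later request then fails the circuit check, because compareAndSet(OPEN, HALF_OPEN) cannot succeed while the status is already HALF_OPEN, so attemptExecution() always returns false and the breaker never closes. The source of the check is as follows: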
public boolean attemptExecution() {
    if ((Boolean)this.properties.circuitBreakerForceOpen().get()) {
        return false;
    } else if ((Boolean)this.properties.circuitBreakerForceClosed().get()) {
        return true;
    } else if (this.circuitOpened.get() == -1L) {
        return true;
    } else if (this.isAfterSleepWindow()) {
        return this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN);
    } else {
        return false;
    }
}
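The stuck state can be modelled without Hystrix at all. The sketch below is a simplified stand-in for the breaker, with time passed in explicitly so no sleeping is needed. HalfOpenStuckDemo, SimpleBreaker, and recovers are hypothetical names, and moving the breaker back to OPEN on unsubscribe is only one plausible way to handle it, not necessarily what the upstream fix does.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class HalfOpenStuckDemo {

    enum Status { CLOSED, OPEN, HALF_OPEN }

    // Simplified stand-in for the breaker state machine; not the real Hystrix class.
    static final class SimpleBreaker {
        final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
        final AtomicLong circuitOpened = new AtomicLong(-1L);
        final long sleepWindowMillis;

        SimpleBreaker(long sleepWindowMillis) {
            this.sleepWindowMillis = sleepWindowMillis;
        }

        void trip(long now) {
            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                circuitOpened.set(now);
            }
        }

        // Same shape as attemptExecution() above, minus the force-open/closed flags.
        boolean attemptExecution(long now) {
            if (circuitOpened.get() == -1L) {
                return true;
            } else if (now > circuitOpened.get() + sleepWindowMillis) {
                return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
            } else {
                return false;
            }
        }

        void markSuccess() {
            if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
                circuitOpened.set(-1L);
            }
        }

        void markNonSuccess(long now) {
            if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
                circuitOpened.set(now);
            }
        }
    }

    /** Returns true if any request is allowed after the trial request was unsubscribed. */
    static boolean recovers(boolean resetOnUnsubscribe) {
        SimpleBreaker breaker = new SimpleBreaker(100);
        breaker.trip(0);
        breaker.attemptExecution(200); // trial request: OPEN -> HALF_OPEN
        // The trial is unsubscribed here, so neither success nor failure is reported.
        if (resetOnUnsubscribe) {
            breaker.markNonSuccess(200); // assumed handling: go back to OPEN
        }
        for (long now = 201; now <= 1200; now++) {
            if (breaker.attemptExecution(now)) {
                breaker.markSuccess();
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(recovers(false)); // prints false: stuck in HALF_OPEN
        System.out.println(recovers(true));  // prints true: a later trial gets through
    }
}
```

Without the reset, every call after the unsubscribed trial hits the failing compareAndSet and is refused. With it, the sleep window restarts and the next trial can close the circuit.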
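The conditions that trigger the unsubscribe in production are subtle and I have not pinned them down yet, but the bug can be reproduced by calling the API manually, as in the code below. After upgrading the version, the bug no longer occurs: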
// `pool` is assumed to be an ExecutorService field defined elsewhere in the class.
public void startDemo() {
    int i = 0;
    runSimulatedRequestOnThread(i++);
}

public void runSimulatedRequestOnThread(int i) {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    CountDownLatch countDownLatch1 = new CountDownLatch(1);

    pool.execute(new Runnable() {
        @Override
        public void run() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                // Trip the circuit breaker
                for (int j = 0; j < 10; j++) {
                    HystrixCommand<PaymentInformation> faile1 = new GetPaymentInformationCommand(0);
                    faile1.execute();
                    System.out.println("Request1 => " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
                }
                Thread.sleep(400);
                countDownLatch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                context.shutdown();
            }
        }
    });
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    pool.execute(new Runnable() {
        @Override
        public void run() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                // Simulate an unsubscribe on the trial request
                HystrixCommand<PaymentInformation> success = new GetPaymentInformationCommand(1);
                // success.execute();
                // System.out.println("Request2 => " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
                Observable<PaymentInformation> observe = success.observe();
                Subscription s = observe.subscribe();
                s.unsubscribe();
                System.out.println("Request2 => " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
                Thread.sleep(3000);
                countDownLatch1.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                context.shutdown();
            }
        }
    });
    try {
        countDownLatch1.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    pool.execute(new Runnable() {
        @Override
        public void run() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                // Later requests: on the affected version these are all short-circuited
                for (int i = 0; i < 100; i++) {
                    PaymentInformation paymentInformation = new GetPaymentInformationCommand(2).execute();
                    System.out.println("Request3 => " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                context.shutdown();
            }
        }
    });
}