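The previous articles analyzed the source code behind command creation and timeouts in Spring Cloud Hystrix.
This article walks through the source code for two rejection cases:
Hystrix THREAD mode, when the configured maximum number of threads is exceeded, and
Hystrix SEMAPHORE mode, when the maximum number of concurrent requests is exceeded.
We return to the applyHystrixSemantics method of AbstractCommand.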
- private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
- // mark that we're starting execution on the ExecutionHook
- // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
- executionHook.onStart(_cmd);
-
- /* determine if we're allowed to execute */
- if (circuitBreaker.attemptExecution()) {
- final TryableSemaphore executionSemaphore = getExecutionSemaphore();
- final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
- final Action0 singleSemaphoreRelease = new Action0() {
- @Override
- public void call() {
- if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
- executionSemaphore.release();
- }
- }
- };
-
- final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
- @Override
- public void call(Throwable t) {
- eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
- }
- };
-
- if (executionSemaphore.tryAcquire()) {
- try {
- /* used to track userThreadExecutionTime */
- executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
- return executeCommandAndObserve(_cmd)
- .doOnError(markExceptionThrown)
- .doOnTerminate(singleSemaphoreRelease)
- .doOnUnsubscribe(singleSemaphoreRelease);
- } catch (RuntimeException e) {
- return Observable.error(e);
- }
- } else {
- return handleSemaphoreRejectionViaFallback();
- }
- } else {
- return handleShortCircuitViaFallback();
- }
- }
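One detail in the method above is worth isolating: singleSemaphoreRelease is attached to both doOnTerminate and doOnUnsubscribe, so it may be called twice, and the AtomicBoolean guard ensures the permit is given back only once. The following is a minimal sketch of that guard in plain Java. OnceOnlyRelease and its fields are hypothetical names used for illustration; they are not Hystrix classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: OnceOnlyRelease is a hypothetical name, not part of Hystrix.
class OnceOnlyRelease {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final AtomicInteger permitsInUse;

    OnceOnlyRelease(AtomicInteger permitsInUse) {
        this.permitsInUse = permitsInUse;
    }

    // Safe to call from several callbacks; only the first call returns the permit.
    void release() {
        if (released.compareAndSet(false, true)) {
            permitsInUse.decrementAndGet();
        }
    }

    // Simulates one held permit being released by two different callbacks.
    static int demo() {
        AtomicInteger permitsInUse = new AtomicInteger(1);
        OnceOnlyRelease guard = new OnceOnlyRelease(permitsInUse);
        guard.release(); // e.g. triggered by doOnTerminate
        guard.release(); // e.g. triggered by doOnUnsubscribe; ignored by the guard
        return permitsInUse.get();
    }

    public static void main(String[] args) {
        System.out.println("permits in use after two release calls: " + demo());
    }
}
```

Without the compareAndSet guard, the second call would push the counter below zero and effectively hand out an extra permit.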
Start with the line if (circuitBreaker.attemptExecution()). It asks the circuit breaker whether the request is allowed to execute.
- @Override
- public boolean attemptExecution() {
- if (properties.circuitBreakerForceOpen().get()) {
- return false;
- }
- if (properties.circuitBreakerForceClosed().get()) {
- return true;
- }
- if (circuitOpened.get() == -1) {
- return true;
- } else {
- if (isAfterSleepWindow()) {
- if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
- //only the first request after sleep window should execute
- return true;
- } else {
- return false;
- }
- } else {
- return false;
- }
- }
- }
- private boolean isAfterSleepWindow() {
- final long circuitOpenTime = circuitOpened.get();
- final long currentTime = System.currentTimeMillis();
- final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
- return currentTime > circuitOpenTime + sleepWindowTime;
- }
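If the breaker is forced open, attemptExecution returns false immediately; if it is forced closed, or if circuitOpened is -1 (the circuit is not open), it returns true. Otherwise the sleep window is checked. Once the error percentage reaches the threshold (50% by default), the circuit opens and the time at which it opened is recorded. When a later request arrives, it gets false as long as the current time is no later than the open time plus the sleep window (5 s by default). After the sleep window has passed, only the first request switches the status from OPEN to HALF_OPEN and is let through as a trial; the others still get false.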
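The sleep-window decision can be reproduced in a few lines. The sketch below is a simplified model, not the Hystrix implementation: SleepWindowBreaker is a hypothetical class, the current time is passed in as a parameter so the behavior is deterministic, and the forceOpen/forceClosed properties are left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of the attemptExecution() decision; hypothetical class, not Hystrix code.
class SleepWindowBreaker {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1); // -1 means "not open"
    private final long sleepWindowMs;

    SleepWindowBreaker(long sleepWindowMs) {
        this.sleepWindowMs = sleepWindowMs;
    }

    // Trips the breaker at the given time (in Hystrix this is driven by the metrics stream).
    void open(long nowMs) {
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(nowMs);
        }
    }

    boolean attemptExecution(long nowMs) {
        long openedAt = circuitOpened.get();
        if (openedAt == -1) {
            return true; // circuit is closed
        }
        if (nowMs > openedAt + sleepWindowMs) {
            // only the first caller after the sleep window wins the OPEN -> HALF_OPEN transition
            return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
        }
        return false; // still inside the sleep window
    }

    public static void main(String[] args) {
        SleepWindowBreaker breaker = new SleepWindowBreaker(5000);
        breaker.open(1000);
        System.out.println("inside window: " + breaker.attemptExecution(3000));
        System.out.println("first after window: " + breaker.attemptExecution(6001));
        System.out.println("second after window: " + breaker.attemptExecution(6002));
    }
}
```

Note that the comparison is strict, so a request arriving exactly at open time plus sleep window is still rejected, matching isAfterSleepWindow above.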
Next, look at the line final TryableSemaphore executionSemaphore = getExecutionSemaphore(); the executionSemaphore object is obtained through the getExecutionSemaphore method.
- protected TryableSemaphore getExecutionSemaphore() {
- if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
- if (executionSemaphoreOverride == null) {
- TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
- if (_s == null) {
- // we didn't find one cache so setup
- executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
- // assign whatever got set (this or another thread)
- return executionSemaphorePerCircuit.get(commandKey.name());
- } else {
- return _s;
- }
- } else {
- return executionSemaphoreOverride;
- }
- } else {
- // return NoOp implementation since we're not using SEMAPHORE isolation
- return TryableSemaphoreNoOp.DEFAULT;
- }
- }
-
- static class TryableSemaphoreActual implements TryableSemaphore {
- protected final HystrixProperty<Integer> numberOfPermits;
- private final AtomicInteger count = new AtomicInteger(0);
-
- public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
- this.numberOfPermits = numberOfPermits;
- }
-
- @Override
- public boolean tryAcquire() {
- int currentCount = count.incrementAndGet();
- if (currentCount > numberOfPermits.get()) {
- count.decrementAndGet();
- return false;
- } else {
- return true;
- }
- }
- }
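The method first checks the isolation strategy. In SEMAPHORE mode it creates a TryableSemaphoreActual (one per command key, cached in executionSemaphorePerCircuit); in THREAD mode it returns the no-op default, TryableSemaphoreNoOp.DEFAULT.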
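The per-key caching in getExecutionSemaphore follows a common pattern: look the key up, and on a miss call putIfAbsent and then read the map again, so that concurrent callers all end up with the same instance. Here is a minimal sketch of that pattern, assuming a plain AtomicInteger stands in for the semaphore; PerKeyRegistry and counterFor are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the "get, putIfAbsent, get again" caching pattern.
class PerKeyRegistry {
    private static final ConcurrentHashMap<String, AtomicInteger> COUNTERS = new ConcurrentHashMap<>();

    static AtomicInteger counterFor(String commandKey) {
        AtomicInteger existing = COUNTERS.get(commandKey);
        if (existing != null) {
            return existing;
        }
        // Several threads may race here; putIfAbsent keeps whichever value arrived first.
        COUNTERS.putIfAbsent(commandKey, new AtomicInteger(0));
        // Return whatever ended up in the map (ours or another thread's).
        return COUNTERS.get(commandKey);
    }

    public static void main(String[] args) {
        System.out.println(counterFor("getUser") == counterFor("getUser"));   // same key, same instance
        System.out.println(counterFor("getUser") == counterFor("getOrder")); // different keys, different instances
    }
}
```

The consequence for Hystrix is that the concurrency limit is shared by all command instances with the same command key, not counted per instance.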
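Moving on.
Once the request has passed the circuit-breaker check, it reaches executionSemaphore.tryAcquire(). The TryableSemaphore interface has two implementations, and which one is used depends on the isolation mode we configured for Hystrix.
In SEMAPHORE mode the call goes to the tryAcquire method of the TryableSemaphoreActual object created above.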
-
- static class TryableSemaphoreActual implements TryableSemaphore {
- protected final HystrixProperty<Integer> numberOfPermits;
- private final AtomicInteger count = new AtomicInteger(0);
-
- public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
- this.numberOfPermits = numberOfPermits;
- }
-
- @Override
- public boolean tryAcquire() {
- int currentCount = count.incrementAndGet();
- if (currentCount > numberOfPermits.get()) {
- count.decrementAndGet();
- return false;
- } else {
- return true;
- }
- }
- }
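The method is simple: every request increments a counter, and when the count exceeds the configured execution.isolation.semaphore.maxConcurrentRequests, the increment is rolled back and false is returned. That is how the semaphore limit is enforced.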
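The excerpt above omits the release side, which simply decrements the counter again. To see acquire and release working together, here is a small self-contained sketch of the same counting technique. CountingGate is a hypothetical class with a fixed int limit; the real TryableSemaphoreActual reads the limit from a HystrixProperty on every call.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a non-blocking counting semaphore.
class CountingGate {
    private final int maxPermits;
    private final AtomicInteger count = new AtomicInteger(0);

    CountingGate(int maxPermits) {
        this.maxPermits = maxPermits;
    }

    // Never blocks: either takes a permit or reports failure immediately.
    boolean tryAcquire() {
        int current = count.incrementAndGet();
        if (current > maxPermits) {
            count.decrementAndGet(); // roll back the optimistic increment
            return false;
        }
        return true;
    }

    void release() {
        count.decrementAndGet();
    }

    int inUse() {
        return count.get();
    }

    public static void main(String[] args) {
        CountingGate gate = new CountingGate(2);
        System.out.println(gate.tryAcquire()); // first permit
        System.out.println(gate.tryAcquire()); // second permit
        System.out.println(gate.tryAcquire()); // over the limit, rejected
        gate.release();
        System.out.println(gate.tryAcquire()); // a permit is free again
    }
}
```

Unlike java.util.concurrent.Semaphore.acquire(), nothing here ever waits, which is why a rejected request goes straight to the fallback.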
In THREAD mode the object is a TryableSemaphoreNoOp, whose tryAcquire method always returns true.
- static class TryableSemaphoreNoOp implements TryableSemaphore {
-
- public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();
-
- @Override
- public boolean tryAcquire() {
- return true;
- }
- }
Next comes the executeCommandAndObserve method, which eventually calls executeCommandWithSpecifiedIsolation. This is the same path as in the timeout analysis from the earlier article.
- private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
- if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
- // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
- return Observable.defer(new Func0<Observable<R>>() {
- @Override
- public Observable<R> call() {
- executionResult = executionResult.setExecutionOccurred();
- if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
- return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
- }
-
- metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
-
- if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
- // the command timed out in the wrapping thread so we will return immediately
- // and not increment any of the counters below or other such logic
- return Observable.error(new RuntimeException("timed out before executing run()"));
- }
- if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
- //we have not been unsubscribed, so should proceed
- HystrixCounters.incrementGlobalConcurrentThreads();
- threadPool.markThreadExecution();
- // store the command that is being run
- endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
- executionResult = executionResult.setExecutedInThread();
- /**
- * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
- */
- try {
- executionHook.onThreadStart(_cmd);
- executionHook.onRunStart(_cmd);
- executionHook.onExecutionStart(_cmd);
- return getUserExecutionObservable(_cmd);
- } catch (Throwable ex) {
- return Observable.error(ex);
- }
- } else {
- //command has already been unsubscribed, so return immediately
- return Observable.error(new RuntimeException("unsubscribed before executing run()"));
- }
- }
- }).doOnTerminate(new Action0() {
- @Override
- public void call() {
- if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
- handleThreadEnd(_cmd);
- }
- if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
- //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
- }
- //if it was unsubscribed, then other cleanup handled it
- }
- }).doOnUnsubscribe(new Action0() {
- @Override
- public void call() {
- if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
- handleThreadEnd(_cmd);
- }
- if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
- //if it was never started and was cancelled, then no need to clean up
- }
- //if it was terminal, then other cleanup handled it
- }
- }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
- @Override
- public Boolean call() {
- return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
- }
- }));
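As before, an RxJava event stream is built. Look at the final subscribeOn call: it means the work above is executed asynchronously on another thread. Here is the getScheduler method.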
- public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
- touchConfig();
- return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
- }
-
- private void touchConfig() {
- final int dynamicCoreSize = properties.coreSize().get();
- final int configuredMaximumSize = properties.maximumSize().get();
- int dynamicMaximumSize = properties.actualMaximumSize();
- final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
- boolean maxTooLow = false;
- ..............
- }
Now look at the subscribeOn method.
- public final Observable<T> subscribeOn(Scheduler scheduler) {
- if (this instanceof ScalarSynchronousObservable) {
- return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
- }
- return create(new OperatorSubscribeOn<T>(this, scheduler));
- }
-
- public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
- this.scheduler = scheduler;
- this.source = source;
- }
-
- @Override
- public void call(final Subscriber<? super T> subscriber) {
- final Worker inner = scheduler.createWorker();
- subscriber.add(inner);
-
- inner.schedule(new Action0() {
- @Override
- public void call() {
- final Thread t = Thread.currentThread();
-
- Subscriber<T> s = new Subscriber<T>(subscriber) {
- @Override
- public void onNext(T t) {
- subscriber.onNext(t);
- }
-
- @Override
- public void onError(Throwable e) {
- try {
- subscriber.onError(e);
- } finally {
- inner.unsubscribe();
- }
- }
-
- @Override
- public void onCompleted() {
- try {
- subscriber.onCompleted();
- } finally {
- inner.unsubscribe();
- }
- }
-
- @Override
- public void setProducer(final Producer p) {
- subscriber.setProducer(new Producer() {
- @Override
- public void request(final long n) {
- if (t == Thread.currentThread()) {
- p.request(n);
- } else {
- inner.schedule(new Action0() {
- @Override
- public void call() {
- p.request(n);
- }
- });
- }
- }
- });
- }
- };
-
- source.unsafeSubscribe(s);
- }
- });
- }
In the end, the schedule method of ThreadPoolWorker, an inner class of HystrixContextScheduler, is executed.
- public Subscription schedule(final Action0 action) {
- if (subscription.isUnsubscribed()) {
- // don't schedule, we are unsubscribed
- return Subscriptions.unsubscribed();
- }
-
- // This is internal RxJava API but it is too useful.
- ScheduledAction sa = new ScheduledAction(action);
-
- subscription.add(sa);
- sa.addParent(subscription);
-
- ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
- FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
- sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
-
- return sa;
- }
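As the method shows, executor.submit here is the submit method of Java's ThreadPoolExecutor. If every thread up to the maximum pool size is busy and the work queue cannot accept another task, the submission is rejected outright (with the default handler, a RejectedExecutionException is thrown).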
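The rejection itself is plain JDK behavior and can be observed without Hystrix. The sketch below assumes a pool of one thread with a SynchronousQueue, which has no capacity to hold waiting tasks. DirectHandoffRejection is a hypothetical name, and the size of 1 is chosen only to keep the demo short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a saturated pool with a SynchronousQueue rejects the next task.
class DirectHandoffRejection {
    static boolean secondSubmitRejected() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
        try {
            // Occupy the only thread until the demo is over.
            executor.submit(() -> {
                started.countDown();
                try {
                    finish.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            try {
                executor.submit(() -> { });
                return false; // accepted, which is not expected here
            } catch (RejectedExecutionException e) {
                return true;  // no free thread and nowhere to queue
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            finish.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second submit rejected: " + secondSubmitRejected());
    }
}
```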
The queue used here is decided when HystrixCommandAspect creates the AbstractCommand:
- protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
- HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
- HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
- HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
-
- this.commandGroup = initGroupKey(group);
- this.commandKey = initCommandKey(key, getClass());
- this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
- this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
- this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
- this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
- this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
- ....................
- }
- private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
- if (fromConstructor == null) {
- // get the default implementation of HystrixThreadPool
- return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
- } else {
- return fromConstructor;
- }
- }
-
- static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
- // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
- String key = threadPoolKey.name();
-
- // this should find it for all but the first time
- HystrixThreadPool previouslyCached = threadPools.get(key);
- if (previouslyCached != null) {
- return previouslyCached;
- }
-
- // if we get here this is the first time so we need to initialize
- synchronized (HystrixThreadPool.class) {
- if (!threadPools.containsKey(key)) {
- threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
- }
- }
- return threadPools.get(key);
- }
Finally, a HystrixThreadPoolDefault object is created.
- public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
- this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
- HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
- this.queueSize = properties.maxQueueSize().get();
-
- this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
- // the thread pool is created here
- concurrencyStrategy.getThreadPool(threadPoolKey, properties),
- properties);
- this.threadPool = this.metrics.getThreadPool();
- this.queue = this.threadPool.getQueue();
-
- /* strategy: HystrixMetricsPublisherThreadPool */
- HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
- }
-
- public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
- final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
-
- final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
- final int dynamicCoreSize = threadPoolProperties.coreSize().get();
- final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
- final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
- final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
-
- if (allowMaximumSizeToDivergeFromCoreSize) {
- final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
- if (dynamicCoreSize > dynamicMaximumSize) {
- logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
- dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
- dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
- return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
- } else {
- return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
- }
- } else {
- return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
- }
- }
-
- public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
- if (maxQueueSize <= 0) {
- return new SynchronousQueue<Runnable>();
- } else {
- return new LinkedBlockingQueue<Runnable>(maxQueueSize);
- }
- }
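As shown, if we do not set the maxQueueSize parameter (its default is -1), a SynchronousQueue is created.
If maxQueueSize is set to a positive value, a LinkedBlockingQueue with that capacity is created.
A commonly used configuration: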
@HystrixCommand(threadPoolProperties = {@HystrixProperty(name = "coreSize",value = "2"),
@HystrixProperty(name = "allowMaximumSizeToDivergeFromCoreSize",value="true"),
@HystrixProperty(name = "maximumSize",value="3"),
@HystrixProperty(name = "maxQueueSize",value="2")},
commandProperties = {@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "10000")}
,fallbackMethod = "fallback")
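To make the numbers in this configuration concrete, the sketch below builds a bare ThreadPoolExecutor with the same sizes (core 2, maximum 3, queue capacity 2) and counts how many blocking tasks it accepts. It models only the JDK executor; BoundedPoolDemo is a hypothetical name, and Hystrix's own queueSizeRejectionThreshold check (default 5, as far as I know) is not modeled here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of coreSize=2, maximumSize=3, maxQueueSize=2 at the JDK level.
class BoundedPoolDemo {
    static int acceptedOutOf(int submissions) {
        CountDownLatch finish = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 3, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(2));
        int accepted = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    // Each task blocks, so no thread or queue slot frees up during the loop.
                    executor.submit(() -> {
                        try {
                            finish.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // pool is at maximum size and the queue is full
                }
            }
            return accepted;
        } finally {
            finish.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptedOutOf(8));
    }
}
```

The first two tasks start the core threads, the next two wait in the queue, the fifth starts the one extra thread allowed by the maximum size, and everything after that is rejected. In Hystrix a rejected submission ends up in the fallback method.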