当前位置:   article > 正文

Spring Cloud Hystrix 源码解析(超过最大线程数、最大请求数)_hystirx动态修改线程数源码

hystirx动态修改线程数源码

之前分析了Spring Cloud Hystrix的创建和超时源码。

现在我们来分析下关于

Hystirx Thread模式,超过设置最大线程数量以及

Hystrix SEMAPHORE 模式,超过最大请求数量源码解析。

继续回到AbstractCommand的applyHystrixSemantics方法。

  1. private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
  2. // mark that we're starting execution on the ExecutionHook
  3. // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
  4. executionHook.onStart(_cmd);
  5. /* determine if we're allowed to execute */
  6. if (circuitBreaker.attemptExecution()) {
  7. final TryableSemaphore executionSemaphore = getExecutionSemaphore();
  8. final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
  9. final Action0 singleSemaphoreRelease = new Action0() {
  10. @Override
  11. public void call() {
  12. if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
  13. executionSemaphore.release();
  14. }
  15. }
  16. };
  17. final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
  18. @Override
  19. public void call(Throwable t) {
  20. eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
  21. }
  22. };
  23. if (executionSemaphore.tryAcquire()) {
  24. try {
  25. /* used to track userThreadExecutionTime */
  26. executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
  27. return executeCommandAndObserve(_cmd)
  28. .doOnError(markExceptionThrown)
  29. .doOnTerminate(singleSemaphoreRelease)
  30. .doOnUnsubscribe(singleSemaphoreRelease);
  31. } catch (RuntimeException e) {
  32. return Observable.error(e);
  33. }
  34. } else {
  35. return handleSemaphoreRejectionViaFallback();
  36. }
  37. } else {
  38. return handleShortCircuitViaFallback();
  39. }
  40. }

首先我们看if (circuitBreaker.attemptExecution()) 代码。是判断熔断器是否可以执行请求。

  1. @Override
  2. public boolean attemptExecution() {
  3. if (properties.circuitBreakerForceOpen().get()) {
  4. return false;
  5. }
  6. if (properties.circuitBreakerForceClosed().get()) {
  7. return true;
  8. }
  9. if (circuitOpened.get() == -1) {
  10. return true;
  11. } else {
  12. if (isAfterSleepWindow()) {
  13. if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
  14. //only the first request after sleep window should execute
  15. return true;
  16. } else {
  17. return false;
  18. }
  19. } else {
  20. return false;
  21. }
  22. }
  23. }
  24. private boolean isAfterSleepWindow() {
  25. final long circuitOpenTime = circuitOpened.get();
  26. final long currentTime = System.currentTimeMillis();
  27. final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
  28. return currentTime > circuitOpenTime + sleepWindowTime;
  29. }

如果熔断器开启,则直接返回false。否则会进行睡眠判断,当我们错误达到一定比例(默认50%)后,会开启熔断并设置一个熔断开启时间,之后根据我们设置的参数(默认是5s),当下次请求进来时,如果请求时间<上次熔断时间+睡眠时间,则直接返回false。

再看这一行final TryableSemaphore executionSemaphore = getExecutionSemaphore();通过getExecutionSemaphore方法获取executionSemaphore 对象。

  1. protected TryableSemaphore getExecutionSemaphore() {
  2. if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
  3. if (executionSemaphoreOverride == null) {
  4. TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
  5. if (_s == null) {
  6. // we didn't find one cache so setup
  7. executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
  8. // assign whatever got set (this or another thread)
  9. return executionSemaphorePerCircuit.get(commandKey.name());
  10. } else {
  11. return _s;
  12. }
  13. } else {
  14. return executionSemaphoreOverride;
  15. }
  16. } else {
  17. // return NoOp implementation since we're not using SEMAPHORE isolation
  18. return TryableSemaphoreNoOp.DEFAULT;
  19. }
  20. }
  21. static class TryableSemaphoreActual implements TryableSemaphore {
  22. protected final HystrixProperty<Integer> numberOfPermits;
  23. private final AtomicInteger count = new AtomicInteger(0);
  24. public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
  25. this.numberOfPermits = numberOfPermits;
  26. }
  27. @Override
  28. public boolean tryAcquire() {
  29. int currentCount = count.incrementAndGet();
  30. if (currentCount > numberOfPermits.get()) {
  31. count.decrementAndGet();
  32. return false;
  33. } else {
  34. return true;
  35. }
  36. }
  37. }

先判断如果模式是SEMAPHORE ,则创建TryableSemaphoreActual,Thread模式则创建默认的。

继续往下

当请求判断进来后,会进入executionSemaphore.tryAcquire()方法,该方法有两个子类。根据我们设置的Hystrix模式不同,实现类不同。

Hystrix SEMAPHORE 模式

会进入刚才创建的TryableSemaphoreActual对象的tryAcquire方法。

  1. static class TryableSemaphoreActual implements TryableSemaphore {
  2. protected final HystrixProperty<Integer> numberOfPermits;
  3. private final AtomicInteger count = new AtomicInteger(0);
  4. public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
  5. this.numberOfPermits = numberOfPermits;
  6. }
  7. @Override
  8. public boolean tryAcquire() {
  9. int currentCount = count.incrementAndGet();
  10. if (currentCount > numberOfPermits.get()) {
  11. count.decrementAndGet();
  12. return false;
  13. } else {
  14. return true;
  15. }
  16. }
  17. }

该方法很简单,就是每次请求自增一个数,当数量大于我们设置的execution.isolation.semaphore.maxConcurrentRequests时,返回false,达到信号量控制。

Hystirx Thread模式

该模式对象是TryableSemaphoreNoOp,方法默认返回true。

  1. static class TryableSemaphoreNoOp implements TryableSemaphore {
  2. public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();
  3. @Override
  4. public boolean tryAcquire() {
  5. return true;
  6. }
  7. }

接着进入executeCommandAndObserve方法。该方法最后会进入到executeCommandWithSpecifiedIsolation方法。这里和之前文章分析的超时模式一致,都会进入到这里。

  1. private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
  2. if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
  3. // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
  4. return Observable.defer(new Func0<Observable<R>>() {
  5. @Override
  6. public Observable<R> call() {
  7. executionResult = executionResult.setExecutionOccurred();
  8. if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
  9. return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
  10. }
  11. metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
  12. if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
  13. // the command timed out in the wrapping thread so we will return immediately
  14. // and not increment any of the counters below or other such logic
  15. return Observable.error(new RuntimeException("timed out before executing run()"));
  16. }
  17. if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
  18. //we have not been unsubscribed, so should proceed
  19. HystrixCounters.incrementGlobalConcurrentThreads();
  20. threadPool.markThreadExecution();
  21. // store the command that is being run
  22. endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
  23. executionResult = executionResult.setExecutedInThread();
  24. /**
  25. * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
  26. */
  27. try {
  28. executionHook.onThreadStart(_cmd);
  29. executionHook.onRunStart(_cmd);
  30. executionHook.onExecutionStart(_cmd);
  31. return getUserExecutionObservable(_cmd);
  32. } catch (Throwable ex) {
  33. return Observable.error(ex);
  34. }
  35. } else {
  36. //command has already been unsubscribed, so return immediately
  37. return Observable.error(new RuntimeException("unsubscribed before executing run()"));
  38. }
  39. }
  40. }).doOnTerminate(new Action0() {
  41. @Override
  42. public void call() {
  43. if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
  44. handleThreadEnd(_cmd);
  45. }
  46. if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
  47. //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
  48. }
  49. //if it was unsubscribed, then other cleanup handled it
  50. }
  51. }).doOnUnsubscribe(new Action0() {
  52. @Override
  53. public void call() {
  54. if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
  55. handleThreadEnd(_cmd);
  56. }
  57. if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
  58. //if it was never started and was cancelled, then no need to clean up
  59. }
  60. //if it was terminal, then other cleanup handled it
  61. }
  62. }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
  63. @Override
  64. public Boolean call() {
  65. return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
  66. }
  67. }));

还是和之前一样,创建了RxJava的事件的流,我们看最后这句subscribeOn,表示这些执行是异步线程执行。看一下getScheduler方法。

  1. public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
  2. touchConfig();
  3. return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
  4. }
  5. private void touchConfig() {
  6. final int dynamicCoreSize = properties.coreSize().get();
  7. final int configuredMaximumSize = properties.maximumSize().get();
  8. int dynamicMaximumSize = properties.actualMaximumSize();
  9. final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
  10. boolean maxTooLow = false;
  11. ..............
  12. }

再看一下subscribeOn方法。

  1. public final Observable<T> subscribeOn(Scheduler scheduler) {
  2. if (this instanceof ScalarSynchronousObservable) {
  3. return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
  4. }
  5. return create(new OperatorSubscribeOn<T>(this, scheduler));
  6. }
  7. public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
  8. this.scheduler = scheduler;
  9. this.source = source;
  10. }
  11. @Override
  12. public void call(final Subscriber<? super T> subscriber) {
  13. final Worker inner = scheduler.createWorker();
  14. subscriber.add(inner);
  15. inner.schedule(new Action0() {
  16. @Override
  17. public void call() {
  18. final Thread t = Thread.currentThread();
  19. Subscriber<T> s = new Subscriber<T>(subscriber) {
  20. @Override
  21. public void onNext(T t) {
  22. subscriber.onNext(t);
  23. }
  24. @Override
  25. public void onError(Throwable e) {
  26. try {
  27. subscriber.onError(e);
  28. } finally {
  29. inner.unsubscribe();
  30. }
  31. }
  32. @Override
  33. public void onCompleted() {
  34. try {
  35. subscriber.onCompleted();
  36. } finally {
  37. inner.unsubscribe();
  38. }
  39. }
  40. @Override
  41. public void setProducer(final Producer p) {
  42. subscriber.setProducer(new Producer() {
  43. @Override
  44. public void request(final long n) {
  45. if (t == Thread.currentThread()) {
  46. p.request(n);
  47. } else {
  48. inner.schedule(new Action0() {
  49. @Override
  50. public void call() {
  51. p.request(n);
  52. }
  53. });
  54. }
  55. }
  56. });
  57. }
  58. };
  59. source.unsafeSubscribe(s);
  60. }
  61. });
  62. }

最后会执行HystrixContextScheduler类中的内部类ThreadPoolWorker的schedule方法。

  1. public Subscription schedule(final Action0 action) {
  2. if (subscription.isUnsubscribed()) {
  3. // don't schedule, we are unsubscribed
  4. return Subscriptions.unsubscribed();
  5. }
  6. // This is internal RxJava API but it is too useful.
  7. ScheduledAction sa = new ScheduledAction(action);
  8. subscription.add(sa);
  9. sa.addParent(subscription);
  10. ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
  11. FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
  12. sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
  13. return sa;
  14. }

从方法中我们可以看到,当执行executor.submit时调用的是Java的ThreadPoolExecutor的submit方法,如果超过了设置的最大队列,则直接拒绝。

这里的队列设置在HystrixCommandAspect创建AbstractCommand时确定

  1. protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
  2. HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
  3. HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
  4. HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
  5. this.commandGroup = initGroupKey(group);
  6. this.commandKey = initCommandKey(key, getClass());
  7. this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
  8. this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
  9. this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
  10. this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
  11. this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
  12. ....................
  13. }
  1. private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
  2. if (fromConstructor == null) {
  3. // get the default implementation of HystrixThreadPool
  4. return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
  5. } else {
  6. return fromConstructor;
  7. }
  8. }
  9. static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
  10. // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
  11. String key = threadPoolKey.name();
  12. // this should find it for all but the first time
  13. HystrixThreadPool previouslyCached = threadPools.get(key);
  14. if (previouslyCached != null) {
  15. return previouslyCached;
  16. }
  17. // if we get here this is the first time so we need to initialize
  18. synchronized (HystrixThreadPool.class) {
  19. if (!threadPools.containsKey(key)) {
  20. threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
  21. }
  22. }
  23. return threadPools.get(key);
  24. }

最后会创建HystrixThreadPoolDefault对象。

  1. public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
  2. this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
  3. HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
  4. this.queueSize = properties.maxQueueSize().get();
  5. this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
  6. //此处创建线程池
  7. concurrencyStrategy.getThreadPool(threadPoolKey, properties),
  8. properties);
  9. this.threadPool = this.metrics.getThreadPool();
  10. this.queue = this.threadPool.getQueue();
  11. /* strategy: HystrixMetricsPublisherThreadPool */
  12. HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
  13. }
  14. public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
  15. final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
  16. final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
  17. final int dynamicCoreSize = threadPoolProperties.coreSize().get();
  18. final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
  19. final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
  20. final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
  21. if (allowMaximumSizeToDivergeFromCoreSize) {
  22. final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
  23. if (dynamicCoreSize > dynamicMaximumSize) {
  24. logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
  25. dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
  26. dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
  27. return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
  28. } else {
  29. return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
  30. }
  31. } else {
  32. return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
  33. }
  34. }
  35. public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
  36. if (maxQueueSize <= 0) {
  37. return new SynchronousQueue<Runnable>();
  38. } else {
  39. return new LinkedBlockingQueue<Runnable>(maxQueueSize);
  40. }
  41. }

可以看到如果我们不设置maxQueueSize(默认-1)这个参数,是会创建SynchronousQueue对象。

如果设置了最大请求数,则会创建LinkedBlockingQueue队列。

常用设置:

@HystrixCommand(threadPoolProperties = {@HystrixProperty(name = "coreSize",value = "2"),
        @HystrixProperty(name = "allowMaximumSizeToDivergeFromCoreSize",value="true"),
        @HystrixProperty(name = "maximumSize",value="3"),
        @HystrixProperty(name = "maxQueueSize",value="2")},
        commandProperties = {@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "10000")}
        ,fallbackMethod = "fallback")

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/978714
推荐阅读
相关标签
  

闽ICP备14008679号