赞
踩
Hystrix能够提供熔断、限流、断路器保护等等功能,而这些功能均基于数据采集。Netflix Hystrix通过类似滑动窗口的数据结构来统计命令执行过程中的各种指标数据,进而做出对应的响应。
滑动窗口算法(Sliding Window Algorithm)是常见的一种算法:它思想简洁且功能强大,可以用来解决一些查找满足一定条件的连续区间的性质/长度的问题。由于区间连续,因此当区间发生变化时,可以通过旧有的计算结果对搜索空间进行剪枝,这样便减少了重复计算,降低了时间复杂度,它还可以将嵌套的循环问题,转换为单循环问题,同样也是降低时间复杂度。
限量的应用场景非常之广泛,无论是Http请求还是RPC请求,都能看到它的身影,它是稳定性建设的有效措施。限流后的处理方式也可以从多种角度去考虑,比如常见的有两种:
关于限流算法,一般常见的有下面四种:固定窗口、滑动窗口、令牌桶算法(谷歌的开源guava有实现)、漏桶算法
。
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
...
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);// 涉及AbstractCommand初始化
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
...
}
Hystrix中核心概念事件源HystrixEvent
与事件流HystrixEventStream
。
数据流|事件流 HystrixEventStream
用来收集数据。数据内容是数据源HystrixEvent
。
初始化数据发射器即类型为 RxJava 中的 Observable【被观察者|发布者】,会源源不断地产生事件/数据,数据源便是HystrixEvent。
Hystrix它是通过滑动窗口的数据结构/算法
来统计调用的指标数据的,但若直接使用HystrixEventStream作为管道传播数据的话,是点对点的,并无时间区间、时间窗口等概念。BucketedCounterStream
与时间窗口相关的数据传输、收集处理的核心。
BucketedCounterStream其子类初始化过程中涉及到事件流HystrixEventStream
相关类的实例化。
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey,
HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter defaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
...
//metrics:HystrixCommandMetrics
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker,
this.commandGroup, this.commandKey, this.properties, this.metrics);
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
...
}
Hystrix通过滑动窗口来对数据进行“平滑”统计,默认情况下,一个滑动窗口包含10个桶(Bucket),每个桶时间宽度是1秒,负责1秒的数据统计。滑动窗口包含的总时间以及其中的桶数量都是可以配置的,来张官方的截图认识下滑动窗口:
上图的每个小矩形代表一个桶,可以看到,每个桶都记录着1秒内的四个指标数据:成功量、失败量、超时量和拒绝量,这里的拒绝量指的就是上面流程图中【信号量/线程池资源检查】中被拒绝的流量。10个桶合起来是一个完整的滑动窗口,所以计算一个滑动窗口的总数据需要将10个桶的数据加起来。
private static HystrixCommandMetrics initMetrics(HystrixCommandMetrics constructor, HystrixCommandGroupKey groupKey,
HystrixThreadPoolKey threadPoolKey, HystrixCommandKey commandKey,HystrixCommandProperties properties) {
...
return HystrixCommandMetrics.getInstance(commandKey, groupKey, threadPoolKey, properties);
}
public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties) {
// attempt to retrieve from cache first
HystrixCommandMetrics commandMetrics = metrics.get(key.name());
if (commandMetrics != null) {
return commandMetrics;
} else {
...
HystrixCommandMetrics newCommandMetrics = new HystrixCommandMetrics(key, commandGroup, nonNullThreadPoolKey, properties, HystrixPlugins.getInstance().getEventNotifier());
metrics.putIfAbsent(key.name(), newCommandMetrics);
return newCommandMetrics;
}
}
public HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
...
healthCountsStream = HealthCountsStream.getInstance(key, properties);
rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);
rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
}
public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() {
@Override
public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {
ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
for (HystrixEventType eventType: ALL_EVENT_TYPES) {
switch (eventType) {
case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here
default:
initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);
break;
}
}
return initialCountArray;
}
};
BucketedCounterStream它是抽象类,提供了基本的**桶计数器(BucketedCounter)**实现:按配置的时间间隔将所有事件聚合成桶。
protected BucketedCounterStream(HystrixEventStream<Event> inputEventStream,int numBuckets,int bucketSizeInMs, Func2<Bucket, Event, Bucket> appendRawEventToBucket) { this.numBuckets = numBuckets; this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() { @Override public Observable<Bucket> call(Observable<Event> eventBucket) { return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket); } }; final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>(); for (int i = 0; i < numBuckets; i++) { emptyEventCountsToStart.add(getEmptyBucketSummary()); } this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() { @Override public Observable<Bucket> call() { return inputEventStream .observe()//从事件流获取被观察者或者发布者 .window(bucketSizeInMs, TimeUnit.MILLISECONDS) .flatMap(reduceBucketToSummary) .startWith(emptyEventCountsToStart); } }); }
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket, final Func2<Output, Bucket, Output> reduceBucket) { super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket); Func1<Observable<Bucket>, Observable<Output>> summary = new Func1<Observable<Bucket>, Observable<Output>>() { @Override public Observable<Output> call(Observable<Bucket> window) { // reduceBucket:#3.3 healthCheckAccumulator return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets); } }; this.sourceStream = bucketedStream //stream broken up into buckets .window(numBuckets, 1) //emit overlapping windows of buckets .flatMap(summary) //convert a window of bucket-summaries into a single summary .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer }
它提供实时的健康检查数据(HystrixCommandMetrics.HealthCounts,统计调用成功和失败的计数,以及比率)。它继承自BucketedRollingCounterStream,三个泛型参数是:
public static HealthCountsStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
// 默认取值500
int healthCountBucketSizeInMs = properties.metricsHealthSnapshotIntervalInMilliseconds().get();
//默认取值为10000
int rswim = properties.metricsRollingStatisticalWindowInMilliseconds().get();
//默认取值为20
int numHealthCountBuckets = rswim / healthCountBucketSizeInMs;
return getInstance(commandKey, numHealthCountBuckets, healthCountBucketSizeInMs);
}
public static HealthCountsStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) {
HealthCountsStream initialStream = streams.get(commandKey.name());
if (initialStream != null) {
return initialStream;
} else {
HealthCountsStream newStream = new HealthCountsStream(commandKey, numBuckets, bucketSizeInMs,
HystrixCommandMetrics.appendEventToBucket);
streams.putIfAbsent(commandKey.name(), newStream);
...
healthStream.startCachingStreamValuesIfUnstarted();
return newStream;
}
}
private HealthCountsStream(HystrixCommandKey commandKey,int numBuckets,int bucketSizeInMs,
Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
// 触发#3.1 #3.2步骤的初始化
super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs,
reduceCommandCompletion, healthCheckAccumulator);
}
此处涉及事件流HystrixEventStream之HystrixCommandCompletionStream流
的初始化。
private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>
healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
@Override
public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[]
bucketEventCounts) {
return healthCounts.plus(bucketEventCounts);
}
};
public void startCachingStreamValuesIfUnstarted() {
if (subscription.get() == null) {
//此处开始订阅事件
Subscription candidateSubscription = observe().subscribe(counterSubject);
if (subscription.compareAndSet(null, candidateSubscription)) {
} else {
candidateSubscription.unsubscribe();
}
}
}
public static HystrixCommandCompletionStream getInstance(HystrixCommandKey commandKey) { HystrixCommandCompletionStream initialStream = streams.get(commandKey.name()); if (initialStream != null) { return initialStream; } else { synchronized (HystrixCommandCompletionStream.class) { HystrixCommandCompletionStream existingStream = streams.get(commandKey.name()); if (existingStream == null) { HystrixCommandCompletionStream newStream = new HystrixCommandCompletionStream(commandKey); streams.putIfAbsent(commandKey.name(), newStream); return newStream; } else { return existingStream; } } } }
HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
this.commandKey = commandKey;
this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>
(PublishSubject.<HystrixCommandCompletion>create());
this.readOnlyStream = writeOnlySubject.share();
}
事件流初始化rxjava类型中发布者或者被订阅者SerializedSubject
【并发环境下线程安全的】。
public void write(HystrixCommandCompletion event) {
//提供写方法:把该event写到发射器里面去,订阅者订阅
// 该方法的唯一调用处是:HystrixThreadEventStream
writeOnlySubject.onNext(event);
}
@Override
public Observable<HystrixCommandCompletion> observe() {
return readOnlyStream;// #3.1中获取被订阅者SerializedSubject
}
readOnlyStream是只读的、可以被共享消费的流。是 writeOnlySubject 的只读版本,它是通过 share 操作符产生的。
share 操作符产生一种特殊的 Observable:当有一个订阅者去消费事件流时它就开始产生事件,可以有多个订阅者去订阅,同一时刻收到的事件是一致的;直到最后一个订阅者取消订阅以后,事件流才停止产生事件。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。