赞
踩
目录
Sentinel流控涉及两个方面:
1、方法的拦截、处理
2、多种流控规则、策略的实现
以java web服务为例进行讲解,在过滤器对请求进行拦截来实现。
- <dependency>
- <groupId>com.alibaba.csp</groupId>
- <artifactId>sentinel-web-servlet</artifactId>
- <version>x.y.z</version>
- </dependency>
在这个实现包中提供了具体的filter实现类:
com.alibaba.csp.sentinel.adapter.servlet.CommonFilter
- @Override
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
- throws IOException, ServletException {
- HttpServletRequest sRequest = (HttpServletRequest) request;
- Entry urlEntry = null;
-
- try {
- String target = FilterUtil.filterTarget(sRequest);
- // Clean and unify the URL.
- // For REST APIs, you have to clean the URL (e.g. `/foo/1` and `/foo/2` -> `/foo/:id`), or
- // the amount of context and resources will exceed the threshold.
- UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
- if (urlCleaner != null) {
- target = urlCleaner.clean(target);
- }
-
- // If you intend to exclude some URLs, you can convert the URLs to the empty string ""
- // in the UrlCleaner implementation.
- if (!StringUtil.isEmpty(target)) {
- // Parse the request origin using registered origin parser.
- String origin = parseOrigin(sRequest);
- String contextName = webContextUnify ? WebServletConfig.WEB_SERVLET_CONTEXT_NAME : target;
- ContextUtil.enter(contextName, origin);
-
- if (httpMethodSpecify) {
- // Add HTTP method prefix if necessary.
- String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + COLON + target;
- urlEntry = SphU.entry(pathWithHttpMethod, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
- } else {
- urlEntry = SphU.entry(target, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
- }
- }
- chain.doFilter(request, response);
- } catch (BlockException e) {
- HttpServletResponse sResponse = (HttpServletResponse) response;
- // Return the block page, or redirect to another URL.
- WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);
- } catch (IOException | ServletException | RuntimeException e2) {
- Tracer.traceEntry(e2, urlEntry);
- throw e2;
- } finally {
- if (urlEntry != null) {
- urlEntry.exit();
- }
- ContextUtil.exit();
- }
- }
-
- private String parseOrigin(HttpServletRequest request) {
- RequestOriginParser originParser = WebCallbackManager.getRequestOriginParser();
- String origin = EMPTY_ORIGIN;
- if (originParser != null) {
- origin = originParser.parseOrigin(request);
- if (StringUtil.isEmpty(origin)) {
- return EMPTY_ORIGIN;
- }
- }
- return origin;
- }
从代码来看,先进行:ContextUtil.enter(contextName, origin);
接着通过SphU.entry(String name, int resourceType, EntryType type)方法判断是否放行,如果出现BlockException,则执行阻塞处理器里设置的动作:WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);
在Sentinel里,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个Entry对象。Entry可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用SphU API显式创建。Entry创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:
1、NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
2、ClusterBuilderSlot则用于存储资源的统计信息以及调用者信息,例如该资源的RT、QPS、thread count等等,这些信息将用作多维度限流、降级的依据
3、StatisticSlot则用于记录、统计不同维度的runtime指标监控信息
4、FlowSlot则用于根据预设的限流规则以及前面slot统计的状态,来进行流量控制
5、AuthoritySlot则根据配置的黑白名单和调用来源信息,来做黑白名单控制
6、DegradeSlot则通过统计信息以及预设的规则,来做熔断降级
7、SystemSlot则通过系统的状态,例如load1等,来控制总的入口流量
总体框架如下:
Sentinel将SlotChainBuilder作为SPI接口进行扩展,使得Slot Chain具备了扩展的能力。当然我们可以自行加入自定义的slot并编排slot间的顺序,从而可以给Sentinel添加自定义的功能。
下面我们来看看每一个slot的功能
这个slot主要负责收集资源的路径,并将这些资源的调用路径以树状结构存储起来,用于根据调用路径进行流量控制。
- ContextUtil.enter("entrance1", "appA");
- Entry nodeA = SphU.entry("nodeA");
- if (nodeA != null) {
- nodeA.exit();
- }
- ContextUtil.exit();
上述代码通过Context.enter()创建了一个名为entrance1的上下文,同时指定调用发起者的appA;接着通过SphU.entry()请求一个token,如果该方法顺利执行没有抛BlockException,表明token请求成功。
以上代码将在内存中生成以下结构:
注意:每个DefaultNode由资源ID和输入名称来标识。换句话说,一个资源ID可以有多个不同入口的DefaultNode。
- ContextUtil.enter("entrance1", "appA");
- Entry nodeA = SphU.entry("nodeA");
- if (nodeA != null) {
- nodeA.exit();
- }
- ContextUtil.exit();
-
- ContextUtil.enter("entrance2", "appA");
- nodeA = SphU.entry("nodeA");
- if (nodeA != null) {
- nodeA.exit();
- }
- ContextUtil.exit();
以上代码将在内存中生成以下结构:
上面的结构可以通过调用 curl http://localhost:8719/tree?type=root来显示
此插槽用于构建资源的ClusterNode以及调用来源节点。ClusterNode保持某个资源运行统计信息(响应时间、QPS、block数目、线程数、异常数等)以及调用来源统计信息列表。调用来源的名称由ContextUtil.enter(contextName, origin)中的origin标记。可通过如下命令查看某个资源不同调用者的访问情况:
curl http://localhost:8719/origin?id=caller:
StatisticSlot是Sentinel的核心功能插槽之一,用于统计实时的调用数据。
clusterNode:资源唯一标识的ClusterNode的runtime统计
origin:根据来自不同调用者的统计信息
defaultNode:根据上下文条目名称和资源ID的runtime统计
入口流量的统计
Sentinel底层采用高性能的滑动窗口数据结构LeapArray来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。
这个slot主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止:
这个slot主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。
这个slot会根据对于当前系统的整体情况,对入口的资源进行调配。其原理是让入口的流量和当前系统的load达到一个动态平衡。注意这个功能的两个限制:
只对入口流量起作用(调用类型为EntryType.IN),对出口流量无效。可通过SphU.entry()指定调用类型,如果不指定,默认是EntryType.OUT。
Entry entry = SphU.entry("resourceName", EntryType.IN);
只在Unix-like的操作系统上生效。
现在以SphU.entry方法为切入点来开始分析。这个方法会去申请一个entry,如果能够申请成功,则说明没有被限流,否则会抛出BlockException,表明已经被限流了。
从SphU.entry()方法往下执行会进入到Sph.entry(),Sph的默认实现类是CtSph,在CtSph中最终会执行到entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException这个方法。
我们来看一下这个方法的具体实现:
- private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
- throws BlockException {
- Context context = ContextUtil.getContext();
- if (context instanceof NullContext) {
- // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
- // so here init the entry only. No rule checking will be done.
- return new CtEntry(resourceWrapper, null, context);
- }
-
- if (context == null) {
- // Using default context.
- context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
- }
-
- // Global switch is close, no rule checking will do.
- if (!Constants.ON) {
- return new CtEntry(resourceWrapper, null, context);
- }
-
- // 获取资源对应的SlotChain
- ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
-
- /*
- * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
- * so no rule checking will be done.
- */
- if (chain == null) {
- return new CtEntry(resourceWrapper, null, context);
- }
-
- Entry e = new CtEntry(resourceWrapper, chain, context);
- try {
- // 执行Slot的entry方法
- chain.entry(context, resourceWrapper, null, count, prioritized, args);
- } catch (BlockException e1) {
- e.exit(count, args);
- //抛出BlockException
- throw e1;
- } catch (Throwable e1) {
- // This should not happen, unless there are errors existing in Sentinel internal.
- RecordLog.info("Sentinel unexpected exception", e1);
- }
- return e;
- }
这个方法可以分为以下几个部分:
1、对参数和全局配置项做检测,如果不符合要求就直接返回一个CtEntry对象,不会再进行后面的限流检测,否则进入下面的检测流程。
2、根据包装过的资源对象获取对应的SlotChain
3、执行SlotChain的entry方法
3.1、如果SlotChain的entry方法抛出了BlockException,则将该异常抛到上层
3.2、如果SlotChain的entry方法正常执行了,则最后会将该entry对象返回
4、如果上层方法捕获了BlockException,则说明请求被限流了,否则请求能正常执行
首先看一下lookProcessChain的方法实现:
- /**
- * Get {@link ProcessorSlotChain} of the resource. new {@link ProcessorSlotChain} will
- * be created if the resource doesn't relate one.
- *
- * <p>Same resource({@link ResourceWrapper#equals(Object)}) will share the same
- * {@link ProcessorSlotChain} globally, no matter in witch {@link Context}.<p/>
- *
- * <p>
- * Note that total {@link ProcessorSlot} count must not exceed {@link Constants#MAX_SLOT_CHAIN_SIZE},
- * otherwise null will return.
- * </p>
- *
- * @param resourceWrapper target resource
- * @return {@link ProcessorSlotChain} of the resource
- */
- ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
- ProcessorSlotChain chain = chainMap.get(resourceWrapper);
- if (chain == null) {
- synchronized (LOCK) {
- chain = chainMap.get(resourceWrapper);
- if (chain == null) {
- // Entry size limit.
- if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
- return null;
- }
-
- chain = SlotChainProvider.newSlotChain();
- Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
- chainMap.size() + 1);
- newMap.putAll(chainMap);
- newMap.put(resourceWrapper, chain);
- chainMap = newMap;
- }
- }
- }
- return chain;
- }
该方法使用了一个HashMap做缓存,key是资源对象。这里加了锁,并且做了double check。具体构造chain的方法是通过spi创建的。
- /**
- * Builder for a default {@link ProcessorSlotChain}.
- *
- * @author qinan.qn
- * @author leyou
- */
- public class DefaultSlotChainBuilder implements SlotChainBuilder {
-
- @Override
- public ProcessorSlotChain build() {
- ProcessorSlotChain chain = new DefaultProcessorSlotChain();
- chain.addLast(new NodeSelectorSlot());
- chain.addLast(new ClusterBuilderSlot());
- chain.addLast(new LogSlot());
- chain.addLast(new StatisticSlot());
- chain.addLast(new AuthoritySlot());
- chain.addLast(new SystemSlot());
- chain.addLast(new FlowSlot());
- chain.addLast(new DegradeSlot());
-
- return chain;
- }
-
- }
Chain是链条的意思,从build的方法可看出,ProcessorSlotChain是一个链表,里面添加了很多个Slot。具体的实现需要到DefaultProcessorSlotChain中去看。
- public class DefaultProcessorSlotChain extends ProcessorSlotChain {
-
- AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
-
- @Override
- public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
- throws Throwable {
- super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
- }
-
- @Override
- public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
- super.fireExit(context, resourceWrapper, count, args);
- }
-
- };
- AbstractLinkedProcessorSlot<?> end = first;
-
- @Override
- public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
- protocolProcessor.setNext(first.getNext());
- first.setNext(protocolProcessor);
- if (end == first) {
- end = protocolProcessor;
- }
- }
-
- @Override
- public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
- end.setNext(protocolProcessor);
- end = protocolProcessor;
- }
-
- /**
- * Same as {@link #addLast(AbstractLinkedProcessorSlot)}.
- *
- * @param next processor to be added.
- */
- @Override
- public void setNext(AbstractLinkedProcessorSlot<?> next) {
- addLast(next);
- }
-
- @Override
- public AbstractLinkedProcessorSlot<?> getNext() {
- return first.getNext();
- }
-
- @Override
- public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
- throws Throwable {
- first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
- }
-
- @Override
- public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
- first.exit(context, resourceWrapper, count, args);
- }
-
- }
DefaultProcessorSlotChain中有两个AbstractLinkedProcessorSlot类型的变量:first和end,这就是链表的头结点和尾结点。
创建DefaultProcessorSlotChain对象时,首先创建了首节点,然后把首节点赋值给了尾结点,可以用下图表示:
将第一个节点添加到链表中后,整个链表的结构变成了如下图这样:
将所有的节点都加入到链表中后,整个链表的结构变成了如下图所示:
这样就将所有的Slot对象添加到链表中去了,每一个Slot都是继承自AbstractLinkedProcessorSlot。而AbstractLinkedProcessorSlot是一种责任链的设计,每个对象中都有一个next属性,指向的是另一个AbstractLinkedProcessorSlot对象。
lookProcessChain方法获得的ProecssorSlotChain的实例是DefaultProcessorSlotChain,那么执行chain.entry方法,就会执行DefaultProcessorSlotChain的entry方法,而DefaultProcessorSlotChain的entry方法是这样的:
- @Override
- public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
- throws Throwable {
- first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
- }
也就是说,DefaultProcessorSlotChain的entry实际是执行的first属性的transformEntry方法。
而transformEntry方法会执行当前节点的entry方法,在DefaultProcessorSlotChain中first节点中重写了entry方法,具体如下:
- @Override
- public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
- throws Throwable {
- super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
- }
first节点的entry方法,实际又是执行的super的fireEntry方法:
- @Override
- public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
- throws Throwable {
- if (next != null) {
- next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
- }
- }
从这里可以看到,从fireEntry方法中就开始执行entry了,这里会执行当前节点的下一个节点transformEntry方法,上面已经分析过了,transformEntry方法会触发当前节点的entry,也就是说fireEntry方法实际是触发了下一个节点的entry方法。具体的流程如下图所示:
从图中可以看出,从最初的调用Chain的entry()方法,转变成调用SlotChain中Slot的entry()方法。从上面的分析可以知道,SlotChain中的第一个Slot节点是NodeSelectorSlot。
现在看SlotChain中第一个节点NodeSelectorSlot的entry方法,具体代码如下:
- @Override
- public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
- throws Throwable {
- /*
- * It's interesting that we use context name rather resource name as the map key.
- *
- * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
- * the same {@link ProcessorSlotChain} globally, no matter in which context. So if
- * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},
- * the resource name must be same but context name may not.
- *
- * If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to
- * enter same resource in different context, using context name as map key can
- * distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created
- * of the same resource name, for every distinct context (different context name) each.
- *
- * Consider another question. One resource may have multiple {@link DefaultNode},
- * so what is the fastest way to get total statistics of the same resource?
- * The answer is all {@link DefaultNode}s with same resource name share one
- * {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.
- */
- DefaultNode node = map.get(context.getName());
- if (node == null) {
- synchronized (this) {
- node = map.get(context.getName());
- if (node == null) {
- node = new DefaultNode(resourceWrapper, null);
- HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
- cacheMap.putAll(map);
- cacheMap.put(context.getName(), node);
- map = cacheMap;
- // Build invocation tree
- ((DefaultNode) context.getLastNode()).addChild(node);
- }
-
- }
- }
-
- context.setCurNode(node);
- fireEntry(context, resourceWrapper, node, count, prioritized, args);
- }
从代码中可以看到,NodeSelectorSlot节点做了一些自己的业务逻辑处理,接着调用fireEntry()方法,由此触发下一个节点的entry方法。此时我们知道sentinel的责任链的传递方式:每个Slot节点执行完自己的业务后,会调用fireEntry来触发下一个节点的entry方法。所以可以将上面的图完整了,具体如下:
至此,就通过SlotChain完成了对每个节点的entry()方法的调用,每个节点会根据创建的规则,进行自己的逻辑处理,当统计的结果达到设置的阈值时,就会触发限流、降级等事件,具体就是抛出BlockException异常。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。