当前位置:   article > 正文

Sentinel流控实现原理+代码实现分析_sentinel流控原理

sentinel流控原理

目录

 

原理分析

第一点、拦截点

第二点、流控的具体实现

NodeSelectorSlot

ClusterBuilderSlot

StatisticSlot

FlowSlot

DegradeSlot

SystemSlot

源码分析

创建SlotChain

执行SlotChain的entry方法

执行Slot的entry方法


原理分析

Sentinel流控涉及两个方面:

1、方法的拦截、处理

2、多种流控规则、策略的实现

第一点、拦截点

以java web服务为例进行讲解,在过滤器对请求进行拦截来实现。

  1. <dependency>
  2. <groupId>com.alibaba.csp</groupId>
  3. <artifactId>sentinel-web-servlet</artifactId>
  4. <version>x.y.z</version>
  5. </dependency>

在这个实现包中提供了具体的filter实现类:
com.alibaba.csp.sentinel.adapter.servlet.CommonFilter

  1. @Override
  2. public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
  3. throws IOException, ServletException {
  4. HttpServletRequest sRequest = (HttpServletRequest) request;
  5. Entry urlEntry = null;
  6. try {
  7. String target = FilterUtil.filterTarget(sRequest);
  8. // Clean and unify the URL.
  9. // For REST APIs, you have to clean the URL (e.g. `/foo/1` and `/foo/2` -> `/foo/:id`), or
  10. // the amount of context and resources will exceed the threshold.
  11. UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
  12. if (urlCleaner != null) {
  13. target = urlCleaner.clean(target);
  14. }
  15. // If you intend to exclude some URLs, you can convert the URLs to the empty string ""
  16. // in the UrlCleaner implementation.
  17. if (!StringUtil.isEmpty(target)) {
  18. // Parse the request origin using registered origin parser.
  19. String origin = parseOrigin(sRequest);
  20. String contextName = webContextUnify ? WebServletConfig.WEB_SERVLET_CONTEXT_NAME : target;
  21. ContextUtil.enter(contextName, origin);
  22. if (httpMethodSpecify) {
  23. // Add HTTP method prefix if necessary.
  24. String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + COLON + target;
  25. urlEntry = SphU.entry(pathWithHttpMethod, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
  26. } else {
  27. urlEntry = SphU.entry(target, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
  28. }
  29. }
  30. chain.doFilter(request, response);
  31. } catch (BlockException e) {
  32. HttpServletResponse sResponse = (HttpServletResponse) response;
  33. // Return the block page, or redirect to another URL.
  34. WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);
  35. } catch (IOException | ServletException | RuntimeException e2) {
  36. Tracer.traceEntry(e2, urlEntry);
  37. throw e2;
  38. } finally {
  39. if (urlEntry != null) {
  40. urlEntry.exit();
  41. }
  42. ContextUtil.exit();
  43. }
  44. }
  45. private String parseOrigin(HttpServletRequest request) {
  46. RequestOriginParser originParser = WebCallbackManager.getRequestOriginParser();
  47. String origin = EMPTY_ORIGIN;
  48. if (originParser != null) {
  49. origin = originParser.parseOrigin(request);
  50. if (StringUtil.isEmpty(origin)) {
  51. return EMPTY_ORIGIN;
  52. }
  53. }
  54. return origin;
  55. }

从代码来看,先进行:ContextUtil.enter(contextName, origin);
接着通过SphU.entry(String name, int resourceType, EntryType type)方法判断是否放行,如果出现BlockException,则执行阻塞处理器里设置的动作:WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);

第二点、流控的具体实现

在Sentinel里,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个Entry对象。Entry可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用SphU API显式创建。Entry创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:
1、NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
2、ClusterBuilderSlot则用于存储资源的统计信息以及调用者信息,例如该资源的RT、QPS、thread count等等,这些信息将用作多维度限流、降级的依据
3、StatisticSlot则用于记录、统计不同维度的runtime指标监控信息
4、FlowSlot则用于根据预设的限流规则以及前面slot统计的状态,来进行流量控制
5、AuthoritySlot则根据配置的黑白名单和调用来源信息,来做黑白名单控制
6、DegradeSlot则通过统计信息以及预设的规则,来做熔断降级
7、SystemSlot则通过系统的状态,例如load1等,来控制总的入口流量

总体框架如下:

在这里插入图片描述

Sentinel将SlotChainBuilder作为SPI接口进行扩展,使得Slot Chain具备了扩展的能力。当然我们可以自行加入自定义的slot并编排slot间的顺序,从而可以给Sentinel添加自定义的功能。

在这里插入图片描述

下面我们来看看每一个slot的功能

NodeSelectorSlot


这个slot主要负责收集资源的路径,并将这些资源的调用路径以树状结构存储起来,用于根据调用路径进行流量控制。

  1. ContextUtil.enter("entrance1", "appA");
  2. Entry nodeA = SphU.entry("nodeA");
  3. if (nodeA != null) {
  4. nodeA.exit();
  5. }
  6. ContextUtil.exit();

上述代码通过Context.enter()创建了一个名为entrance1的上下文,同时指定调用发起者的appA;接着通过SphU.entry()请求一个token,如果该方法顺利执行没有抛BlockException,表明token请求成功。

以上代码将在内存中生成以下结构:
在这里插入图片描述

注意:每个DefaultNode由资源ID和输入名称来标识。换句话说,一个资源ID可以有多个不同入口的DefaultNode。

  1. ContextUtil.enter("entrance1", "appA");
  2. Entry nodeA = SphU.entry("nodeA");
  3. if (nodeA != null) {
  4. nodeA.exit();
  5. }
  6. ContextUtil.exit();
  7. ContextUtil.enter("entrance2", "appA");
  8. nodeA = SphU.entry("nodeA");
  9. if (nodeA != null) {
  10. nodeA.exit();
  11. }
  12. ContextUtil.exit();

以上代码将在内存中生成以下结构:
上面的结构可以通过调用 curl http://localhost:8719/tree?type=root来显示
在这里插入图片描述

ClusterBuilderSlot

此插槽用于构建资源的ClusterNode以及调用来源节点。ClusterNode保持某个资源运行统计信息(响应时间、QPS、block数目、线程数、异常数等)以及调用来源统计信息列表。调用来源的名称由ContextUtil.enter(contextName, origin)中的origin标记。可通过如下命令查看某个资源不同调用者的访问情况:
curl http://localhost:8719/origin?id=caller:
在这里插入图片描述

StatisticSlot

StatisticSlot是Sentinel的核心功能插槽之一,用于统计实时的调用数据。
clusterNode:资源唯一标识的ClusterNode的runtime统计
origin:根据来自不同调用者的统计信息
defaultNode:根据上下文条目名称和资源ID的runtime统计

入口流量的统计
Sentinel底层采用高性能的滑动窗口数据结构LeapArray来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。
在这里插入图片描述

FlowSlot

这个slot主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止:

  • 指定应用生效的规则,即针对调用方限流的
  • 调用方为other的规则
  • 调用方为default的规则

DegradeSlot

这个slot主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。

SystemSlot

这个slot会根据对于当前系统的整体情况,对入口的资源进行调配。其原理是让入口的流量和当前系统的load达到一个动态平衡。注意这个功能的两个限制:
只对入口流量起作用(调用类型为EntryType.IN),对出口流量无效。可通过SphU.entry()指定调用类型,如果不指定,默认是EntryType.OUT。

Entry entry = SphU.entry("resourceName", EntryType.IN);

只在Unix-like的操作系统上生效。

源码分析

现在以SphU.entry方法为切入点来开始分析。这个方法会去申请一个entry,如果能够申请成功,则说明没有被限流,否则会抛出BlockException,表明已经被限流了。
从SphU.entry()方法往下执行会进入到Sph.entry(),Sph的默认实现类是CtSph,在CtSph中最终会执行到entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException这个方法。
我们来看一下这个方法的具体实现:

  1. private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
  2. throws BlockException {
  3. Context context = ContextUtil.getContext();
  4. if (context instanceof NullContext) {
  5. // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
  6. // so here init the entry only. No rule checking will be done.
  7. return new CtEntry(resourceWrapper, null, context);
  8. }
  9. if (context == null) {
  10. // Using default context.
  11. context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
  12. }
  13. // Global switch is close, no rule checking will do.
  14. if (!Constants.ON) {
  15. return new CtEntry(resourceWrapper, null, context);
  16. }
  17. // 获取资源对应的SlotChain
  18. ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
  19. /*
  20. * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
  21. * so no rule checking will be done.
  22. */
  23. if (chain == null) {
  24. return new CtEntry(resourceWrapper, null, context);
  25. }
  26. Entry e = new CtEntry(resourceWrapper, chain, context);
  27. try {
  28. // 执行Slot的entry方法
  29. chain.entry(context, resourceWrapper, null, count, prioritized, args);
  30. } catch (BlockException e1) {
  31. e.exit(count, args);
  32. //抛出BlockException
  33. throw e1;
  34. } catch (Throwable e1) {
  35. // This should not happen, unless there are errors existing in Sentinel internal.
  36. RecordLog.info("Sentinel unexpected exception", e1);
  37. }
  38. return e;
  39. }

这个方法可以分为以下几个部分:
1、对参数和全局配置项做检测,如果不符合要求就直接返回一个CtEntry对象,不会再进行后面的限流检测,否则进入下面的检测流程。
2、根据包装过的资源对象获取对应的SlotChain
3、执行SlotChain的entry方法
    3.1、如果SlotChain的entry方法抛出了BlockException,则将该异常抛到上层
    3.2、如果SlotChain的entry方法正常执行了,则最后会将该entry对象返回
4、如果上层方法捕获了BlockException,则说明请求被限流了,否则请求能正常执行
 

创建SlotChain

首先看一下lookProcessChain的方法实现:

  1. /**
  2. * Get {@link ProcessorSlotChain} of the resource. new {@link ProcessorSlotChain} will
  3. * be created if the resource doesn't relate one.
  4. *
  5. * <p>Same resource({@link ResourceWrapper#equals(Object)}) will share the same
  6. * {@link ProcessorSlotChain} globally, no matter in witch {@link Context}.<p/>
  7. *
  8. * <p>
  9. * Note that total {@link ProcessorSlot} count must not exceed {@link Constants#MAX_SLOT_CHAIN_SIZE},
  10. * otherwise null will return.
  11. * </p>
  12. *
  13. * @param resourceWrapper target resource
  14. * @return {@link ProcessorSlotChain} of the resource
  15. */
  16. ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
  17. ProcessorSlotChain chain = chainMap.get(resourceWrapper);
  18. if (chain == null) {
  19. synchronized (LOCK) {
  20. chain = chainMap.get(resourceWrapper);
  21. if (chain == null) {
  22. // Entry size limit.
  23. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
  24. return null;
  25. }
  26. chain = SlotChainProvider.newSlotChain();
  27. Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
  28. chainMap.size() + 1);
  29. newMap.putAll(chainMap);
  30. newMap.put(resourceWrapper, chain);
  31. chainMap = newMap;
  32. }
  33. }
  34. }
  35. return chain;
  36. }

该方法使用了一个HashMap做缓存,key是资源对象。这里加了锁,并且做了double check。具体构造chain的方法是通过spi创建的。

  1. /**
  2. * Builder for a default {@link ProcessorSlotChain}.
  3. *
  4. * @author qinan.qn
  5. * @author leyou
  6. */
  7. public class DefaultSlotChainBuilder implements SlotChainBuilder {
  8. @Override
  9. public ProcessorSlotChain build() {
  10. ProcessorSlotChain chain = new DefaultProcessorSlotChain();
  11. chain.addLast(new NodeSelectorSlot());
  12. chain.addLast(new ClusterBuilderSlot());
  13. chain.addLast(new LogSlot());
  14. chain.addLast(new StatisticSlot());
  15. chain.addLast(new AuthoritySlot());
  16. chain.addLast(new SystemSlot());
  17. chain.addLast(new FlowSlot());
  18. chain.addLast(new DegradeSlot());
  19. return chain;
  20. }
  21. }


Chain是链条的意思,从build的方法可看出,ProcessorSlotChain是一个链表,里面添加了很多个Slot。具体的实现需要到DefaultProcessorSlotChain中去看。

  1. public class DefaultProcessorSlotChain extends ProcessorSlotChain {
  2. AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
  3. @Override
  4. public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
  5. throws Throwable {
  6. super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
  7. }
  8. @Override
  9. public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
  10. super.fireExit(context, resourceWrapper, count, args);
  11. }
  12. };
  13. AbstractLinkedProcessorSlot<?> end = first;
  14. @Override
  15. public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
  16. protocolProcessor.setNext(first.getNext());
  17. first.setNext(protocolProcessor);
  18. if (end == first) {
  19. end = protocolProcessor;
  20. }
  21. }
  22. @Override
  23. public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
  24. end.setNext(protocolProcessor);
  25. end = protocolProcessor;
  26. }
  27. /**
  28. * Same as {@link #addLast(AbstractLinkedProcessorSlot)}.
  29. *
  30. * @param next processor to be added.
  31. */
  32. @Override
  33. public void setNext(AbstractLinkedProcessorSlot<?> next) {
  34. addLast(next);
  35. }
  36. @Override
  37. public AbstractLinkedProcessorSlot<?> getNext() {
  38. return first.getNext();
  39. }
  40. @Override
  41. public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
  42. throws Throwable {
  43. first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
  44. }
  45. @Override
  46. public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
  47. first.exit(context, resourceWrapper, count, args);
  48. }
  49. }

DefaultProcessorSlotChain中有两个AbstractLinkedProcessorSlot类型的变量:first和end,这就是链表的头结点和尾结点。

创建DefaultProcessorSlotChain对象时,首先创建了首节点,然后把首节点赋值给了尾结点,可以用下图表示:

将第一个节点添加到链表中后,整个链表的结构变成了如下图这样:

将所有的节点都加入到链表中后,整个链表的结构变成了如下图所示:

这样就将所有的Slot对象添加到链表中去了,每一个Slot都是继承自AbstractLinkedProcessorSlot。而AbstractLinkedProcessorSlot是一种责任链的设计,每个对象中都有一个next属性,指向的是另一个AbstractLinkedProcessorSlot对象。

执行SlotChain的entry方法

lookProcessChain方法获得的ProecssorSlotChain的实例是DefaultProcessorSlotChain,那么执行chain.entry方法,就会执行DefaultProcessorSlotChain的entry方法,而DefaultProcessorSlotChain的entry方法是这样的:

  1. @Override
  2. public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
  3. throws Throwable {
  4. first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
  5. }

也就是说,DefaultProcessorSlotChain的entry实际是执行的first属性的transformEntry方法。
而transformEntry方法会执行当前节点的entry方法,在DefaultProcessorSlotChain中first节点中重写了entry方法,具体如下:

  1. @Override
  2. public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
  3. throws Throwable {
  4. super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
  5. }

first节点的entry方法,实际又是执行的super的fireEntry方法:

  1. @Override
  2. public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
  3. throws Throwable {
  4. if (next != null) {
  5. next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
  6. }
  7. }

从这里可以看到,从fireEntry方法中就开始执行entry了,这里会执行当前节点的下一个节点transformEntry方法,上面已经分析过了,transformEntry方法会触发当前节点的entry,也就是说fireEntry方法实际是触发了下一个节点的entry方法。具体的流程如下图所示:

从图中可以看出,从最初的调用Chain的entry()方法,转变成调用SlotChain中Slot的entry()方法。从上面的分析可以知道,SlotChain中的第一个Slot节点是NodeSelectorSlot。


执行Slot的entry方法

现在看SlotChain中第一个节点NodeSelectorSlot的entry方法,具体代码如下:

  1. @Override
  2. public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
  3. throws Throwable {
  4. /*
  5. * It's interesting that we use context name rather resource name as the map key.
  6. *
  7. * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
  8. * the same {@link ProcessorSlotChain} globally, no matter in which context. So if
  9. * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},
  10. * the resource name must be same but context name may not.
  11. *
  12. * If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to
  13. * enter same resource in different context, using context name as map key can
  14. * distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created
  15. * of the same resource name, for every distinct context (different context name) each.
  16. *
  17. * Consider another question. One resource may have multiple {@link DefaultNode},
  18. * so what is the fastest way to get total statistics of the same resource?
  19. * The answer is all {@link DefaultNode}s with same resource name share one
  20. * {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.
  21. */
  22. DefaultNode node = map.get(context.getName());
  23. if (node == null) {
  24. synchronized (this) {
  25. node = map.get(context.getName());
  26. if (node == null) {
  27. node = new DefaultNode(resourceWrapper, null);
  28. HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
  29. cacheMap.putAll(map);
  30. cacheMap.put(context.getName(), node);
  31. map = cacheMap;
  32. // Build invocation tree
  33. ((DefaultNode) context.getLastNode()).addChild(node);
  34. }
  35. }
  36. }
  37. context.setCurNode(node);
  38. fireEntry(context, resourceWrapper, node, count, prioritized, args);
  39. }

从代码中可以看到,NodeSelectorSlot节点做了一些自己的业务逻辑处理,接着调用fireEntry()方法,由此触发下一个节点的entry方法。此时我们知道sentinel的责任链的传递方式:每个Slot节点执行完自己的业务后,会调用fireEntry来触发下一个节点的entry方法。所以可以将上面的图完整了,具体如下:

至此,就通过SlotChain完成了对每个节点的entry()方法的调用,每个节点会根据创建的规则,进行自己的逻辑处理,当统计的结果达到设置的阈值时,就会触发限流、降级等事件,具体就是抛出BlockException异常。

类关系图

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/889260
推荐阅读
相关标签
  

闽ICP备14008679号