赞
踩
前端时间遇到的一次大型故障:订单服务应查询量巨大拖垮服务,导致公司核心系统系统瘫痪。那么如何避免此类事情再次发生,公司内部做了大量的服务下线或者尽可能减少服务调用的工作;除此之外,服务提供方要保证在自己有限的负载下正常运作,最直观的方法是限制流量,也即服务限流。
服务限流其实是指当系统资源不够,不足以应对大量请求,即系统资源与访问量出现矛盾的时候,我们为了保证有限的资源能够正常服务,因此对系统按照预设的规则进行流量限制或功能限制的一种方法。
熔断 | 服务降级 | 延迟处理 | 特权处理 |
---|---|---|---|
拒绝流量访问,当系统恢复正常时在关闭熔断 | 将次要服务降级,停止服务,将系统资源释放出来给核心功能 | 在前端设置一个流程缓冲池,将所有的流程全部缓冲到这个池子不立即处理,常见队列缓模式处理,服务端处理不及时会丢失部分请求 | 优先处理需要高保障的请求,其他请求丢去或者延迟处理 |
几项重要的系统指标
TPS(transport per second) | QPS (query per second) | 吞吐量 | RT |
---|---|---|---|
每秒处理的事务数量 | 每秒的响应请求数 | 系统在单位时间内处理请求的数量 | 系统对请求做出响应的时间 |
关系:QPS(TPS)= 并发数/平均响应时间 或者 并发数 = QPS*平均响应时间
Sentinel(哨兵) 是阿里中间件团队开源的,面向分布式服务架构的轻量级高可用流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。
以 Sentinel 在 Dubbo 生态系统中的作用为例,Dubbo 的核心模块包括注册中心、服务提供方、服务消费方和监控四个模块。Sentinel 通过对服务提供方和服务消费方的限流来实现限流。
sentinel-core 核心模块,限流、降级、系统保护等都在这里实现
sentinel-dashboard 控制台模块,可以对连接上的sentinel客户端实现可视化的管理
sentinel-transport 传输模块,提供了基本的监控服务端和客户端的API接口,以及一些基于不同库的实现
sentinel-extension 扩展模块,主要对DataSource进行了部分扩展实现
sentinel-adapter 适配器模块,主要实现了对一些常见框架的适配
sentinel-demo 样例模块,可参考怎么使用sentinel进行限流、降级等
sentinel-benchmark 基准测试模块,对核心代码的精确性提供基准测试
在github上拉取sentinle-dashboard最新包
使用如下命令启动
java -Dserver.port=8081 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar
其中 -Dserver.port=8081 用于指定 Sentinel 控制台端口为 8081。
访问 localhost:8081后出现如下页面
注:本文讲的控制台无持久化,重启后将丢失所有数据
依赖
<!--sentinel限流--> <!--与控制台通讯包--> <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-transport-simple-http</artifactId> <version>1.4.1</version> </dependency> <!--dubbo 接入插件--> <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-dubbo-adapter</artifactId> <version>1.4.1</version> </dependency> <!--sentinel注解--> <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-annotation-aspectj</artifactId> <version>1.4.1</version> </dependency> <!--热点流控包--> <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-parameter-flow-control</artifactId> <version>1.4.1</version> </dependency>
@Service
public class HelloService implements IHelloService {
@SentinelResource(value = "sayHello")
@Override
public String sayHello() {
return "hello dubbo";
}
@Override
public String sayHello(String word) {
return word;
}
}
public class DemoApplication { public static void main(String[] args) throws Exception { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext("com.momo.dubbo"); //注册的dubbo服务 IHelloService helloService = context.getBean(HelloService.class); System.out.println(helloService.sayHello()); synchronized (DemoApplication.class) { while (true) { DemoApplication.class.wait(); } } } }
启动时加入 JVM 参数 -Dcsp.sentinel.dashboard.server=consoleIp:port 指定控制台地址和端口。若启动多个应用,则需要通过 -Dcsp.sentinel.api.port=xxxx 指定客户端监控 API 的端口(默认是 8719)
-Dcsp.sentinel.api.port=8719 -Dcsp.sentinel.dashboard.server=localhost:8081 -Dproject.name=dubbo-provider-demo
intellij 启动方式
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class SentinelApplication {
public static void main(String[] args) {
SpringApplication springApplication = new SpringApplication(SentinelApplication.class);
ConfigurableApplicationContext context = springApplication.run(args);
HelloConsumerService helloConsumerService = context.getBean(HelloConsumerService.class);
while (true) {
System.out.println(helloConsumerService.geyHello());
}
}
}
-Dcsp.sentinel.api.port=8719 -Dcsp.sentinel.dashboard.server=localhost:8081 -Dproject.name=dubbo-consumer-demo
启动后控制台:
簇点链路中找到自己的资源
新增流控
选择QPS限流并且快速失败,控制台如下
选择排队等待,只有部分请求能被处理
provider使用:
拦截日志统一记录在 ~/logs/csp/sentinel-block.log 中:
sentinel四大限流方案类似,看懂了一个其他的都能懂,这里以dubbo的流空Folw 为模版来讲解。
官网对dubbo框架的适配只有一句话:
Sentinel 提供 Dubbo 的相关适配 Sentinel Dubbo Adapter,主要包括针对 Service Provider 和 Service Consumer 实现的***Filter***。
那么这里的重点就是filter
首先,看下sentinel-duboo-adapter 结构:
就真的只有filter的实现和失败方案(fallback)
那么来看下dubbo filter的定义
以SentinelDubboConsumerFilter 为例:
主要逻辑就是:限流处理(被拦截时抛出BlockException异常,然后调用fallback返回失败方案) -> dubbo资源调用(invoker.invoke)
CommandCenterInitFunc:
这里声明资源
SimpleHttpCommandCenter:
ServerThread:
HttpEventTask:
ModifyRulesCommandHandler :
FlowPropertyListener
来看看CtSph.entryWithPriority 方法里的
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
核心代码:
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { Context context = ContextUtil.getContext(); if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } if (context == null) { // Using default context. context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); } // Global switch is close, no rule checking will do. if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ if (chain == null) { return new CtEntry(resourceWrapper, null, context); } Entry e = new CtEntry(resourceWrapper, chain, context); try { chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }
这里主要有三个步骤:
来看看如何创建一个chain:
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { //上锁 synchronized (LOCK) { chain = chainMap.get(resourceWrapper); //为空就创建一个chain, if (chain == null) { // 超出最大容量不在创建 if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } chain = SlotChainProvider.newSlotChain(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }
再来看看SlotChainProvider.newSlotChain()
/** * The load and pick process is not thread-safe, but it's okay since the method should be only invoked * via {@code lookProcessChain} in {@link com.alibaba.csp.sentinel.CtSph} under lock. * * 非线程安全,但是由于是在线程安全方法里调用,所以可以认为线程安全的 * @return new created slot chain */ public static ProcessorSlotChain newSlotChain() { if (builder != null) { return builder.build(); } resolveSlotChainBuilder(); if (builder == null) { RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); builder = new DefaultSlotChainBuilder(); } return builder.build(); }
builder.build()方法:
public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; } }
ProcessorSlotChain是个链表
具体实现看DefaultProcessorSlotChain:
public class DefaultProcessorSlotChain extends ProcessorSlotChain { AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; AbstractLinkedProcessorSlot<?> end = first; @Override public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) { protocolProcessor.setNext(first.getNext()); first.setNext(protocolProcessor); if (end == first) { end = protocolProcessor; } } @Override public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; } }
DefaultProcessorSlotChain中有两个AbstractLinkedProcessorSlot类型的变量:first和end,这就是链表的头结点和尾节点。
创建DefaultProcessorSlotChain对象时,首先创建了首节点,然后把首节点赋值给了尾节点,可以用下图表示:
将第一个节点添加到链表中后,整个链表的结构变成了如下图这样:
当所有节点都加入链表后,链表结构如下:
来看看DefaultProcessorSlotChain.entry 的方法
first.transformEntry方法:
@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
DefaultProcessorSlotChain 的first节点重写了entry方法:
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
};
super.fireEntry:
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
只执行了next.transformEntry方法
而transformEntry方法里面是执行节点的entry方法
FlowSlot节点的entry方法:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
//校验流控
checkFlow(resourceWrapper, context, node, count, prioritized);
//执行下个节点的entry方法
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
此方法就是执行具体的记录或限流逻辑。
dubbo服务调用时首先通过InvokerInvocationHandler找到目标invoker
来看下InvocationHandler定义:
每一个动态代理类都必须要实现InvocationHandler这个接口,并且每个代理类的实例都关联到了一个handler,当我们通过代理对象调用一个方法的时候,这个方法的调用就会被转发为由InvocationHandler这个接口的 invoke 方法来进行调用。
MockClusterInvoker:
为什么是FailoverCluster?
FailoverCluster是缺省值
唉~dubbo的东西下次聊~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。