栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 软件开发 > 后端开发 > Java

Sentinel

Java 更新时间:发布时间: 百科书网 趣学号
Sentinel是什么?

官网地址:https://github.com/alibaba/Sentinel/wiki/介绍

Sentinel 是面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。

基本概念?

1.资源
资源是 Sentinel 的关键概念。它可以是 Java 应用程序中的任何内容,例如,由应用程序提供的服务,或由应用程序调用的其它应用提供的服务,也可以是一段代码。
只要通过 Sentinel API 定义的代码,就是资源,能够被 Sentinel 保护起来。大部分情况下,可以使用方法签名,URL,甚至服务名称作为资源名来标示资源。

2.规则
围绕资源的实时状态设定的规则,可以包括流量控制规则、熔断降级规则以及系统保护规则。所有规则可以动态实时调整。

功能和设计理念? 1.流量控制

流量控制设计理念
其原理是监控应用流量的 QPS 或并发线程数等指标,当达到指定的阈值时对流量进行控制,以避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。流量控制主要有两种统计类型,一种是统计并发线程数,另外一种则是统计 QPS。类型由 FlowRule 的 grade 字段来定义。其中,0 代表根据并发数量来限流,1 代表根据 QPS 来进行流量控制。

并发数控制用于保护业务线程池不被慢调用耗尽。例如,当应用所依赖的下游应用由于某种原因导致服务不稳定、响应延迟增加,对于调用者来说,意味着吞吐量下降和更多的线程数占用,极端情况下甚至导致线程池耗尽。Sentinel 并发控制不负责创建和管理线程池,而是简单统计当前请求上下文的线程数目(正在执行的调用数目),如果超出阈值,新的请求会被立即拒绝。

QPS流量控制
当 QPS 超过某个阈值的时候,则采取措施进行流量控制。流量控制的效果包括以下几种:直接拒绝、Warm Up、匀速排队。

直接拒绝
直接拒绝(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)方式是默认的流量控制方式,当QPS超过任意规则的阈值后,新的请求就会被立即拒绝,拒绝方式为抛出FlowException。这种方式适用于对系统处理能力确切已知的情况下,比如通过压测确定了系统的准确水位时。

Warm Up
Warm Up(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式,即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。
通常冷启动的过程系统允许通过的 QPS 曲线如下图所示:

匀速排队
匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法。
该方式的作用如下图所示:

这种方式主要用于处理间隔性突发的流量,例如消息队列。想象一下这样的场景,在某一秒有大量的请求到来,而接下来的几秒则处于空闲状态,我们希望系统能够在接下来的空闲期间逐渐处理这些请求,而不是在第一秒直接拒绝多余的请求。

2.熔断降级

熔断降级设计理念
暂时切断不稳定调用,避免局部不稳定因素导致整体的雪崩。
一个服务常常会调用别的模块,可能是另外的一个远程服务、数据库,或者第三方 API 等。例如,支付的时候,可能需要远程调用银联提供的 API;查询某个商品的价格,可能需要进行数据库查询。然而,这个被依赖服务的稳定性是不能保证的。如果依赖的服务出现了不稳定的情况,请求的响应时间变长,那么调用服务的方法的响应时间也会变长,线程会产生堆积,最终可能耗尽业务自身的线程池,服务本身也变得不可用。对调用链路中不稳定的资源进行熔断降级也是保障高可用的重要措施之一。

熔断策略
Sentinel 提供以下几种熔断策略:
1.慢调用比例 (SLOW_REQUEST_RATIO):选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。

2.异常比例 (ERROR_RATIO):当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是 [0.0, 1.0],代表 0% - 100%。

3.异常数 (ERROR_COUNT):当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。

源码分享

● sentinel-adapter 适配器模块,主要实现了对一些常见框架的适配
● sentinel-benchmark 基准测试模块,对核心代码的精确性提供基准测试
● sentinel-cluster 集群模式模块,这个模块提供了Sentinel对集群流量控制的默认实现;
● sentinel-core 核心模块,限流、降级、系统保护等都在这里实现
● sentinel-dashboard 控制台模块,可以对连接上的sentinel客户端实现可视化的管理
● sentinel-demo 样例模块,可参考怎么使用sentinel进行限流、降级等
● sentinel-extension 扩展模块,主要对DataSource进行了部分扩展实现
● sentinel-logging Sentinel的日志模块,使用的slf4j,可以在项目中自定义Sentinel的logger、appender;
● sentinel-transport 传输模块,提供了基本的监控服务端和客户端的API接口,以及一些基于不同库的实现

主要分享限流和熔断比较关键的实现:

限流入口

根据Sentinel提供的demo模块可以快速找到限流入口,就是SphU.entry(KEY)代码,此处的key唯一确定限流规则,Sentinel会根据这个key去获取对应的限流规则;客户端请求达到限流规则,抛出BlockException,进入catch模块;在finally内退出entry释放资源。

Entry entry = null;
try {
     entry = SphU.entry(KEY);
     //被保护的资源
     .......
} catch (BlockException blockException) {
     // 接口被限流的时候, 会进入到这里
     System.out.println("---接口被限流了---, exception: ");
     return ResponseEntity.ok("接口限流, 返回空");
} finally {
     // 释放资源
     if (entry != null) {
         entry.exit();
     }
}

entry实现:代 码进来先进行了必要参数的校验,此处的ResourceWrapper就是根据前面的key和EntryType构建的资源包装类

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
    // 从 ThreadLocal 中获取 Context 实例
    Context context = ContextUtil.getContext();
    // 如果是 NullContext,那么说明 context name 超过了 2000 个,参见 ContextUtil.trueEnter
    // 这个时候,Sentinel 不再接受处理新的 context 配置,也就是不做这些新的接口的统计、限流熔断等
    if (context instanceof NullContext) {
    	return new CtEntry(resourceWrapper, null, context);
    }

    // 如果我们不显式调用 ContextUtil.enter,这里会进入到默认的 context 中
    if (context == null) {
    	context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
    }

    // Sentinel 的全局开关,Sentinel 提供了接口让用户可以在 dashboard 开启/关闭
    if (!Constants.ON) {
    	return new CtEntry(resourceWrapper, null, context);
    }

    // 设计模式中的责任链模式。
    // 下面这行代码用于构建一个责任链,入参是 resource,前面我们说过资源的唯一标识是 resource name
    ProcessorSlot chain = lookProcessChain(resourceWrapper);

    // 根据 lookProcessChain 方法,我们知道,当 resource 超过 Constants.MAX_SLOT_CHAIN_SIZE,
    // 也就是 6000 的时候,Sentinel 开始不处理新的请求,这么做主要是为了 Sentinel 的性能考虑
    if (chain == null) {
    	return new CtEntry(resourceWrapper, null, context);
    }

    // 执行这个责任链。如果抛出 BlockException,说明链上的某一环拒绝了该请求,
    // 把这个异常往上层业务层抛,业务层处理 BlockException 应该进入到熔断降级逻辑中
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
    	chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
    	e.exit(count, args);
    	throw e1;
    } catch (Throwable e1) {
    	// This should not happen, unless there are errors existing in Sentinel internal.
    	RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}

构建执行链条
根据资源包装类从链map中获取执行链,获取到直接返回;如果没有获取到说明是第一次构建该资源的执行链
ProcessorSlot lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // 根据 lookProcessChain 方法,我们知道,当 resource 超过 Constants.MAX_SLOT_CHAIN_SIZE,
    			// 也就是 6000 的时候,Sentinel 开始不处理新的请求,这么做主要是为了 Sentinel 的性能考虑
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                chain = SlotChainProvider.newSlotChain();
                Map newMap = new HashMap(chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
 

构建该资源对应的资源链,具体构建链代码如下:
加入链map,下次执行这条资源链就不需要再次计算链(如果没有这个map,每次接口调用都需要重新计算链,很浪费资源)

@Override
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    List sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
    for (ProcessorSlot slot : sortedSlotList) {
        if (!(slot instanceof AbstractlinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractlinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }
        chain.addLast((AbstractlinkedProcessorSlot) slot);
    }
    return chain;
}

链条执行

关键类UML图如上,ProcessorSlot定义了链条上插槽方法,包括entry、fireEntry、exit、fireExit方法。链条上的所有插槽都通过抽象类AbstractlinkedProcessorSlot实现了ProcessoreSlot接口。

FlowSlot(限流插槽)实现如下:
限流插槽只实现了entry和exit方法,fireEntry和fireExit方法由抽象父类AbstractlinkedProcessorSlot实现,逻辑比较简单分别是执行链条上的下一个插槽方法和链条上下一个插槽的退出方法

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    fireExit(context, resourceWrapper, count, args);
}

限流插槽的真正的校验交给了FlowRuleChecker。先根据之前的key获取具体的限流规则,然后canPassCheck方法进行校验。代码如下:

public void checkFlow(Function> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    Collection rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

熔断降级

熔断操作同样是通过继承AbstractlinkedProcessorSlot实现ProcessorSlot接口的
真正的熔断校验交给了DegradeRuleManager.checkDegrade
public class DegradeSlot extends

AbstractlinkedProcessorSlot {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

熔断check:
同样是先根据key获取熔断规则
通过passCheck是否需要熔断降级,符合规则就会抛出DegradeException

public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
    throws BlockException {
    Set rules = degradeRules.get(resource.getName());
    if (rules == null) {
        return;
    }
    for (DegradeRule rule : rules) {
        if (!rule.passCheck(context, node, count)) {
            throw new DegradeException(rule.getLimitApp(), rule);
        }
    }
}

滑动窗口

sentinel的滑动窗口统计机制就是根据当前时间,获取对应的时间窗口,并更新该时间窗口中的各项统计指标(pass/block/rt等),这些指标被用来进行后续判断,比如限流、降级等;随着时间的推移,当前时间点对应的时间窗口是变化的,这时会涉及到时间窗口的初始化、复用等。可以说,sentinel上的功能所用到的数据几乎都是滑动窗口统计机制来维护和更新的。

既然要做控制,那么首先,Sentinel 就要先做统计,它要知道当前接口的 QPS 和并发是多少,进而判断一个新的请求能不能让它通过。
数据统计的代码在 StatisticNode 中,对于 QPS 数据,它使用了滑动窗口的设计:

//保存最近的{@code INTERVAL}毫秒的统计信息
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
//保存最近60秒的统计数据
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

从上面的代码可以知道,Sentinel 统计了 秒 和 分 两个维度的数据,下面我们简单说说实现类 ArrayMetric 的源码设计。
 public class ArrayMetric implements Metric {
     
     private final LeapArray data;
     
     public ArrayMetric(int sampleCount, int intervalInMs) {
         this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
     }
     
     public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy){ 			if (enableOccupy) { 
         this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);                                                                                                          			} else {
         this.data = new BucketLeapArray(sampleCount, intervalInMs);
     } 
} ...... } 

ArrayMetric 的内部是一个 LeapArray,我们以分钟维度统计的使用来说,它使用子类 BucketLeapArray 实现。

public abstract class LeapArray {
    protected int windowLengthInMs;
    protected int sampleCount;
    protected int intervalInMs;
    protected final AtomicReferenceArray> array; 
    // 对于分钟维度的设置,sampleCount 为 60,intervalInMs 为 60 * 1000 
    public LeapArray(int sampleCount, int intervalInMs) {
        // 单个窗口长度,这里是 1000ms
    this.windowLengthInMs = intervalInMs / sampleCount;
        // 一轮总时长 60,000 ms 
    this.intervalInMs = intervalInMs; 
        // 60 个窗口 
    this.sampleCount = sampleCount; 
    this.array = new AtomicReferenceArray<>(sampleCount); 
    } 
    // ...... 
} 

它的内部核心是一个数组 array,它的长度为 60,也就是有 60 个窗口,每个窗口长度为 1 秒,刚好一分钟走完一轮。然后下一轮开启“覆盖”操作。
每个窗口是一个 WindowWrap 类实例。
● 添加数据的时候,先判断当前走到哪个窗口了( 当前时间(s) % 60)
,然后需要判断这个窗口是否是过期数据,如果是过期数据(窗口代表的时间距离当前已经超过 1 分钟),需要先重置这个窗口实例的数据。
● 统计数据同理,如统计过去一分钟的 QPS 数据,就是将每个窗口的值相加,当中需要判断窗口数据是否是过期数据,即判断窗口的 WindowWrap 实例是否是一分钟内的数据。核心逻辑都封装在了 currentWindow(long timeMillis) 和 values(long timeMillis)方法中。
添加数据的时候,我们要先获取操作的目标窗口,也就是 currentWindow 这个方法,Sentinel 在这里处理初始化和过期重置的情况:

 public WindowWrap currentWindow(long timeMillis) {
     if (timeMillis < 0) { return null; }
     // 获取窗口下标
     int idx = calculateTimeIdx(timeMillis);
     // 计算该窗口的理论开始时间 
     long windowStart = calculateWindowStart(timeMillis); 
     // 嵌套在一个循环中,因为有并发的情况 
     while (true) {
         WindowWrap old = array.get(idx);
         if (old == null) { 
             // 窗口未实例化的情况,使用一个 CAS 来设置该窗口实例 
             WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); 
             if (array.compareAndSet(idx, null, window)) {
                 return window; 
             } else { 
                 // 存在竞争 
                 Thread.yield(); 
             }
         } else if (windowStart == old.windowStart()) {
             // 当前数组中的窗口没有过期
             return old; 
         } else if (windowStart > old.windowStart()) { 
             // 该窗口已过期,重置窗口的值。使用一个锁来控制并发。 
             if (updateLock.tryLock()) {
                 try {
                     return resetWindowTo(old, windowStart);
                 } finally {
                     updateLock.unlock(); 
                 }
             } else {
                 Thread.yield(); 
             }
         } else if (windowStart < old.windowStart()) {
             // 正常情况都不会走到这个分支,异常情况其实就是时钟回拨,这里返回一个 WindowWrap 是容错 
             return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); 
         } 
     } 
 } 

获取数据,使用的是 values 方法,这个方法返回“有效的”窗口中的数据:

 public List values(long timeMillis) {
     if (timeMillis < 0) {
         return new ArrayList(); 
     }
     int size = array.length();
     List result = new ArrayList(size);
     for (int i = 0; i < size; i++) {
         WindowWrap windowWrap = array.get(i);
         // 过滤掉过期数据 
         if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) { 				continue; 
         } 
         result.add(windowWrap.value());
     }
     return result;
 } 
// 判断当前窗口的数据是否是 60 秒内的 
public boolean isWindowDeprecated(long time, WindowWrap windowWrap) {
    return time - windowWrap.windowStart() > intervalInMs; 
} 

这个 values 方法很简单,就是过滤掉那些过期数据就可以了。

转载请注明:文章转载自 www.051e.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号