
今天呢!猿塔君跟大家讲:
阿里 Sentinel 源码解析
Sentinel 的定位是流量控制、熔断降级,你应该把它理解为一个第三方 Jar 包。
这个 Jar 包会进行流量统计,执行流量控制规则。而统计数据的展示和规则的设置在 sentinel- dashboard 项目中,这是一个 Spring MVC 应用,有后台管理界面,我们通过这个管理后台和各个应 用进行交互。
sentinel-dashboard 并没有定位为一个功能强大的管理后台,一般来说,我们需要基于它来进行二次 开发,甚至于你也可以不使用这个 Java 项目,自己使用其他的语言来实现。在最后一小节,我介绍了 业务应用是怎么和 dashboard 应用交互的。
今晚会有我们在职一线大咖免费做Java进阶技术分享
大家有时间的可以来听听哦!
主题:【分布式架构之Session跨域共享解决方案】
讲师:Roc / 资深架构师
课程收获:
1、session一致性问题的产生背景
2、互联网大厂session共享方案
3、不同session共享方案优缺点对比及应用场景
4、基于nginx负载均衡器模拟session失效

接下来继续干货分享
这里我们先抛开 Sentinel 的各种概念,直接先看下数据统计的代码。数据统计的代码在 StatisticNode 中,对于 QPS 数据,它使用了滑动窗口的设计:
private transient volatile Metric rollingCounterInSecond = new
ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
private AtomicInteger curThreadNum = new AtomicInteger(0);
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy)
{
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount,
intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
......
}
这里先介绍较为简单的 BucketLeapArray 的实现,然后在最后一节会介绍
public abstract class LeapArray<T> {
protected int windowLengthInMs;
protected int sampleCount;
protected int intervalInMs;
protected final AtomicReferenceArray<WindowWrap<T>> array;
// 对于分钟维度的设置,sampleCount 为 60,intervalInMs 为 60 * 1000
public LeapArray(int sampleCount, int intervalInMs) {
// 单个窗口长度,这里是 1000ms
this.windowLengthInMs = intervalInMs / sampleCount;
// 一轮总时长 60,000 ms
this.intervalInMs = intervalInMs;
// 60 个窗口
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
// ......
}

核心逻辑都封装在了 currentWindow(long timeMillis) 和 values(long timeMillis) 方法中。
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 获取窗口下标
int idx = calculateTimeIdx(timeMillis);
// 计算该窗口的理论开始时间
long windowStart = calculateWindowStart(timeMillis);
// 嵌套在一个循环中,因为有并发的情况
}
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 窗口未实例化的情况,使用一个 CAS 来设置该窗口实例
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs,
windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
// 存在竞争
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 当前数组中的窗口没有过期
return old;
} else if (windowStart > old.windowStart()) {
// 该窗口已过期,重置窗口的值。使用一个锁来控制并发。
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 正常情况都不会走到这个分支,异常情况其实就是时钟回拨,这里返回一个
WindowWrap 是容错
return new WindowWrap<T>(windowLengthInMs, windowStart,
newEmptyBucket(timeMillis));
}
}
}
获取数据,使用的是 values 方法,这个方法返回“有效的”窗口中的数据:
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
// 过滤掉过期数据
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
// 判断当前窗口的数据是否是 60 秒内的
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}
到这里,我们就说完了 分 维度数据统计的问题。至于秒维度的数据统计,有些不一样,稍微复杂一些,我在后面单独起了一节。跳过这部分内容对阅读 Sentinel 源码没有影响。
下面,我们正式开始 Sentinel 的源码介绍。
try (Entry entry = SphU.entry("HelloWorld")) {
// Your business logic here.
System.out.println("hello world");
} catch (BlockException e) {
// Handle rejected request.
e.printStackTrace();
}
2、进入 BlockException 异常分支,代表该次请求被流量控制规则限制了,我们一般会让代码走入到熔断降级的逻辑里面。当然,BlockException 其实有好多个子类,如 DegradeException、FlowException 等,我们也可以 catch 具体的子类来进行处理。7v
Sentinel 提供了很多的 adapter 用于诸如 dubbo、grpc、网关等环境,它们其实都是封装了上述的代码。你只要认真看完本文,那些包装都很容易看懂。

这里我们介绍了 Sentinel 的接口使用,不过它的类名字我现在都没懂是什么意思,SphU、CtSph、CtEntry 这些名字有什么特殊含义,有知道的读者请不吝赐教。
ContextUtil#enter我们先看 Context#enter 方法,这行代码我们是可以不写的,通常情况下,我们都不会显示设置context。
ContextUtil.enter("user-center","app-A");
进入到 ContextUtil 类,大家可能会漏看它的 static 代码块,这里会添加一个默认的EntranceNode 实例。
然后上面的这个方法会走到 ContextUtil#trueEnter 中,这里会添加名为 "user-center" 的
这里的源码非常简单,如果我们从来不显式调用 ContextUtil#enter 方法的话,那 root 就只有一个default 子节点 sentinel_default_context。
context 很好理解,它代表线程执行的上下文,在各种开源框架中都有类似的语义,在 Sentinel 中,我们可以看到,对于一个新的 context name,Sentinel 会往树中添加一个 EntranceNode 实例。它的作用是为了区分调用链路,标识调用入口。在 sentinel-dashboard 中,我们可以很直观地看出调用链路:

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count,
boolean prioritized, Object... args)
throws BlockException {
// 从 ThreadLocal 中获取 Context 实例
Context context = ContextUtil.getContext();
// 如果是 NullContext,那么说明 context name 超过了 2000 个,参见
ContextUtil#trueEnter
// 这个时候,Sentinel 不再接受处理新的 context 配置,也就是不做这些新的接口的统计、限流
熔断等
if (context instanceof NullContext) {
return new CtEntry(resourceWrapper, null, context);
}
// 我们前面说了,如果我们不显式调用 ContextUtil#enter,这里会进入到默认的 context 中
if (context == null) {
context = MyContextUtil.myEnter(Constants.CONTEXT
_
DEFAULT
_
NAME,
""
,
resourceWrapper.getType());
}
// Sentinel 的全局开关,Sentinel 提供了接口让用户可以在 dashboard 开启/关闭
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 设计模式中的责任链模式。
// 下面这行代码用于构建一个责任链,入参是 resource,前面我们说过资源的唯一标识是
resource name
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
// 根据 lookProcessChain 方法,我们知道,当 resource 超过
Constants.MAX
_SLOT
_CHAIN
_SIZE,
// 也就是 6000 的时候,Sentinel 开始不处理新的请求,这么做主要是为了 Sentinel 的性能考
虑
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
// 执行这个责任链。如果抛出 BlockException,说明链上的某一环拒绝了该请求,
// 把这个异常往上层业务层抛,业务层处理 BlockException 应该进入到熔断降级逻辑中
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel
internal.
RecordLog.info("Sentinel unexpected exception"
, e1);
}
return e;
}
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new SystemSlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
}
这里要强调一点,对于相同的 resource,使用同一个责任链实例,不同的 resource,使用不同的责任链实例。
另外,对于 resource 实例,我们前面也说了,它根据 resource name 来判断,和线程没有关系。

首先,链中第一个处理节点是 NodeSelectorSlot。
// key 是 context name, value 是 DefaultNode 实例
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>
(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj,
int count, boolean prioritized, Object... args)
throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String,
DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
所以这块代码主要就是要处理:不同的 context name,同一个 resource name 的情况。


应的就很清楚了。
ClusterBuilderSlot


看上图中深色部分,对于每一个 resource,这里会对应一个 ClusterNode 实例,如果不存在,就创建一个实例。
另外,这个类还处理了 origin 不是默认值的情况:
再说一次,origin 代表调用方标识,如 application-a, application-b 等。
if (!""
.equals(context.getOrigin())) {
Node originNode =
node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}


LogSlot

这个类比较简单,我们看到它直接 fire 出去了,也就是说,先处理责任链上后面的那些节点,如果它们抛出了 BlockException,那么这里才做处理。


这个 slot 非常重要,它负责进行数据统计。
大家一定要看一遍这个类的源码,这里没有什么特别的内容需要强调,所以我就不展开说了。
接下来,我们后面要介绍的几个 Slot,需要通过 dashboard 进行开启,因为需要配置规则。
当然,你也可以硬编码规则到代码中。但是要调整数值就比较麻烦,每次都要改代码。

这个类非常简单,做权限控制,根据 origin 做黑白名单的控制:

在 dashboard 中,是这么配置的:



规则校验都在 SystemRuleManager#checkSystem 中:


保护规则是全局的,和具体的某个资源没有关系。
由于系统的平均 RT、当前线程数、QPS 都可以从 ENTRY_NODE 中获得,所以限制代码非常简单,比较一下大小就可以了。如果超过阈值,抛出 SystemBlockException 。
ENTRY_NODE 是 ClusterNode 类型的,而 ClusterNode 对于 rt、qps 都是统计的秒维度的数据。
当然,对于 SystemSlot 类来说,最重要的其实并不是上面的这些,因为在实际使用过程中,对于 RT、线程数、QPS 每一项,我们其实都很难设置一个确定的阈值。
我们往下看它的对于系统负载和 CPU 资源的保护:

我们可以看到,Sentinel 通过调用 MBean 中的方法获取当前的系统负载和 CPU 使用率,Sentinel 起了一个后台线程,每秒查询一次。
OperatingSystemMXBean osBean =
ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
currentLoad = osBean.getSystemLoadAverage();
currentCpuUsage = osBean.getSystemCpuLoad();

FlowSlot

Flow Control 是 Sentinel 的核心, 因为 Sentinel 本身定位就是一个流控工具,所以 FlowSlot 非常重要。
对于读者来说,最大的挑战应该也是这部分代码,因为前面的代码,只要读者理得清楚里面各个类的关系,就不难。而这部分代码由于涉及到限流算法,会稍微复杂一点点。
在 Sentinel 的流控中,我们可以配置流控规则,主要是控制 QPS 和并发线程数,这里我们不讨论控制线程数,控制线程数的代码不在我们这里的讨论范围内,下面的介绍都是指控制 QPS。

public class RateLimiterController implements TrafficShapingController {
// 排队最大时长,默认 500ms
private final int maxQueueingTimeMs;
// QPS 设置的值
private final double count;
// 上一次请求通过的时间
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
// 通常 acquireCount 为 1,这里不用关心参数 prioritized
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
//
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// 计算每 2 个请求之间的间隔,比如 QPS 限制为 10,那么间隔就是 100ms
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
// 可以通过,设置 latestPassedTime 然后就返回 true 了
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// 不可以通过,需要等待
long waitTime = costTime + latestPassedTime.get() -
TimeUtil.currentTimeMillis();
// 等待时长大于最大值,返回 false
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
// 将 latestPassedTime 往前推
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 需要 sleep 的时间
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
WarmUpController 用来防止突发流量迅速上升,导致系统负载严重过高,本来系统在稳定状态下能处理的,但是由于许多资源没有预热,导致这个时候处理不了了。比如,数据库需要建立连接、需要连接到远程服务等,这就是为什么我们需要预热。

使用 “【】” 代表根据这个场景计算出来的值。
public class WarmUpController implements TrafficShapingController {
// 阈值
protected double count;
// 3
private int coldFactor;
// 转折点的令牌数,和 Guava 的 thresholdPermits 一个意思
// [500]
protected int warningToken = 0;
// 最大的令牌数,和 Guava 的 maxPermits 一个意思
// [1000]
private int maxToken;
// 斜线斜率
// [1/25000]
protected double slope;
// 累积的令牌数,和 Guava 的 storedPermits 一个意思
protected AtomicLong storedTokens = new AtomicLong(0);
// 最后更新令牌的时间
protected AtomicLong lastFilledTime = new AtomicLong(0);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor)
{
construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
// 下面的构造方法,和 Guava 中是差不多的,只不过 thresholdPermits 和 maxPermits 都
换了个名字
private void construct(double count, int warmUpPeriodInSec, int coldFactor)
{
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger
than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// warningToken 和 thresholdPermits 是一样的意思,计算结果其实是一样的
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// 【warningToken = (10*100)/(3-1) = 500】
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// maxToken 和 maxPermits 是一样的意思,计算结果其实是一样的
// maxPermits = thresholdPermits +
2*warmupPeriod/(stableInterval+coldInterval)
// 【maxToken = 500 + (2*10*100)/(1.0+3) = 1000】
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 +
coldFactor));
// 斜率计算
// slope
// slope = (coldIntervalMicros-stableIntervalMicros)/(maxPermitsthresholdPermits);
// 【slope = (3-1.0) / 100 / (1000-500) = 1/25000】
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Sentinel 的 QPS 统计使用的是滑动窗口
// 当前时间窗口的 QPS
long passQps = (long) node.passQps();
// 这里是上一个时间窗口的 QPS,这里的一个窗口跨度是1秒钟
long previousQps = (long) node.previousPassQps();
// 同步。设置 storedTokens 和 lastFilledTime 到正确的值
syncToken(previousQps);
long restToken = storedTokens.get();
// 令牌数超过 warningToken,进入梯形区域
if (restToken >= warningToken) {
// 这里简单说一句,因为当前的令牌数超过了 warningToken 这个阈值,系统处于需要
预热的阶段
// 通过计算当前获取一个令牌所需时间,计算其倒数即是当前系统的最大 QPS 容量
long aboveToken = restToken - warningToken;
// 这里计算警戒 QPS 值,就是当前状态下能达到的最高 QPS。
// (aboveToken * slope + 1.0 / count) 其实就是在当前状态下获取一个令牌所需
要的时间
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 /
count));
// 如果不会超过,那么通过,否则不通过
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
// count 是最高能达到的 QPS
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
protected void syncToken(long passQps) {
// 下面几行代码,说明在第一次进入新的 1 秒钟的时候,做同步
// 题外话:Sentinel 默认地,1 秒钟分为 2 个时间窗口,分别 500ms
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
/ 令牌数量的旧值
long oldValue = storedTokens.get();
// 计算新的令牌数量,往下看
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
// 令牌数量上,减去上一分钟的 QPS,然后设置新值
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
// 更新令牌数
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 当前令牌数小于 warningToken,添加令牌
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) *
count / 1000);
} else if (oldValue > warningToken) {
// 当前令牌数量处于梯形阶段,
// 如果当前通过的 QPS 大于 count/coldFactor,说明系统消耗令牌的速度,大于冷却
速度
// 那么不需要添加令牌,否则需要添加令牌
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime -
lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
}
coolDownTokens 这个方法用来计算新的 token 数量,其实我也没有完全理解作者的设计:
第一、对于令牌的增加,在 Guava 中,使用 warmupPeriodMicros / maxPermits 作为增长率,因为它实现的是 storedPermits 从 0 到 maxPermits 花费的时间为 warmupPeriod。而这里是以设置的 QPS 作为增长率,为什么?
第二、else if 分支中的决定我没有理解,为什么用 passQps 和 count / coldFactor 进行对比来决定是否继续添加令牌?
我自己的理解是,count/coldFactor 就是指冷却速度,那么就是说得通的。欢迎大家一起探讨。
Guava 在于控制获取令牌的速率,它关心的是,获取 permits 需要多少时间,包括从 storedPermits中获取,以及获取 freshPermits,以此推进 nextFreeTicketMicros 到未来的某个时间点。
而 Sentinel 在于控制 QPS,它用令牌数来标识当前系统处于什么状态,根据时间推进一直增加令牌,根据通过的 QPS 一直减少令牌。如果 QPS 持续下降,根据推演,可以发现 storedTokens 越来越多,然后越过 warningTokens 这个阈值,之后只有当 QPS 下降到 count/3 以后,令牌才会继续往上增长,一直到 maxTokens。
storedTokens 是以 “count 每秒”的增长率增长的,减少是以 前一分钟的 QPS 来减少的。其实这里我也有个疑问,为什么增加令牌的时候考虑了时间,而减少的时候却不考虑时间因素,提了issue,不过还没有得到回答。
注意,这个类继承自刚刚介绍的 WarmUpController。它的代码其实就是前面介绍的
public class WarmUpRateLimiterController extends WarmUpController {
private final int timeoutInMs;
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int
timeOutMs, int coldFactor) {
super(count, warmUpPeriodSec, coldFactor);
this.timeoutInMs = timeOutMs;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
long currentTime = TimeUtil.currentTimeMillis();
long restToken = storedTokens.get();
long costTime = 0;
long expectedTime = 0;
// 和 RateLimiterController 比较,区别主要就是这块代码,计算 costTime 上有区别
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// current interval = restToken*slope+1/count
double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 /
count));
costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
} else {
costTime = Math.round(1.0 * (acquireCount) / count * 1000);
}
expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
long waitTime = costTime + latestPassedTime.get() - currentTime;
if (waitTime > timeoutInMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > timeoutInMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
这个代码很简单,就是 RateLimiterController 中的代码,然后加入了预热的内容。
但是这边,加入了 WarmUp 的内容,就是说,通过令牌数量,来判断当前系统的 QPS 应该是多少,如果当前令牌数超过 warningTokens,那么系统的最大 QPS 容量已经低于我们预设的 QPS,相应的,costTime 就会延长。



if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
这里花点篇幅介绍一下客户端是怎么和 dashboard 进行交互的。

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>1.6.3</version>
</dependency>
-Dcsp.sentinel.dashboard.server=127.0.0.1:8080 -Dproject.name=sentinel-learning
当我们在第一次使用 Sentinel 以后,Sentinel 会自动注册。
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
这里使用了 Env 类,其实就是这个类做的事情:
public class Env {
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
InitExecutor.doInit();
}
}
进到 InitExecutor.doInit 方法:
public static void doInit() {
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class);
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : loader) {
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
w.func.init();
}
// ...
}
CommandCenterInitFunc类和HeartbeatSenderInitFunc类。
@Override
public void init() {
HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
if (sender == null) {
RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender
loaded");
return;
}
initSchedulerIfNeeded();
long interval = retrieveInterval(sender);
setIntervalIfNotExists(interval);
// 启动一个定时器,发送心跳信息
scheduleHeartbeatTask(sender, interval);
}
这里看到,init 方法的第一行就是去加载 HeartbeatSender 的实现类,这里又用到了 SPI 的机制,如果我们添加了 sentinel-transport-simple-http 这个依赖,那么 SimpleHttpHeartbeatSender 就会被加载。
Sentinel 在客户端并没有使用第三方 http 包,而是自己基于 JDK 的 Socket 和 ServerSocket 接口实现了简单的客户端和服务端,主要也是为了不增加依赖。
我们前面介绍了滑动窗口用在 分 维度的数据统计上,当我们在说 QPS 的时候,当然我们一般指的是秒维度的数据。当然,你在很多地方看到的 QPS 数据,其实都是通过分维度的数据来得到的,包括metrics 日志文件、dashboard 中的 QPS。
设想一个场景,我们的一个资源,访问的 QPS 稳定是 10,假设请求是均匀分布的,在相对时间 0.0 -1.0 秒区间,通过了 10 个请求,我们在 1.1 秒的时候,观察到的 QPS 可能只有 5,因为此时第一个时间窗口被重置了,只有第二个时间窗口有值。
这个大家应该很容易理解,如果你觉得不理解,可以不用浪费时间在这节了
所以,我们可以知道,如果用 BucketLeapArray 来实现,会有 0~50% 的数据误差,这肯定是不能接受的。
大家翻开 StatisticNode 的源码,对于秒维度数据统计,Sentinel 使用下面的构造方法:
// 2 个时间窗口,每个窗口长度 0.5 秒
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
OccupiableBucketLeapArray 实现类的源码并不长,我们大概看一眼,可以发现它的
所以,我们要大胆猜测一下,这个类里面的 borrowArray 做了一些事情,它是
@Override
public boolean isWindowDeprecated(long time, WindowWrap<MetricBucket>
windowWrap) {
// Tricky: will only calculate for future.
return time >= windowWrap.windowStart();
}
我们发现,如果按照它的这种定义,在调用 values() 方法的时候,所有的 2 个窗口都是过期的,将得不到任何的值。所以,我们大概可以判断,给这个数组添加值的时候,使用的时间应该不是当前时间,而是一个未来的时间点。这大概就是 Future 要表达的意思。
我们再回到 OccupiableBucketLeapArray 这个类,可以看到在重置的时候,它使用了 borrowArray 的值:
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w,
long time) {
// Update the start time and reset value.
w.resetTo(time);
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
w.value().reset();
w.value().addPass((int)borrowBucket.pass());
} else {
w.value().reset();
}
return w;
}
有了这个思路,我们再看 borrowArray 中的值是怎么进来的。
@Override
public void addWaiting(long time, int acquireCount) {
WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
window.value().add(MetricEvent.PASS, acquireCount);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
// 只有设置了 prioritized 的情况才会进入到下面的 if 分支
// 也就是说,对于一般的场景,被限流了,就快速失败
if (prioritized && grade == RuleConstant.FLOW
_GRADE
_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
// 下面的这行 tryOccupyNext 非常复杂,大意就是说去占有"未来的"令牌
// 可以看到,下面做了 sleep,为了保证 QPS 不会因为预占而撑大
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
// 就是这里设置了 borrowArray 的值
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass
after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
看到这里,我其实还有很多疑问没有被解开 !!!
Occupiable 这里代表可以被预占的意思,结合上面 DefaultController 的源码,可以知道它原来是用来满足 prioritized 类型的资源的,我们可以认为这类请求有较高的优先级。如果 QPS 达到阈值,这类资源通常不能用快速失败返回, 而是让它去预占未来的 QPS 容量。
public static void main(String[] args) {
// 下面几行代码设置了 QPS 阈值是 100
FlowRule rule = new FlowRule("test");
rule.setGrade(RuleConstant.FLOW
_GRADE
_QPS);
rule.setCount(100);
rule.setControlBehavior(RuleConstant.CONTROL
_
BEHAVIOR
_
DEFAULT);
List<FlowRule> list = new ArrayList<>();
list.add(rule);
FlowRuleManager.loadRules(list);
// 先通过一个请求,让 clusterNode 先建立起来
try (Entry entry = SphU.entry("test")) {
} catch (BlockException e) {
}
// 起一个线程一直打印 qps 数据
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println(ClusterBuilderSlot.getClusterNode("test").passQps());
}
}
}).start();
while (true) {
try (Entry entry = SphU.entry("test")) {
Thread.sleep(5);
} catch (BlockException e) {
// ignore
} catch (InterruptedException e) {
// ignore
}
}
}
跑一下代码,然后观察下输出,QPS 数据在 50~100 这个区间一直变化,印证了我前面说的,秒级QPS 统计是极度不准确的。
本文比较简单,大家应该很快就可以看完了


欢迎大家反馈阅读感受,有什么需要改进的欢迎提出。
今晚会有我们在职一线大咖免费做Java进阶技术分享
大家有时间的可以来听听哦!
主题:【分布式架构之Session跨域共享解决方案】
讲师:Roc / 资深架构师
课程收获:
1、session一致性问题的产生背景
2、互联网大厂session共享方案
3、不同session共享方案优缺点对比及应用场景
4、基于nginx负载均衡器模拟session失效

END

扫码关注

