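How StatisticSlot Works in Sentinel
In Sentinel, StatisticSlot collects runtime statistics for resources. Time-based metrics (such as the number of requests passed or blocked per second) are kept in sliding windows, while the remaining metrics (such as the number of threads currently executing) use plain counters. This article analyzes how this slot works.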
StatisticSlot
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        // Run the remaining slots in the chain first
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
        // Request passed, add thread count and pass count.
        // Increase the current thread count by 1
        node.increaseThreadNum();
        // Add the passed-in count to the number of passed requests
        node.addPassRequest(count);
        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            // Global inbound statistics, used by adaptive system protection (SystemSlot)
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }
        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        // (omitted) the request passes after waiting: only the thread counts are increased
    } catch (BlockException e) {
        // (omitted) the request was rejected by a rule: record the block count, then rethrow
        throw e;
    } catch (Throwable e) {
        // (omitted) unexpected internal error: record it on the current entry, then rethrow
        throw e;
    }
}
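As its name suggests, this slot is responsible for statistics. It first calls fireEntry to run the remaining slots in the chain, and only then records the statistics: the current thread count and the number of passed requests. If a later slot rejects the request by throwing a BlockException, the statements after fireEntry never run, so the pass count is not increased. Besides the resource's own node, the same counts are added to the origin node when the call has an origin, and, for inbound traffic (EntryType.IN), to the global Constants.ENTRY_NODE used by SystemSlot.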
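The "run the rest of the chain first, then count" ordering can be illustrated with a minimal, self-contained sketch. Every name in it (`SlotOrderingDemo`, `Slot`, `LimitSlot`, `CountingSlot`, `BlockedException`) is a hypothetical stand-in, far simpler than Sentinel's real slot chain. The sketch only shows that a pass is counted after the downstream slots return normally, while a rejection thrown downstream is counted as a block instead. The `exit` method is an assumption that mirrors the exit path, where the thread count is decremented again once the request completes.

```java
import java.util.concurrent.atomic.LongAdder;

public class SlotOrderingDemo {

    // Hypothetical stand-in for Sentinel's BlockException.
    static class BlockedException extends Exception {
        BlockedException(String msg) { super(msg); }
    }

    // Hypothetical, heavily simplified slot interface.
    interface Slot {
        void entry(int count) throws BlockedException;
    }

    // A downstream "rule-checking" slot that rejects once a fixed total is exceeded.
    static class LimitSlot implements Slot {
        private final long limit;
        private final LongAdder seen = new LongAdder();

        LimitSlot(long limit) { this.limit = limit; }

        @Override
        public void entry(int count) throws BlockedException {
            seen.add(count);
            if (seen.sum() > limit) {
                throw new BlockedException("limit exceeded");
            }
        }
    }

    // Mimics the StatisticSlot ordering: call the next slot first, then record the outcome.
    static class CountingSlot implements Slot {
        private final Slot next;
        final LongAdder threadNum = new LongAdder(); // plain counter, not windowed
        final LongAdder passed = new LongAdder();
        final LongAdder blocked = new LongAdder();

        CountingSlot(Slot next) { this.next = next; }

        @Override
        public void entry(int count) throws BlockedException {
            try {
                next.entry(count);      // plays the role of fireEntry(...)
                threadNum.increment();  // plays the role of node.increaseThreadNum()
                passed.add(count);      // plays the role of node.addPassRequest(count)
            } catch (BlockedException e) {
                blocked.add(count);     // rejected requests are counted separately
                throw e;
            }
        }

        // Assumed exit path: the request is finished, so it no longer occupies a thread.
        void exit() {
            threadNum.decrement();
        }
    }

    public static void main(String[] args) {
        CountingSlot slot = new CountingSlot(new LimitSlot(2));
        for (int i = 0; i < 3; i++) {
            try {
                slot.entry(1);
                slot.exit();
            } catch (BlockedException e) {
                System.out.println("request " + i + " blocked");
            }
        }
        System.out.println("passed=" + slot.passed.sum() + ", blocked=" + slot.blocked.sum());
    }
}
```

With a limit of 2, the third request is rejected by `LimitSlot`. The run prints `request 2 blocked` followed by `passed=2, blocked=1`.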
DefaultNode
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/DefaultNode.java
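@Override
public void increaseThreadNum() {
    super.increaseThreadNum();
    this.clusterNode.increaseThreadNum();
}
@Override
public void addPassRequest(int count) {
    // Update this node's own statistics (this resource within the current context)
    super.addPassRequest(count);
    // Also update the ClusterNode, which aggregates this resource's statistics across all contexts
    this.clusterNode.addPassRequest(count);
}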
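DefaultNode records every call twice for a specific reason. A `DefaultNode` holds the statistics of one resource *within one context* (one call entrance). All `DefaultNode`s of the same resource share a single `ClusterNode`, which aggregates that resource's statistics across contexts. The sketch below models this relationship with hypothetical classes (`SimpleStatNode`, `SimpleDefaultNode`) and uses plain counters instead of sliding windows.

```java
import java.util.concurrent.atomic.LongAdder;

public class NodeHierarchyDemo {

    // Hypothetical stand-in for StatisticNode: only a pass counter here.
    static class SimpleStatNode {
        final LongAdder pass = new LongAdder();

        void addPassRequest(int count) {
            pass.add(count);
        }
    }

    // Hypothetical stand-in for DefaultNode: own stats plus a shared per-resource node.
    static class SimpleDefaultNode extends SimpleStatNode {
        private final SimpleStatNode clusterNode;

        SimpleDefaultNode(SimpleStatNode clusterNode) {
            this.clusterNode = clusterNode;
        }

        @Override
        void addPassRequest(int count) {
            super.addPassRequest(count);       // this resource in this context
            clusterNode.addPassRequest(count); // this resource across all contexts
        }
    }

    public static void main(String[] args) {
        SimpleStatNode cluster = new SimpleStatNode();              // one per resource
        SimpleDefaultNode fromWeb = new SimpleDefaultNode(cluster); // resource entered via a "web" context
        SimpleDefaultNode fromRpc = new SimpleDefaultNode(cluster); // same resource via an "rpc" context

        fromWeb.addPassRequest(3);
        fromRpc.addPassRequest(2);

        // prints "3 2 5"
        System.out.println(fromWeb.pass.sum() + " " + fromRpc.pass.sum() + " " + cluster.pass.sum());
    }
}
```

The same resource is entered from two contexts. Each per-context node sees only its own traffic, while the shared node sees the total.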
StatisticNode
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java
@Override
public void addPassRequest(int count) {
    // Add the passed count to both statistical dimensions
    rollingCounterInSecond.addPass(count); // per-second window
    rollingCounterInMinute.addPass(count); // per-minute window
}
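This class defines two sliding windows, one spanning 1 second and one spanning 1 minute. By default, the 1-second window has 2 sample windows (buckets) of 500 ms each, and the 1-minute window has 60 buckets of 1 second each. Both sliding windows are of type ArrayMetric, a thin wrapper around a LeapArray, so the buckets themselves are stored in the LeapArray. Each bucket is a WindowWrap, which in turn wraps a MetricBucket. A MetricBucket holds an array named counters with one entry per statistical metric; the metric types are defined in the MetricEvent enum.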
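A few lines of code can check two details of this layout. First, the length of each bucket is the interval divided by the number of samples. Second, a bucket keeps one counter per enum constant, indexed by `ordinal()`. The sketch below uses a hypothetical `Event` enum and `MiniBucket` class modelled on `MetricEvent` and `MetricBucket`; it is an illustration, not Sentinel's code.

```java
import java.util.concurrent.atomic.LongAdder;

public class BucketLayoutDemo {

    // Hypothetical subset of MetricEvent.
    enum Event { PASS, BLOCK, EXCEPTION, SUCCESS }

    // One bucket's worth of counters: one LongAdder per event type, indexed by ordinal().
    static class MiniBucket {
        private final LongAdder[] counters = new LongAdder[Event.values().length];

        MiniBucket() {
            for (Event e : Event.values()) {
                counters[e.ordinal()] = new LongAdder();
            }
        }

        void add(Event e, long n) {
            counters[e.ordinal()].add(n);
        }

        long get(Event e) {
            return counters[e.ordinal()].sum();
        }

        // Clears every counter so the bucket can be reused for a new time slot.
        void reset() {
            for (LongAdder c : counters) {
                c.reset();
            }
        }
    }

    // Length of one bucket in milliseconds.
    static int windowLengthInMs(int intervalInMs, int sampleCount) {
        return intervalInMs / sampleCount;
    }

    public static void main(String[] args) {
        System.out.println(windowLengthInMs(1000, 2));       // per-second window: 500
        System.out.println(windowLengthInMs(60 * 1000, 60)); // per-minute window: 1000

        MiniBucket bucket = new MiniBucket();
        bucket.add(Event.PASS, 3);
        bucket.add(Event.BLOCK, 1);
        System.out.println(bucket.get(Event.PASS) + " " + bucket.get(Event.BLOCK)); // 3 1
        bucket.reset();
        System.out.println(bucket.get(Event.PASS)); // 0
    }
}
```

Indexing an array by `ordinal()` keeps each lookup a plain array access, and `LongAdder` lets many threads increment the same counter with little contention.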
ArrayMetric
MetricBucket
MetricEvent
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java
@Override
public void addPass(int count) {
    // Get the bucket for the current time
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    // Add the count to it
    wrap.value().addPass(count);
}
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java
public class MetricBucket {
    private final LongAdder[] counters;
    private volatile long minRt;
    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }
    public MetricBucket reset(MetricBucket bucket) {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
            counters[event.ordinal()].add(bucket.get(event));
        }
        initMinRt();
        return this;
    }
    private void initMinRt() {
        this.minRt = SentinelConfig.statisticMaxRt();
    }
    /**
     * Reset the adders.
     *
     * @return new metric bucket in initial state
     */
    public MetricBucket reset() {
        // Iterate over all metric types
        for (MetricEvent event : MetricEvent.values()) {
            // Reset the counter of this metric
            counters[event.ordinal()].reset();
        }
        initMinRt();
        return this;
    }
    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }
    public MetricBucket add(MetricEvent event, long n) {
        // counters is an array of LongAdder: pick the counter for this event and add n to it
        counters[event.ordinal()].add(n);
        return this;
    }
    public long pass() {
        return get(MetricEvent.PASS);
    }
    public long occupiedPass() {
        return get(MetricEvent.OCCUPIED_PASS);
    }
    public long block() {
        return get(MetricEvent.BLOCK);
    }
    public long exception() {
        return get(MetricEvent.EXCEPTION);
    }
    public long rt() {
        return get(MetricEvent.RT);
    }
    public long minRt() {
        return minRt;
    }
    public long success() {
        return get(MetricEvent.SUCCESS);
    }
    public void addPass(int n) {
        // Add n to the PASS counter
        add(MetricEvent.PASS, n);
    }
    public void addOccupiedPass(int n) {
        add(MetricEvent.OCCUPIED_PASS, n);
    }
    public void addException(int n) {
        add(MetricEvent.EXCEPTION, n);
    }
    public void addBlock(int n) {
        add(MetricEvent.BLOCK, n);
    }
    public void addSuccess(int n) {
        add(MetricEvent.SUCCESS, n);
    }
    public void addRT(long rt) {
        add(MetricEvent.RT, rt);
        // Not thread-safe, but it's okay.
        if (rt < minRt) {
            minRt = rt;
        }
    }
    @Override
    public String toString() {
        return "p: " + pass() + ", b: " + block() + ", w: " + occupiedPass();
    }
}
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java
// Types of statistical metrics
public enum MetricEvent {
    /**
     * Normal pass.
     */
    PASS,
    /**
     * Normal block.
     */
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     */
    OCCUPIED_PASS
}
The real key is how the bucket (sample window) for the current time is obtained.
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java
public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    // Compute the array index from the timestamp
    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    // i.e. the start time of the sample window that contains timeMillis
    long windowStart = calculateWindowStart(timeMillis);
    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket.
     */
    // Spin until a bucket is obtained
    while (true) {
        // Read whatever bucket is currently stored at this index
        WindowWrap<T> old = array.get(idx);
        if (old == null) { // no bucket at this index yet
            /*
             *     B0      B1      B2     NULL     B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            bucket is empty, so create new and update
             *
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            // Create a new bucket to hold the statistics
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            // Install it in the array with CAS
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                // On failure, voluntarily give up the CPU and retry
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) { // the bucket for the current window already exists
            /*
             *     B0      B1      B2      B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            // so return it directly
            return old;
        } else if (windowStart > old.windowStart()) { // the stored bucket starts before the current window: its data is stale and can be discarded
            /*
             *   (old)
             *             B0      B1      B2     NULL     B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...     1200    1400    1600    1800    2000    2200  timestamp
             *                             ^
             *                         time=1676
             *          startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    // Reset the bucket to the current window
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            /*
             * The stored bucket starts later than the target window, i.e. the target window lies to the
             * left of the current sliding window: this thread has fallen behind the sliding window.
             * Counting into the sliding window would be meaningless here, so a new window is simply
             * created and returned (it is not stored in the array).
             * This can happen when, after a failed CAS or a failed tryLock above, the thread yields the
             * CPU and is not rescheduled for a long time.
             */
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    // Number of whole windows elapsed since the epoch; effectively a window ID
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    // Map the window ID to a slot of the circular array
    return (int)(timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    // Align down to a multiple of the window length, i.e. the start of the window containing timeMillis
    return timeMillis - timeMillis % windowLengthInMs;
}
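The method first maps the current time to an array index and then computes the start time of the window containing that time. It then obtains the bucket in a spin loop, which distinguishes four cases:
- If the slot is empty, a new bucket is created and installed with CAS.
- If the stored bucket already belongs to the current window, it is returned as is.
- If the stored bucket is stale, it is reset under updateLock and reused.
- If the stored bucket is newer than the requested time, a detached bucket that is not stored in the array is returned.

The comments in the code above cover the details.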
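The procedure can be condensed into a small, self-contained sketch of a circular bucket array. `MiniLeapArray` and its `Window` class are hypothetical, and the sketch simplifies Sentinel in three ways. Each window carries a single pass counter instead of a `MetricBucket`. There is no `TimeUtil` clock, so the caller passes the timestamp. Unlike the excerpt above, the stale-window branch re-checks the start time after acquiring the lock and then re-reads the slot. The four branches correspond to the four cases of `LeapArray.currentWindow`.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

public class MiniLeapArray {

    // One sample window: its start time plus a single pass counter (hypothetical, not WindowWrap).
    static final class Window {
        private volatile long windowStart;
        final LongAdder pass = new LongAdder();

        Window(long windowStart) { this.windowStart = windowStart; }

        long windowStart() { return windowStart; }

        // Reuse this window for a newer time slot; clear the counter before publishing the new start.
        void resetTo(long newStart) {
            pass.reset();
            windowStart = newStart;
        }
    }

    private final int windowLengthInMs;
    private final AtomicReferenceArray<Window> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    MiniLeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    int calculateTimeIdx(long timeMillis) {
        long timeId = timeMillis / windowLengthInMs; // window number since the epoch
        return (int) (timeId % array.length());      // slot in the circular array
    }

    long calculateWindowStart(long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs; // align down to a window boundary
    }

    Window currentWindow(long timeMillis) {
        int idx = calculateTimeIdx(timeMillis);
        long windowStart = calculateWindowStart(timeMillis);
        while (true) {
            Window old = array.get(idx);
            if (old == null) {
                // Case 1: empty slot -> create a window and publish it with CAS; losers retry.
                Window fresh = new Window(windowStart);
                if (array.compareAndSet(idx, null, fresh)) {
                    return fresh;
                }
                Thread.yield();
            } else if (windowStart == old.windowStart()) {
                // Case 2: the slot already holds the current window.
                return old;
            } else if (windowStart > old.windowStart()) {
                // Case 3: stale window -> reset it in place under the lock, then re-read the slot.
                if (updateLock.tryLock()) {
                    try {
                        // Re-check so a window another thread has just reset is not wiped again.
                        if (windowStart > old.windowStart()) {
                            old.resetTo(windowStart);
                        }
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else {
                // Case 4: requested time is older than the slot's window -> detached, unshared window.
                return new Window(windowStart);
            }
        }
    }

    public static void main(String[] args) {
        MiniLeapArray leap = new MiniLeapArray(2, 1000); // 2 windows of 500 ms each
        System.out.println(leap.calculateTimeIdx(888) + " " + leap.calculateWindowStart(888));   // 1 500
        System.out.println(leap.calculateTimeIdx(1676) + " " + leap.calculateWindowStart(1676)); // 1 1500

        Window w1 = leap.currentWindow(888);
        w1.pass.add(5);
        Window w2 = leap.currentWindow(1676); // same slot, newer window -> reset and reused
        System.out.println((w1 == w2) + " " + w2.windowStart() + " " + w2.pass.sum()); // true 1500 0
        System.out.println(leap.currentWindow(888) == w2); // false: detached window
    }
}
```

With 2 buckets of 500 ms, the timestamps 888 and 1676 both map to index 1, with window starts 500 and 1500. The second call therefore finds a stale bucket in that slot, resets it and reuses the same object. Asking for 888 again afterwards falls into the last branch and returns a detached window that is never stored in the array.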