How StatisticSlot Works in Sentinel

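In Sentinel, StatisticSlot is responsible for resource statistics. Time-based metrics, such as passed and blocked request counts, are implemented with sliding windows. Other metrics, such as the current thread count, use plain counters. This article analyzes how the slot works.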


StatisticSlot

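```java
// sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java (v2.0.0, excerpt)
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        // Run the remaining slots first (the rule checks happen there)
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // Request passed, add thread count and pass count.
        // Increase the current thread count by 1
        node.increaseThreadNum();
        // Add the incoming count to the number of passed requests
        node.addPassRequest(count);

        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            // These global statistics are used by system adaptive protection (SystemSlot)
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        // ... thread-count bookkeeping for prioritized waiting omitted in this excerpt
    } catch (BlockException e) {
        // Blocked: record the error on the current entry and count the block
        context.getCurEntry().setBlockError(e);
        node.increaseBlockQps(count);
        // ... origin node, ENTRY_NODE and onBlocked callbacks omitted in this excerpt
        throw e;
    } catch (Throwable e) {
        // Unexpected internal error, set error to current entry.
        context.getCurEntry().setError(e);
        throw e;
    }
}
```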

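As the name suggests, this slot collects statistics for resources. It first calls the remaining slots through fireEntry and only records statistics afterwards. When those slots return normally, it increases the thread count and the pass count on three nodes: the current node, the origin node (if there is one), and, for inbound resources, the global Constants.ENTRY_NODE. Finally, it notifies the registered entry callbacks.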
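The ordering described above (run the later slots, then count) can be illustrated with a minimal, self-contained sketch. All names in it (StatisticOrderSketch, Slot, LimitSlot, StatSlot, BlockedException) are hypothetical and are not Sentinel's API. The LongAdder counters stand in for the node statistics. The block counter is an assumption added for contrast: it shows that a request rejected by a later slot never reaches the pass counter. The sketch also never decrements the thread count, because exit handling is outside the scope of this article.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, simplified model of "call the next slot first, then count".
public class StatisticOrderSketch {

    // Hypothetical stand-in for Sentinel's BlockException.
    static class BlockedException extends Exception {
        BlockedException(String msg) { super(msg); }
    }

    // Hypothetical slot interface; not Sentinel's ProcessorSlot.
    interface Slot {
        void entry(int count) throws BlockedException;
    }

    // A checking slot that rejects a request once the pass count would exceed the limit.
    static class LimitSlot implements Slot {
        private final long limit;
        private final LongAdder passSource;

        LimitSlot(long limit, LongAdder passSource) {
            this.limit = limit;
            this.passSource = passSource;
        }

        public void entry(int count) throws BlockedException {
            if (passSource.sum() + count > limit) {
                throw new BlockedException("limit exceeded");
            }
        }
    }

    // Statistic slot: runs the next slot first and records a pass only if it returned normally.
    static class StatSlot implements Slot {
        final LongAdder threadNum = new LongAdder();
        final LongAdder pass = new LongAdder();
        final LongAdder block = new LongAdder();
        Slot next;

        public void entry(int count) throws BlockedException {
            try {
                next.entry(count);      // plays the role of fireEntry(...)
                threadNum.increment();  // plays the role of node.increaseThreadNum()
                pass.add(count);        // plays the role of node.addPassRequest(count)
            } catch (BlockedException e) {
                block.add(count);       // rejected requests are counted separately
                throw e;
            }
        }
    }

    // Builds a two-slot chain: statistics first, limit check second.
    static StatSlot buildChain(long limit) {
        StatSlot stat = new StatSlot();
        stat.next = new LimitSlot(limit, stat.pass);
        return stat;
    }

    public static void main(String[] args) {
        StatSlot chain = buildChain(3);
        for (int i = 0; i < 4; i++) {
            try {
                chain.entry(1);
            } catch (BlockedException e) {
                System.out.println("request " + i + " blocked");
            }
        }
        System.out.println("pass=" + chain.pass.sum() + ", block=" + chain.block.sum());
    }
}
```

Running main prints `request 3 blocked` followed by `pass=3, block=1`. Because the counting happens after `next.entry` returns, only requests that got through every later check are added to the pass count.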

DefaultNode

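```java
// sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/DefaultNode.java (v2.0.0, excerpt)
@Override
public void increaseThreadNum() {
    super.increaseThreadNum();
    this.clusterNode.increaseThreadNum();
}

@Override
public void addPassRequest(int count) {
    // Count on this DefaultNode (this resource within the current context)
    super.addPassRequest(count);
    // Also count on the ClusterNode, which aggregates this resource across all contexts
    // (it is unrelated to cluster flow control)
    this.clusterNode.addPassRequest(count);
}
```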
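The effect of these two overrides can be sketched with plain counters. Every increment recorded on a per-context node is forwarded to the node it wraps, so the shared node ends up with the total. In Sentinel, one ClusterNode is shared by all DefaultNodes of the same resource. In the sketch below, CounterNode and ContextNode are hypothetical stand-ins for StatisticNode/ClusterNode and DefaultNode, and they use simple LongAdder counters instead of sliding windows.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: per-context nodes forward every increment to one shared aggregate node.
public class NodeAggregationSketch {

    // Stand-in for StatisticNode/ClusterNode: plain counters, no sliding windows.
    static class CounterNode {
        final LongAdder threadNum = new LongAdder();
        final LongAdder pass = new LongAdder();

        void increaseThreadNum() { threadNum.increment(); }

        void addPassRequest(int count) { pass.add(count); }
    }

    // Stand-in for DefaultNode: records locally, then forwards to the shared node.
    static class ContextNode extends CounterNode {
        private final CounterNode clusterNode;

        ContextNode(CounterNode clusterNode) { this.clusterNode = clusterNode; }

        @Override
        void increaseThreadNum() {
            super.increaseThreadNum();
            clusterNode.increaseThreadNum();
        }

        @Override
        void addPassRequest(int count) {
            super.addPassRequest(count);
            clusterNode.addPassRequest(count);
        }
    }

    public static void main(String[] args) {
        CounterNode cluster = new CounterNode();     // one per resource
        ContextNode web = new ContextNode(cluster);  // the resource accessed in one context
        ContextNode rpc = new ContextNode(cluster);  // the same resource in another context
        web.addPassRequest(2);
        rpc.addPassRequest(5);
        System.out.println(web.pass.sum() + " " + rpc.pass.sum() + " " + cluster.pass.sum());
    }
}
```

Running main prints `2 5 7`. Each context keeps its own numbers, and the shared node holds the sum for the resource.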

StatisticNode

```java
// sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java (v2.0.0, excerpt)
@Override
public void addPassRequest(int count) {
    // Add the passed count to both statistics dimensions
    rollingCounterInSecond.addPass(count); // per-second statistics
    rollingCounterInMinute.addPass(count); // per-minute statistics
}
```

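StatisticNode defines two sliding windows, one covering 1 second and one covering 1 minute. The 1-second sliding window has two sample windows of 500 ms each. The 1-minute sliding window has 60 sample windows of 1 second each. Both sliding windows are implemented by ArrayMetric, which is only a thin wrapper around LeapArray, so the sample windows themselves are stored in LeapArray. Each sample window is a WindowWrap, which in turn wraps a MetricBucket. MetricBucket holds a counters array with one entry per metric, indexed by the ordinal of the MetricEvent enum that defines the metric types.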
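The structure described above reduces to a small sketch with two parts. The first is a bucket that keeps one LongAdder per metric type, indexed by the enum's `ordinal()`. The second is the arithmetic that turns an interval and a sample count into a sample-window length. The names BucketSketch, Event, and windowLengthInMs are hypothetical. Event is a trimmed illustration of the idea behind MetricEvent, not the real enum.

```java
import java.util.concurrent.atomic.LongAdder;

public class BucketSketch {

    // Hypothetical subset of metric types, mirroring the idea of MetricEvent.
    enum Event { PASS, BLOCK, EXCEPTION, SUCCESS, RT }

    // One sample window's counters: one LongAdder per event, indexed by ordinal().
    static class Bucket {
        private final LongAdder[] counters = new LongAdder[Event.values().length];

        Bucket() {
            for (Event e : Event.values()) {
                counters[e.ordinal()] = new LongAdder();
            }
        }

        void add(Event e, long n) { counters[e.ordinal()].add(n); }

        long get(Event e) { return counters[e.ordinal()].sum(); }
    }

    // Length of one sample window = total interval / number of sample windows.
    static int windowLengthInMs(int intervalInMs, int sampleCount) {
        return intervalInMs / sampleCount;
    }

    public static void main(String[] args) {
        System.out.println(windowLengthInMs(1000, 2));     // per-second sliding window
        System.out.println(windowLengthInMs(60_000, 60));  // per-minute sliding window
        Bucket b = new Bucket();
        b.add(Event.PASS, 3);
        b.add(Event.BLOCK, 1);
        System.out.println(b.get(Event.PASS) + " " + b.get(Event.BLOCK));
    }
}
```

Running main prints `500`, `1000` and `3 1`. These match the 2 × 500 ms and 60 × 1 s layouts above. Indexing an array by `ordinal()` avoids a map lookup on every update.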

```java
// sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java (v2.0.0, excerpt)
@Override
public void addPass(int count) {
    // Get the sample window for the current time
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    // Add the count to that window's MetricBucket
    wrap.value().addPass(count);
}
```
```java
// sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java (v2.0.0)
public class MetricBucket {

    private final LongAdder[] counters;

    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }

    public MetricBucket reset(MetricBucket bucket) {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
            counters[event.ordinal()].add(bucket.get(event));
        }
        initMinRt();
        return this;
    }

    private void initMinRt() {
        this.minRt = SentinelConfig.statisticMaxRt();
    }

    /**
     * Reset the adders.
     *
     * @return new metric bucket in initial state
     */
    public MetricBucket reset() {
        // Iterate over all metric types
        for (MetricEvent event : MetricEvent.values()) {
            // Reset the counter of this metric
            counters[event.ordinal()].reset();
        }
        initMinRt();
        return this;
    }

    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }

    public MetricBucket add(MetricEvent event, long n) {
        // counters is a LongAdder array: add n to the counter of the given event
        counters[event.ordinal()].add(n);
        return this;
    }

    public long pass() {
        return get(MetricEvent.PASS);
    }

    public long occupiedPass() {
        return get(MetricEvent.OCCUPIED_PASS);
    }

    public long block() {
        return get(MetricEvent.BLOCK);
    }

    public long exception() {
        return get(MetricEvent.EXCEPTION);
    }

    public long rt() {
        return get(MetricEvent.RT);
    }

    public long minRt() {
        return minRt;
    }

    public long success() {
        return get(MetricEvent.SUCCESS);
    }

    public void addPass(int n) {
        // Add n to the PASS counter
        add(MetricEvent.PASS, n);
    }

    public void addOccupiedPass(int n) {
        add(MetricEvent.OCCUPIED_PASS, n);
    }

    public void addException(int n) {
        add(MetricEvent.EXCEPTION, n);
    }

    public void addBlock(int n) {
        add(MetricEvent.BLOCK, n);
    }

    public void addSuccess(int n) {
        add(MetricEvent.SUCCESS, n);
    }

    public void addRT(long rt) {
        add(MetricEvent.RT, rt);

        // Not thread-safe, but it's okay.
        if (rt < minRt) {
            minRt = rt;
        }
    }

    @Override
    public String toString() {
        return "p: " + pass() + ", b: " + block() + ", w: " + occupiedPass();
    }
}
```
```java
// sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java (v2.0.0)
// Metric types
public enum MetricEvent {

    /**
     * Normal pass.
     */
    PASS,
    /**
     * Normal block.
     */
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,

    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     */
    OCCUPIED_PASS
}
```

The part that really matters is how the sample window for the current time is obtained.

```java
// sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java (v2.0.0, excerpt)
public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());
}

public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    // Compute the array index from the timestamp
    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);

    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket.
     */
    // Spin until a usable bucket is obtained
    while (true) {
        // Read the bucket currently stored at this index
        WindowWrap<T> old = array.get(idx);
        if (old == null) { // The bucket does not exist yet
            /*
             *     B0       B1      B2    NULL      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            bucket is empty, so create new and update
             *
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            // Create a new bucket to hold the statistics
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            // Install the bucket with CAS
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) { // The bucket for the current time already exists
            /*
             *     B0       B1      B2     B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            // Return it directly
            return old;
        } else if (windowStart > old.windowStart()) { // The bucket started before the current window: stale data, so reset it
            /*
             *   (old)
             *             B0       B1      B2    NULL      B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...    1200     1400    1600    1800    2000    2200  timestamp
             *                              ^
             *                           time=1676
             *          startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            /*
             * The stored bucket starts later than the requested window: the requested window lies to
             * the left of the sliding window, i.e. this thread could not keep up with the window's movement.
             * Recording into the sliding window would be meaningless, so a new detached bucket is
             * created and returned. This can happen when the CAS or the lock attempt above fails and the
             * thread, after yielding the CPU, is not rescheduled for a long time (timeMillis is read only
             * once, before the loop).
             */
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    // Number of window lengths elapsed since the epoch; effectively a window sequence number
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int)(timeId % array.length());
}

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    // Align down to a multiple of the window length
    return timeMillis - timeMillis % windowLengthInMs;
}
```

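currentWindow() works in three steps. It computes the index of the sample window from the given time, then computes that window's start time, and finally obtains the window in a spin loop. The loop distinguishes four cases:
- The bucket is absent: create it with CAS.
- The bucket is up to date: return it.
- The bucket is deprecated: reset it under updateLock.
- The bucket is newer than the requested time: return a detached bucket.

Each case is explained in the code comments above, so they are not repeated here.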
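The four cases can be made concrete with a simplified sketch of the same algorithm that tracks a single metric. MiniLeapArray and its nested Window class are hypothetical, and each window keeps only a pass counter. The sketch uses AtomicReferenceArray and ReentrantLock from java.util.concurrent in the same roles as the code above. One deliberate difference is an assumption of this sketch: it resets the counter before publishing the new start time. It is meant for reading, not as a replacement for LeapArray.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch of the currentWindow() logic; not Sentinel's LeapArray.
public class MiniLeapArray {

    // One sample window: start time plus a pass counter (stand-in for WindowWrap + MetricBucket).
    static final class Window {
        volatile long start;
        final LongAdder pass = new LongAdder();

        Window(long start) { this.start = start; }
    }

    private final int windowLengthInMs;
    private final AtomicReferenceArray<Window> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    MiniLeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    int calculateTimeIdx(long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;  // window sequence number
        return (int) (timeId % array.length());       // position in the circular array
    }

    long calculateWindowStart(long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;  // align down to the window length
    }

    Window currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        int idx = calculateTimeIdx(timeMillis);
        long windowStart = calculateWindowStart(timeMillis);
        while (true) {
            Window old = array.get(idx);
            if (old == null) {
                // Case 1: slot is empty, try to install a new window with CAS.
                Window w = new Window(windowStart);
                if (array.compareAndSet(idx, null, w)) {
                    return w;
                }
                Thread.yield();
            } else if (windowStart == old.start) {
                // Case 2: the stored window is the current one.
                return old;
            } else if (windowStart > old.start) {
                // Case 3: stale window; reuse it under the lock.
                if (updateLock.tryLock()) {
                    try {
                        old.pass.reset();        // clear the old data first
                        old.start = windowStart; // then publish the new start time
                        return old;
                    } finally {
                        updateLock.unlock();
                    }
                }
                Thread.yield();
            } else {
                // Case 4: requested time is behind the stored window; return a detached window.
                return new Window(windowStart);
            }
        }
    }

    void addPass(long timeMillis, int count) {
        currentWindow(timeMillis).pass.add(count);
    }

    public static void main(String[] args) {
        MiniLeapArray leap = new MiniLeapArray(5, 1000);  // 5 windows of 200 ms
        System.out.println(leap.calculateTimeIdx(888) + " " + leap.calculateWindowStart(888));
        leap.addPass(888, 1);
        leap.addPass(950, 2);   // same window [800, 1000)
        leap.addPass(1888, 4);  // same index, 1000 ms later: the stale window is reset
        Window w = leap.currentWindow(1888);
        System.out.println(w.start + " " + w.pass.sum());
    }
}
```

Running main prints `4 800` and then `1800 4`. With 5 windows of 200 ms, timestamp 888 maps to index 4 and window start 800. Timestamp 1888 maps to the same index but a later start (1800), so the existing window object is reset and reused rather than reallocated.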