Sentinel中FlowSlot的原理

FlowSlot插槽用于实现流控检查,根据统计插槽中统计的数据来实现限流。Sentinel默认支持3种流控模式(直接、关联和链路)和3种流控效果(快速失败、Warm Up和排队等待)。本文就来分析一下这些功能是怎么实现的。


FlowSlot

v2.0.0
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // 检查流控
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
    throws BlockException {
    // 通过FlowRuleChecker来检查流控规则
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}

FlowRuleChecker

v2.0.0
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    // 根据资源名称来获取流控规则,里面是把规则按照资源名称分组放好的,所以能很快获取到
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        // 遍历规则
        for (FlowRule rule : rules) {
            // 如果不能通过,则抛出FlowException异常
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

先是通过资源名称来获取流控规则,随后遍历每条规则并判断是否能通过请求。

获取规则

参数ruleProviderFlowSlot中传进来的。

v2.0.0
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
    @Override
    public Collection<FlowRule> apply(String resource) {
        // Flow rule map should not be null.
        // 获取所有规则
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
        // 获取指定名称的规则
        return flowRules.get(resource);
    }
};

规则检查

v2.0.0
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node,
                                                int acquireCount) {
    return canPassCheck(rule, context, node, acquireCount, false);
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }

    // 集群流控相关
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }

    // 本地流控相关
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

这里暂时只分析本地流控。

v2.0.0
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    // 获取节点
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }

    // 默认情况下rater都是DefaultController,参考FlowRuleUtil
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

限速器

这里通过限速器来判断,sentinel中支持多种限速器,在处理请求时,根据不同的规则配置选择不同的限速器。如果是限QPS,则会根据不同的控制行为创建不同的限速器,限线程数和默认情况则是DefaultController这种默认的限速器。

DefaultController

v2.0.0
DefaultController
StatisticNode
<
>
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // 根据是限流还是限制并发量,返回当前统计到的数量
    int curCount = avgUsedTokens(node);
    // 如果已有数量加上需要获取的数量超过了阈值
    if (curCount + acquireCount > count) {
        // 如果是限QPS
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                // 睡眠
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    // 如果没超过阈值,则直接返回true
    return true;
}
private int avgUsedTokens(Node node) {
    if (node == null) {
        return DEFAULT_AVG_USED_TOKENS;
    }
    return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java
@Override
public double passQps() {
    // 窗口中的请求通过数量除以窗口时间长度
    return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
@Override
public int curThreadNum() {
    // 直接返回计数器中的和
    return (int)curThreadNum.sum();
}

直接判断限制量是否超过阈值。