Sentinel中FlowSlot的原理
FlowSlot插槽用于实现流控检查,根据统计插槽中统计的数据来实现限流。Sentinel默认支持3种流控模式(直接、关联和链路)和3种流控效果(快速失败、Warm Up和排队等待)。本文就来分析一下这些功能是怎么实现的。
FlowSlot
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 检查流控
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
// 通过FlowRuleChecker来检查流控规则
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
FlowRuleChecker
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
// 根据资源名称来获取流控规则,里面是把规则按照资源名称分组放好的,所以能很快获取到
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
// 遍历规则
for (FlowRule rule : rules) {
// 如果不能通过,则抛出FlowException异常
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
先是通过资源名称来获取流控规则,随后遍历每条规则并判断是否能通过请求。
获取规则
参数ruleProvider是FlowSlot中传进来的。
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
// 获取所有规则
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
// 获取指定名称的规则
return flowRules.get(resource);
}
};
规则检查
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node,
int acquireCount) {
return canPassCheck(rule, context, node, acquireCount, false);
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
// 集群流控相关
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
// 本地流控相关
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
这里暂时只分析本地流控。
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// 获取节点
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
// 默认情况下rater都是DefaultController,参考FlowRuleUtil
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
限速器
这里通过限速器来判断,sentinel中支持多种限速器,在处理请求时,根据不同的规则配置选择不同的限速器。如果是限QPS,则会根据不同的控制行为创建不同的限速器,限线程数和默认情况则是DefaultController这种默认的限速器。
DefaultController
DefaultController
StatisticNode
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 根据是限流还是限制并发量,返回当前统计到的数量
int curCount = avgUsedTokens(node);
// 如果已有数量加上需要获取的数量超过了阈值
if (curCount + acquireCount > count) {
// 如果是限QPS
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
// 睡眠
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
// 如果没超过阈值,则直接返回true
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
java
sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java
@Override
public double passQps() {
// 窗口中的请求通过数量除以窗口时间长度
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
@Override
public int curThreadNum() {
// 直接返回计数器中的和
return (int)curThreadNum.sum();
}
直接判断限制量是否超过阈值。