Netty中的ChannelPipeline

当通道发生操作时,需要用通道处理器来处理这些事件。Netty中将事件的处理和业务进行解耦,让用户只需要编写处理事件的逻辑,将其封装在通道处理器中,然后添加到通道流水线中即可。这样事件发生时,会自动调用到通道处理器中的事件处理逻辑。本文来分析一下通过流水线的实现原理。


ChannelPipeline扮演的角色

首先,ChannelPipeline既然是通道处理器的责任链实现,则很容易想到它是通道处理器的管理器,负责添加、移除责任链中的通道处理器。确实,在ChannelPipeline接口中主要就是定义了这些方法。 另外ChannelPipeline还继承了ChannelInboundInvokerChannelOutboundInvoker两个接口,这两个接口中分别定义了触发入站和出站事件操作的一些方法。ChannelPipeline中实现这两个接口,也就是说明可以通过通道流水线来触发入站和出站的一些操作。而在Netty中确实如此,不管是在用户端,还是在事件循环端,一些事件的操作都会调用到Unsafe类中的方法来,而在Unsafe类中的方法中,都会通过ChannelPipeline来触发事件的传递。

ChannelHandlerContext作为责任链中的节点类型,也继承了ChannelInboundInvokerChannelOutboundInvoker两个接口,从而实现了在责任链中传播事件的目的。这样用户可以只关心实现ChannelHandler接口,而不用实现事件的传播,这是职责分离设计的体现。

管理ChannelHandler

ChannelPipeline接口中定义了多个管理通道处理器的方法。

  • addFirst:在责任链头部添加通道处理器,支持批量添加;针对是否绑定事件处理器组有两种实现;
  • addLast:和addFirst类似,只不过是添加到责任链的尾部;
  • addBefore:添加到指定名称的通道处理器前面;针对是否绑定事件处理器组有两种实现;
  • addAfter:和addAfter类似,只不过是添加到指定名称的通道处理器的后面;
  • remove:移除责任链中的通道处理器,支持移除指定类型或者名称的通道处理器,以及支持移除第一个和最后一个;
  • replace:替换通道处理器,支持按名称和类型来进行替换;
  • get:获取通道处理器,支持按名称和类型来获取;
  • context:获取通道处理器所在的上下文对象,支持按名称和类型来获取;

由于方法实在太多,本文不可能全部介绍,实际上大部分操作的逻辑也有相似之处,所以没必要全部分析。

addLast

v4.1.83
addLast
addLast0
<
>
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast(null, name, handler);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    // 线程同步,因为channel可能被暴露给用户线程,所以要做并发控制
    synchronized (this) {
        // 检查handler是否能被多次添加到流水线中
        checkMultiplicity(handler);

        /*
         * 创建ChannelHandlerContext,封装handler。
         * 这里如果指定了name(不是null),那么直接检查名称是否重复了
         * 如果没有指定name,则先生成名称
         *
         * 名称检查是会遍历pipeline中的所有context
         */
        newCtx = newContext(group, filterName(name, handler), handler);

        // 本质上就是一个双向链表的插入操作
        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        /*
         * 如果通道还未注册过
         */
        if (!registered) {
            // 修改其状态
            newCtx.setAddPending();
            // 封装延迟任务并添加到延迟任务链表中
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        /*
         * 如果当前线程是事件循环中的线程,那么提前退出同步块,缓解线程因锁产生的竞争。
         */
        if (!executor.inEventLoop()) {
            // 执行添加handler的回调
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }

    // 回调用户方法和设置状态
    callHandlerAdded0(newCtx);
    return this;
}
public final ChannelPipeline addLast(ChannelHandler handler) {
    return addLast(null, handler);
}

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    ObjectUtil.checkNotNull(handlers, "handlers");

    // 依次添加handler
    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

上面一共5个方法,其实最终都会调用到第二个方法。对于批量添加实际实现也是在遍历过程中依次添加每个元素。 上面第二个方法中的操作可以总结为下面的步骤:

  • 检查handler是否能被重复添加;
  • 名称检查或生成;
  • 创建通道处理器上下文对象;
  • 插入双向链表;
  • 执行通道已添加回调,有几种情况:
    • 如果通道还未注册,则封装延迟任务并添加到延迟任务链表中。
    • 否则:
      • 如果不在事件循环中,则封装任务并提交到事件循环中。
      • 否则直接调用回调。

上面除了插入双向链表的操作各个方法有一点差异以外,其他操作都是公共的,在下面统一介绍。

公共操作

检查handler是否能被重复添加

v4.1.83
checkMultiplicity
isSharable
<
>
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        // 如果通道处理器以及被添加过了(可能是添加到其他通道的流水线中)而且不是shareable的,则抛出异常。
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}
java
transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java
public boolean isSharable() {
    Class<?> clazz = getClass();
    // 从缓存中获取判断结果
    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean sharable = cache.get(clazz);
    // 缓存中没有结果
    if (sharable == null) {
        // 判断类上是否被@Sharableh注解修饰
        sharable = clazz.isAnnotationPresent(Sharable.class);
        // 将判断结果添加到缓存中
        cache.put(clazz, sharable);
    }
    return sharable;
}

如果通道处理器被多次添加到流水线中(可能是不同通道的流水线),那么判断其是否被@Sharable注解修饰,如果不是则抛出异常。

检查或生成通道处理器名称

v4.1.83
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
private String filterName(String name, ChannelHandler handler) {
    if (name == null) { // 如果名称为null
        // 则生成名称,内部已包含重复性检查
        return generateName(handler);
    }
    // 检查名称是否重复
    checkDuplicateName(name);
    return name;
}

这里分为了两种情况,如果没有传入名称,则生成默认的名称;如果传入了,则直接进行重复性检查。

生成默认名称

v4.1.83
generateName
context0
<
>
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
private String generateName(ChannelHandler handler) {
    // 获取缓存
    Map<Class<?>, String> cache = nameCaches.get();
    Class<?> handlerType = handler.getClass();
    // 获取缓存中的处理器名称
    String name = cache.get(handlerType);
    // 缓存中没有记录
    if (name == null) {
        // 生成名称
        name = generateName0(handlerType);
        // 添加到缓存中
        cache.put(handlerType, name);
    }

    // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
    // any name conflicts.  Note that we don't cache the names generated here.
    if (context0(name) != null) { // 如果链表中存在同名的通道处理器
        // 去掉默认的最后一个字符,即序号标记0
        String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
        // 从1开始,直到找到重复的名称为止
        for (int i = 1;; i ++) {
            // 在名称末尾添加上序号
            String newName = baseName + i;
            if (context0(newName) == null) { // 没有重复的情况
                // 返回名称
                name = newName;
                break;
            }
        }
    }
    return name;
}
private static String generateName0(Class<?> handlerType) {
    // 简单类名加上特殊结尾标记
    return StringUtil.simpleClassName(handlerType) + "#0";
}
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
private AbstractChannelHandlerContext context0(String name) {
    // 从头结点开始遍历
    AbstractChannelHandlerContext context = head.next;
    // 遍历通道处理器上下文对象
    while (context != tail) {
        // 如果链表中已经存在相同名称的节点了
        if (context.name().equals(name)) {
            // 返回相同名称的通道上下文对象
            return context;
        }
        // 推动往后遍历
        context = context.next;
    }
    return null;
}

这里直接用通道处理器的类名加上序号作为名称,如果存在重复,则不断试更大的序号,直到找到没有重复的序号。这里自动进行了重复性问题的判断和解决。

重复性校验

v4.1.83
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
private void checkDuplicateName(String name) {
    // 如果名称重复,则直接抛出异常
    if (context0(name) != null) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}
private AbstractChannelHandlerContext context0(String name) {
    // 从头结点开始遍历
    AbstractChannelHandlerContext context = head.next;
    // 遍历通道处理器上下文对象
    while (context != tail) {
        // 如果链表中已经存在相同名称的节点了
        if (context.name().equals(name)) {
            // 返回相同名称的通道上下文对象
            return context;
        }
        // 推动往后遍历
        context = context.next;
    }
    return null;
}

如果是用户传进来的名称,如果存在重复,则直接抛出异常。

创建通道上下文对象

v4.1.83
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// 该方法在选择执行器组里面的执行器
private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    // 判断是否是单个执行器的组
    Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
    // 如果不是单个执行器的组
    if (pinEventExecutor != null && !pinEventExecutor) {
        // 选择执行器
        return group.next();
    }

    // 执行到这里就说明是单个执行器的组了

    // 获取缓存对象
    Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
    if (childExecutors == null) { // 如果缓存还没初始化过
        // Use size of 4 as most people only use one extra EventExecutor.
        // 初始化缓存对象
        childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
    }
    // Pin one of the child executors once and remember it so that the same child executor
    // is used to fire events for the same channel.
    // 从缓存中获取执行器
    EventExecutor childExecutor = childExecutors.get(group);
    if (childExecutor == null) { // 如果缓存中没有记录
        // 选择执行器
        childExecutor = group.next();
        // 将结果添加到缓存中
        childExecutors.put(group, childExecutor);
    }
    return childExecutor;
}

childExecutor方法中主要在选择事件执行器组中的事件执行器,如果传入的事件执行器组是null,则跳过则一步。最后将各个参数传递给DefaultChannelHandlerContext的构造方法,这是Netty中默认使用的通道处理器上下文类型。

v4.1.83
DefaultChannelHandlerContext
AbstractChannelHandlerContext
<
>
java
transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}
java
transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                              String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    // 判断执行类型,也就是判断通道处理器中是否实现了各种方法。
    this.executionMask = mask(handlerClass);
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

在抽象类AbstractChannelHandlerContext的构造方法中,会设置当前通道处理器能够处理的事件类型掩码,具体怎么设置的在下面事件传播机制小节再介绍。

执行通道处理器添加回调

先来看最简单的一种情况,直接调用。

v4.1.83
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        // 执行添加handler的回调
        ctx.callHandlerAdded();
    } catch (Throwable t) {
    }
}

会调用到通道处理器上下文中的方法callHandlerAdded

v4.1.83
java
transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java
final void callHandlerAdded() throws Exception {
    // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
    // any pipeline events ctx.handler() will miss them because the state will not allow it.
    // 修改状态为已添加完毕
    if (setAddComplete()) {
        // 修改成功则执行回调
        handler().handlerAdded(this);
    }
}
final boolean setAddComplete() {
    // 自旋更新状态
    for (;;) {
        int oldState = handlerState;
        // 如果处理器已被移除
        if (oldState == REMOVE_COMPLETE) {
            return false;
        }
        // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
        // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
        // exposing ordering guarantees.
        // CAS设置状态为已添加完毕
        if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return true;
        }
    }
}

在这里直接调用到了通道处理器实现的handlerAdded方法。

在事件循环中调用的情况也比较简单,直接往事件循环中提交一个任务即可,还是会执行到上面介绍的callHandlerAdded0方法。

v4.1.83
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
    // 修改其状态为ADD_PENDING,表示正在添加中
    newCtx.setAddPending();
    executor.execute(new Runnable() {
        @Override
        public void run() {
            callHandlerAdded0(newCtx);
        }
    });
}

最后看一种稍微复杂点的情况:延迟执行。

v4.1.83
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;

    // 创建任务对象
    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    // 获取链表头
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) { // 如果链表没被初始化
        // 则初始化链表
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        // 尾插法:将任务对象添加到链表中
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}

这里封装成了任务对象,并将其添加到一个链表中。那这里链表中的任务什么时候被执行呢?在AbstractUnsafe对象的register0方法中在注册了通道后,会调用通道流水线的invokeHandlerAddedIfNeeded方法。

v4.1.83
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
/*
 * 当通道注册到选择器中时该方法会被调用
 */
final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    // 只有在第一次注册通道的时候才会调用
    if (firstRegistration) {
        firstRegistration = false;
        // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
        // that were added before the registration was done.
        // 调用延时任务链表中的任务
        callHandlerAddedForAllHandlers();
    }
}
private void callHandlerAddedForAllHandlers() {
    // 临时变量
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        // This Channel itself was registered.
        // 修改属性表示通道已经注册到该流水线中了。
        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        // 重置链表头,当前是在同步块中,是线程安全的
        this.pendingHandlerCallbackHead = null;
    }

    // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
    // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
    // the EventLoop.
    // 执行任务链表中的任务
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        // 执行任务
        task.execute();
        task = task.next;
    }
}

在上面的方法中,会遍历延迟任务链表,依次执行各个延迟任务。那任务中是怎么执行到通道处理器添加回调的呢?

v4.1.83
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
private final class PendingHandlerAddedTask extends PendingHandlerCallback {

    PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    public void run() {
        // 执行通道处理器添加回调
        callHandlerAdded0(ctx);
    }

    @Override
    void execute() {
        EventExecutor executor = ctx.executor();
        if (executor.inEventLoop()) {
            // 执行通道处理器添加回调
            callHandlerAdded0(ctx);
        } else {
            try {
                // 会在事件循环中执行到上面的run方法
                executor.execute(this);
            } catch (RejectedExecutionException e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                            executor, ctx.name(), e);
                }
                atomicRemoveFromHandlerList(ctx);
                ctx.setRemoved();
            }
        }
    }
}

最终还是要调用到callHandlerAdded0方法,所以该方法是执行通道处理器添加回调的核心。

对于其他的通道处理器的管理方法,逻辑上都是相似的,不再一一赘述。

事件传播机制

真正责任链中的事件调度是在AbstractChannelHandlerContext中的一系列的invokeXXX静态方法,在DefaultPipeline的实现中,会调用这些方法,并传入应该从哪个通道处理器上下文开始传播事件。 DefaultChannelPipeline中有两个特殊的ChannelHandlerContext实现:HeadContextTailContext,这两个类既是通道处理器上下文,其本身也是通道处理器,分别是双向链表中的头尾节点。仔细观察就会发现,ChannelInboundInvoker中定义的事件传播方法,都会从HeadContext开始;而ChannelOutboundInvoker中定义的方法,都会从TailContext开始。

由于事件类型众多,下面以通道已注册事件为例来介绍,其他类型的事件的传播是可以类比的。

通道流水线发起事件

通道被注册后,Unsafe中的方法会调用通道流水的fireChannelRegistered方法来发起通道已注册事件。

v4.1.83
java
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
@Override
public final ChannelPipeline fireChannelRegistered() {
    // 从head开始传播注册事件
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

调用AbstractChannelHandlerContext的静态方法invokeChannelRegistered,并传入了headHeadContext类型的属性)。

v4.1.83
java
transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java
// 调用注册回调
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    // 获取执行器
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // 调用通道注册回调注册
        next.invokeChannelRegistered();
    } else {
        // 提交任务
        executor.execute(new Runnable() {
            @Override
            public void run() {
                /*
                 * static的invoke方法都是调用的实例invoke方法。
                 */
                next.invokeChannelRegistered();
            }
        });
    }
}

在该方法中会调用通道处理器上下文的实例方法来执行通道处理器。

执行通道处理器

v4.1.83
java
transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java
// 调用通道处理器的注册回调方法
private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            // DON'T CHANGE
            // Duplex handlers implements both out/in interfaces causing a scalability issue
            // see https://bugs.openjdk.org/browse/JDK-8180450
            // 获取通道处理器
            final ChannelHandler handler = handler();
            // 获取流水线中的头节点
            final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
            /*
             * 这里分三种情况有什么意义,都是调用的相同的方法,参数也相同。
             */
            if (handler == headContext) {
                headContext.channelRegistered(this);
            } else if (handler instanceof ChannelDuplexHandler) {
                ((ChannelDuplexHandler) handler).channelRegistered(this);
            } else {
                ((ChannelInboundHandler) handler).channelRegistered(this);
            }
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRegistered();
    }
}

在该方法中,会执行到通道处理器中相应事件的处理方法。

通道处理器传递事件

在通道处理器的事件处理方法中,其有一个类型为ChannelHandlerContext的参数,ChannelHandlerContext接口也继承了ChannelInboundInvokerChannelOutboundInvoker这两个接口,所以它具备发起事件的能力。 如果事件处理器认为需要往后传递事件,那么可以调用参数对象的事件发起方法。

v4.1.83
java
transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java
@Override
public ChannelHandlerContext fireChannelRegistered() {
    /*
     * 重写的父类fire方法内部都是:
     * * 查找下一个InBound类型的context;
     * * 调用的对应的static方法。
     * (其他fire方法类似)
     */
    invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
    return this;
}

在该方法中先找到下一个通道处理器,Netty定义了两个方法来找到下一个处理器:findContextInboundfindContextOutbound

v4.1.83
java
transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    // 遍历链表
    do {
        // 向前遍历
        ctx = ctx.next;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}
private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    // 遍历链表
    do {
        // 向前遍历
        ctx = ctx.prev;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
    return ctx;
}

对于入站,会向后遍历;对于出站,会向前遍历。这里的遍历还包括了对节点的筛选,即只筛选合适的节点。

v4.1.83
java
transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java
private static boolean skipContext(
        AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
    // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
    // 判断是否至少有一个比特的掩码能匹配上
    return (ctx.executionMask & (onlyMask | mask)) == 0 ||
            // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
            // everything to preserve ordering.
            //
            // See https://github.com/netty/netty/issues/10067
            (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}

属性executionMask是通道处理器能够处理的事件类型掩码,在AbstractChannelHandlerContext的构造方法中会进行设置。

v4.1.83
mask
mask0
<
>
java
transport/src/main/java/io/netty/channel/ChannelHandlerMask.java
static int mask(Class<? extends ChannelHandler> clazz) {
    // Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
    // lookup in the future.
    Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
    // 从缓存中获取
    Integer mask = cache.get(clazz);
    // 缓存中
    if (mask == null) {
        // 判断执行类型
        mask = mask0(clazz);
        // 将结果加入缓存中
        cache.put(clazz, mask);
    }
    return mask;
}
java
transport/src/main/java/io/netty/channel/ChannelHandlerMask.java
private static int mask0(Class<? extends ChannelHandler> handlerType) {
    int mask = MASK_EXCEPTION_CAUGHT;
    try {
        if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_INBOUND;

            // 如果方法被@Skip注解修饰的
            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                // 则取消设置该事件的掩码
                mask &= ~MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_UNREGISTERED;
            }
            if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_ACTIVE;
            }
            if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_INACTIVE;
            }
            if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_CHANNEL_READ;
            }
            if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_READ_COMPLETE;
            }
            if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
            }
            if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_USER_EVENT_TRIGGERED;
            }
        }

        if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_OUTBOUND;

            if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                    SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_BIND;
            }
            if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                    SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_CONNECT;
            }
            if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_DISCONNECT;
            }
            if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_CLOSE;
            }
            if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_DEREGISTER;
            }
            if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
                mask &= ~MASK_READ;
            }
            if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                    Object.class, ChannelPromise.class)) {
                mask &= ~MASK_WRITE;
            }
            if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
                mask &= ~MASK_FLUSH;
            }
        }

        if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
            mask &= ~MASK_EXCEPTION_CAUGHT;
        }
    } catch (Exception e) {
        // Should never reach here.
        PlatformDependent.throwException(e);
    }

    return mask;
}
ate static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
    if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
        mask |= MASK_ALL_INBOUND;

        // 如果方法被@Skip注解修饰的
        if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
            // 则取消设置该事件的掩码
            mask &= ~MASK_CHANNEL_REGISTERED;
        }
        if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
            mask &= ~MASK_CHANNEL_UNREGISTERED;
        }
        if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
            mask &= ~MASK_CHANNEL_ACTIVE;
        }
        if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
            mask &= ~MASK_CHANNEL_INACTIVE;
        }
        if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
            mask &= ~MASK_CHANNEL_READ;
        }
        if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
            mask &= ~MASK_CHANNEL_READ_COMPLETE;
        }
        if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
            mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
        }
        if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
            mask &= ~MASK_USER_EVENT_TRIGGERED;
        }
    }

    if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
        mask |= MASK_ALL_OUTBOUND;

        if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                SocketAddress.class, ChannelPromise.class)) {
            mask &= ~MASK_BIND;
        }
        if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                SocketAddress.class, ChannelPromise.class)) {
            mask &= ~MASK_CONNECT;
        }
        if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
            mask &= ~MASK_DISCONNECT;
        }
        if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
            mask &= ~MASK_CLOSE;
        }
        if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
            mask &= ~MASK_DEREGISTER;
        }
        if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
            mask &= ~MASK_READ;
        }
        if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                Object.class, ChannelPromise.class)) {
            mask &= ~MASK_WRITE;
        }
        if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
            mask &= ~MASK_FLUSH;
        }
    }

    if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
        mask &= ~MASK_EXCEPTION_CAUGHT;
    }
} catch (Exception e) {
    // Should never reach here.
    PlatformDependent.throwException(e);
}

return mask;

首先判断通道处理器的类型来设置事件掩码,当前可以在方法上加上@Skip注解,这样Netty会取消设置对应事件的掩码。

找到下一个通道处理器后,就会执行该通道处理器,这样不断递归下去,整个通道流水线中的通道处理器都被执行到了。

总结

Netty中的通道流水线会把通道处理器封装为通道处理器上下文,并把这些上下文对象串联成链表。通道流水线对外暴露了事件传播的方法,实际的通道执行和事件传播是在通道处理器上下文中实现的。通道处理器上下文封装好了事件传播的方法,用户只需编写好事件的处理逻辑,并根据实际情况调用这些方法就可以实现事件在通道处理器责任链上的传播了。