在Netty的客户端通信中,一般会使用NioSocketChannel作为通道类型;另外在服务端,NioServerSocketChannel也会使用该类型的通道表示客户端的连接请求。本文分析一下该类的实现原理。
创建过程
在介绍构造方法之前,先来介绍一下NioSocketChannel的继承体系。
NioSocketChannel
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static final Method OPEN_SOCKET_CHANNEL_WITH_FAMILY =
SelectorProviderUtil.findOpenMethod("openSocketChannel");
private final SocketChannelConfig config;
/**
* Create a new instance
*/
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioSocketChannel(SelectorProvider provider) {
this(provider, null);
}
/**
* Create a new instance using the given {@link SelectorProvider} and protocol family (supported only since JDK 15).
*/
public NioSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
this(newChannel(provider, family));
}
/**
* Create a new instance using the given {@link SocketChannel}.
*/
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
/**
* Create a new instance
*
* @param parent the {@link Channel} which created this instance or {@code null} if it was created by the user
* @param socket the {@link SocketChannel} which will be used
*/
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
如果使用无参构造方法,没有传入SelectorProvider的话,则使用默认的。在Linux平台上,默认创建的是EPollSelectorImpl类的实例。
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
// java.nio.channels.spi.SelectorProvider 属性指定实现类
if (loadProviderFromProperty())
return provider;
// SPI 指定实现类
if (loadProviderAsService())
return provider;
// 默认实现,Windows 和 Linux 下不同
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux")) // 如果是Linux系统
// EPollSelectorProvider中创建的选择器是EPollSelectorImpl
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
@SuppressWarnings("unchecked")
private static SelectorProvider createProvider(String cn) {
Class<SelectorProvider> c;
try {
// 加载指定的类
c = (Class<SelectorProvider>)Class.forName(cn);
} catch (ClassNotFoundException x) {
throw new AssertionError(x);
}
try {
// 反射创建指定类的实例
return c.newInstance();
} catch (IllegalAccessException | InstantiationException x) {
throw new AssertionError(x);
}
}
而在newChannel方法中则会通过工具类来反射调用EPollSelectorProvider的openSocketChannel方法。
private static SocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
try {
SocketChannel channel = SelectorProviderUtil.newChannel(OPEN_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
return channel == null ? provider.openSocketChannel() : channel;
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
static <C extends Channel> C newChannel(Method method, SelectorProvider provider,
InternetProtocolFamily family) throws IOException {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
if (family != null && method != null) {
try {
@SuppressWarnings("unchecked")
// 反射调用方法
C channel = (C) method.invoke(
provider, ProtocolFamilyConverter.convert(family));
return channel;
} catch (InvocationTargetException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
}
}
return null;
}
实际调用到的方法在EPollSelectorProvider的父类SelectorProviderImpl中。
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
接下来就调用父类的构造方法了。
AbstractNioByteChannel
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
在该构造方法中,设置了当前网络通道感兴趣的事件是READ。
AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
// 设置底层JDK的通道对象
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// 配置为非阻塞模式
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
在该方法中把网络通道设置为了非阻塞模式,记得在底层JDK的服务端通道的accept方法中默认是设置为阻塞模式的。
AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
/*
* 下面分别创建了通道ID,操作底层IO的Unsafe类的实例以及通道流水线
*/
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
在该方法中,重要的是创建了Unsafe对象和ChannelPipeline对象。
创建Unsafe对象
该方法在AbstractChannel中是个模板方法,在子类AbstractNioByteChannel中实现了该方法。
@Override
protected AbstractNioUnsafe newUnsafe() {
// 创建unsafe对象
return new NioByteUnsafe();
}
这里创建的是NioByteUnsafe类型的实例。
连接操作
如果使用启动器Bootstrap来启动的话,那么一般会调用其connect方法来建立与服务端的连接。该操作最后调用到Unsafe类的connect方法,在子类AbstractNioUnsafe类中实现了connect方法。
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
// 执行连接
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
if (connectPromise != null && !connectPromise.isDone()
&& connectPromise.tryFailure(new ConnectTimeoutException(
"connection timed out: " + remoteAddress))) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
// 连接成功后会执行该回调
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
在该方法中,会调用doConnect方法来建立连接,该方法在AbstractNioUnsafe中是个模板方法,在子类NioSocketChannel中重写了该方法。
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
// 一般客户端不用绑定地址
doBind0(localAddress);
}
boolean success = false;
try {
// 调用底层JDK通道的connect操作
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) { // 如果还没有连接成功
// 则设置感兴趣的事件为CONNECT
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException {
return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
在该方法中会通过工具类来调用底层JDK通道的connect操作。
public boolean connect(SocketAddress sa) throws IOException {
int localPort = 0;
synchronized (readLock) {
synchronized (writeLock) {
ensureOpenAndUnconnected();
InetSocketAddress isa = Net.checkAddress(sa);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkConnect(isa.getAddress().getHostAddress(),
isa.getPort());
synchronized (blockingLock()) {
int n = 0;
try {
try {
begin();
synchronized (stateLock) {
if (!isOpen()) {
return false;
}
// notify hook only if unbound
if (localAddress == null) {
NetHooks.beforeTcpConnect(fd,
isa.getAddress(),
isa.getPort());
}
readerThread = NativeThread.current();
}
for (;;) {
InetAddress ia = isa.getAddress();
if (ia.isAnyLocalAddress())
ia = InetAddress.getLocalHost();
// 执行底层操作系统的connect调用
n = Net.connect(fd,
ia,
isa.getPort());
if ( (n == IOStatus.INTERRUPTED)
&& isOpen())
continue;
break;
}
} finally {
readerCleanup();
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
} catch (IOException x) {
// If an exception was thrown, close the channel after
// invoking end() so as to avoid bogus
// AsynchronousCloseExceptions
close();
throw x;
}
synchronized (stateLock) {
remoteAddress = isa;
if (n > 0) {
// Connection succeeded; disallow further
// invocation
state = ST_CONNECTED;
if (isOpen())
localAddress = Net.localAddress(fd);
return true;
}
// If nonblocking and no exception then connection
// pending; disallow another invocation
if (!isBlocking())
state = ST_PENDING;
else
assert false;
}
}
return false;
}
}
}