netty 4.x
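When RocketMQ's NettyServer#doOpen adds its ChannelHandlers, it uses ChannelPipeline#addLast(EventExecutorGroup, ChannelHandler...). With this overload, every callback of each ChannelHandler runs on an executor taken from the given EventExecutorGroup instead of on the channel's I/O thread: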
ServerBootstrap childHandler =
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    // This is the addLast overload in question
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                        new HandshakeHandler(TlsSystemConfig.tlsMode))
                    .addLast(defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyServerHandler()
                    );
            }
        });
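- Suppose NIO fires a read event.
- The first pass through NioByteUnsafe#read completes AbstractNioByteChannel#doReadBytes.
- ChannelPipeline#fireChannelRead is then called.
- That calls AbstractChannelHandlerContext#invokeChannelRead(AbstractChannelHandlerContext, Object). On every call, the decoding work (ByteToMessageDecoder#channelRead) is handed off to the handler's executor instead of running on the I/O thread: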
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // Get the executor assigned to this ChannelHandler
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        // This executor does not belong to eventLoopGroupSelector, so this branch is taken
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
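To make the two branches concrete, here is a minimal sketch of the same dispatch pattern using only the JDK. SketchExecutor and DispatchSketch are hypothetical stand-ins I made up for illustration, not Netty classes; the only assumption is that the handler's executor is backed by a single thread and can tell whether the caller is already on it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an EventExecutor: one worker thread plus an inEventLoop() check.
final class SketchExecutor {
    private final AtomicReference<Thread> worker = new AtomicReference<>();
    private final ExecutorService delegate;

    SketchExecutor(String name) {
        this.delegate = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            worker.set(t); // remember the worker so inEventLoop() can compare against it
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == worker.get();
    }

    void execute(Runnable task) {
        delegate.execute(task);
    }

    void shutdownAndWait() throws InterruptedException {
        delegate.shutdown(); // already queued tasks still run
        delegate.awaitTermination(5, TimeUnit.SECONDS);
    }
}

final class DispatchSketch {
    // Same shape as invokeChannelRead: run inline when already on the handler's thread, otherwise queue a task.
    static void invokeChannelRead(SketchExecutor executor, Runnable handler) {
        if (executor.inEventLoop()) {
            handler.run();
        } else {
            executor.execute(handler);
        }
    }

    // Returns the names of the threads that ran the outer and the nested handler call.
    static List<String> run() throws InterruptedException {
        SketchExecutor handlerExecutor = new SketchExecutor("handler-executor");
        List<String> threads = Collections.synchronizedList(new ArrayList<>());
        // Called from the caller's thread (playing the I/O thread): the else branch queues the task.
        invokeChannelRead(handlerExecutor, () -> {
            threads.add(Thread.currentThread().getName());
            // Called from the worker thread itself: the inline branch runs it directly.
            invokeChannelRead(handlerExecutor, () -> threads.add(Thread.currentThread().getName()));
        });
        handlerExecutor.shutdownAndWait();
        return threads;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // [handler-executor, handler-executor]
    }
}
```

In this model, both handler invocations end up on the worker thread; the calling thread only enqueues the first one and moves on.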
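invokeChannelRead returns as soon as the task has been submitted, so control goes back to the do-while loop in NioByteUnsafe#read, which repeats the four steps above and submits another ByteToMessageDecoder#channelRead. If the first ByteToMessageDecoder#channelRead has not run yet, could the second ByteToMessageDecoder#channelRead end up running before it?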
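One way to reason about this is a small JDK-only model. It rests on an assumption I have not verified against the RocketMQ version in question: that the executor serving one channel's handlers is a single thread draining a FIFO task queue, which is how I understand Netty's DefaultEventExecutor together with the pipeline's default of pinning one executor per channel and group. OrderingSketch and decodeOrder are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class OrderingSketch {
    // Submits `reads` tasks from the calling thread (playing the I/O thread's read loop)
    // to one single-threaded executor and records the order in which they actually run.
    static List<Integer> decodeOrder(int reads) throws InterruptedException {
        ExecutorService handlerExecutor = Executors.newSingleThreadExecutor();
        // Only the single worker thread writes to this list; it is read after termination.
        List<Integer> order = new ArrayList<>();
        for (int i = 1; i <= reads; i++) {
            final int readNo = i;
            handlerExecutor.execute(() -> order.add(readNo)); // stands in for channelRead number readNo
        }
        handlerExecutor.shutdown(); // queued tasks still run
        handlerExecutor.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(decodeOrder(5)); // [1, 2, 3, 4, 5]
    }
}
```

Under that assumption the second channelRead cannot overtake the first, because both sit in the same queue and are drained by the same thread. If the handlers of one channel were instead spread over several threads of the group, this model would no longer apply and ordering would have to be enforced some other way.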