Class hierarchy ([I] = interface, [C] = class, [AC] = abstract class):

```
[I]AttributeMap, ChannelOutboundInvoker, Comparable
  +-- [I]Channel

[I]AttributeMap
  +-- [C]DefaultAttributeMap

[I]Channel, [C]DefaultAttributeMap
  +-- [AC]AbstractChannel
        +-- [AC]AbstractNioChannel
              +-- [AC]AbstractNioMessageChannel
              +-- [AC]AbstractNioByteChannel

[I]Channel
  +-- [I]ServerChannel
  |     +-- [I](N)ServerSocketChannel
  +-- [I]DuplexChannel
        +-- [I](N)SocketChannel

[I](N)ServerSocketChannel, [AC]AbstractNioMessageChannel
  +-- [C]NioServerSocketChannel

(N)SocketChannel, AbstractNioByteChannel
  +-- [C]NioSocketChannel
```
The doRegister implementation in AbstractNioChannel is quite rigorous:
```java
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
```
Registration does not look complicated, but writing it strictly correctly takes care.
On its own, registering only requires:
```java
// Declared in java.nio.channels.SelectableChannel
/**
 * Registers this channel with the given selector, returning a selection
 * key.
 *
 * <p> If this channel is currently registered with the given selector then
 * the selection key representing that registration is returned. The key's
 * interest set will have been changed to <tt>ops</tt>, as if by invoking
 * the {@link SelectionKey#interestOps(int) interestOps(int)} method. If
 * the <tt>att</tt> argument is not <tt>null</tt> then the key's attachment
 * will have been set to that value. A {@link CancelledKeyException} will
 * be thrown if the key has already been cancelled.
 *
 * <p> Otherwise this channel has not yet been registered with the given
 * selector, so it is registered and the resulting new key is returned.
 * The key's initial interest set will be <tt>ops</tt> and its attachment
 * will be <tt>att</tt>.
 *
 * <p> This method may be invoked at any time. If this method is invoked
 * while another invocation of this method or of the {@link
 * #configureBlocking(boolean) configureBlocking} method is in progress
 * then it will first block until the other operation is complete. This
 * method will then synchronize on the selector's key set and therefore may
 * block if invoked concurrently with another registration or selection
 * operation involving the same selector. </p>
 *
 * <p> If this channel is closed while this operation is in progress then
 * the key returned by this method will have been cancelled and will
 * therefore be invalid. </p>
 *
 * @param sel
 *        The selector with which this channel is to be registered
 *
 * @param ops
 *        The interest set for the resulting key
 *
 * @param att
 *        The attachment for the resulting key; may be <tt>null</tt>
 *
 * @throws ClosedChannelException
 *         If this channel is closed
 *
 * @throws ClosedSelectorException
 *         If the selector is closed
 *
 * @throws IllegalBlockingModeException
 *         If this channel is in blocking mode
 *
 * @throws IllegalSelectorException
 *         If this channel was not created by the same provider
 *         as the given selector
 *
 * @throws CancelledKeyException
 *         If this channel is currently registered with the given selector
 *         but the corresponding key has already been cancelled
 *
 * @throws IllegalArgumentException
 *         If a bit in the <tt>ops</tt> set does not correspond to an
 *         operation that is supported by this channel, that is, if
 *         {@code set & ~validOps() != 0}
 *
 * @return A key representing the registration of this channel with
 *         the given selector
 */
public abstract SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException;
```
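Selector sel: the EventLoop's unwrappedSelector instance.
int ops: registration currently passes 0. Can a value other than the four defined constants be passed? Any value whose bits all fall within validOps() is accepted, and 0, having no bits set, always passes the `ops & ~validOps() != 0` check from the Javadoc above. SelectionKey defines four constants: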
- OP_READ 1
- OP_WRITE 4
- OP_CONNECT 8
- OP_ACCEPT 16
Object att: the attachment stored on the resulting SelectionKey (not on the Selector itself); here Netty passes the AbstractNioChannel itself (`this`).
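To make the three parameters concrete, here is a minimal sketch that uses a `java.nio.channels.Pipe` source as a stand-in for Netty's socket channel (an assumption made only so the example needs no network). It registers with `ops = 0`, and a plain `Object` plays the role of the `AbstractNioChannel` attachment; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class RegisterParamsDemo {
    /** Snapshot of the freshly registered key. */
    static final class Result {
        final int interestOps;
        final boolean attachmentOnKey;
        final int validOps;

        Result(int interestOps, boolean attachmentOnKey, int validOps) {
            this.interestOps = interestOps;
            this.attachmentOnKey = attachmentOnKey;
            this.validOps = validOps;
        }
    }

    static Result registerWithZeroOps() throws IOException {
        Object attachment = new Object(); // stands in for the AbstractNioChannel Netty passes as "this"
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false); // register() throws IllegalBlockingModeException otherwise
            // ops = 0 has no bits set, so (ops & ~validOps()) == 0 and no IllegalArgumentException
            SelectionKey key = source.register(selector, 0, attachment);
            return new Result(key.interestOps(), key.attachment() == attachment, source.validOps());
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("OP_READ=" + SelectionKey.OP_READ + ", OP_WRITE=" + SelectionKey.OP_WRITE
                + ", OP_CONNECT=" + SelectionKey.OP_CONNECT + ", OP_ACCEPT=" + SelectionKey.OP_ACCEPT);
        Result r = registerWithZeroOps();
        System.out.println("interestOps=" + r.interestOps + ", attachment on key=" + r.attachmentOnKey);
    }
}
```

The attachment is read back from the key (`key.attachment()`), which is how Netty later gets from a selected key back to its own channel object.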
Exception handling in doRegister:
```
+-------------------------------> SelectableChannel.register(...)
|                                           |
|                                      success? ---Y---> return
|                                           | N
|                          +----------------+----------------+
|                          |                                 |
|                CancelledKeyException               other exception
|                          |                                 |
|                     selected? ---Y---> throw e             +---> throw e
|                          | N
|                          v
+----------------- selectNow(); selected = true
```
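The CancelledKeyException branch can be reproduced with plain NIO. The following is a minimal sketch, assuming a `Pipe` source instead of a socket channel and hypothetical names throughout: a key is cancelled, so re-registering on the same selector fails because the cancelled key is still cached, and one forced `selectNow()` flushes it, which is the same retry shape as doRegister.

```java
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class RegisterRetryDemo {
    /** Same shape as doRegister: on CancelledKeyException force one selectNow(), then give up. */
    static SelectionKey registerWithRetry(Pipe.SourceChannel ch, Selector selector, Object att)
            throws IOException {
        boolean selected = false;
        for (;;) {
            try {
                return ch.register(selector, 0, att);
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // A selection operation deregisters keys from the cancelled-key set.
                    selector.selectNow();
                    selected = true;
                } else {
                    throw e; // still cached after a forced select: give up
                }
            }
        }
    }

    /** True if a plain re-register failed but the retrying version succeeded. */
    static boolean demo() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            source.register(selector, 0, "first").cancel(); // invalid, but not yet deregistered

            boolean plainFailed = false;
            try {
                source.register(selector, 0, "second");
            } catch (CancelledKeyException e) {
                plainFailed = true;
            }
            SelectionKey key = registerWithRetry(source, selector, "second");
            return plainFailed && key.isValid() && "second".equals(key.attachment());
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("retry path worked: " + demo());
    }
}
```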
A few points worth noting:
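None of the four operations defined by the JDK's SelectionKey is 0, so why does registration pass ops = 0? See issue #1836, "io.netty.channel.socket.nio.AbstractNioChannel doRegister() ??", where the Netty author and the committer who wrote this code explain it. Roughly, it is meant to work around a potential JDK bug: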
It's an intentional stuff to deal with a potential JDK bug where it returns a selection key with readyOps set to 0 for no good reason, leading to a spin loop. So, I'd leave it as it is.
The matching comment in NioEventLoop.processSelectedKey reads:

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop
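A commenter, dengyuankai272, gave the following explanation, which seems more convincing: the 0 the author talks about (readyOps) is not the 0 passed at registration.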
There are two ways to register interested events:
1. Channel.register()
2. SelectionKey.interestOps(int)

Netty uses 2. After binding the channel to the selector with interestOps 0, Netty invokes fireChannelActive() -> fires a read event -> HeadContext.unsafe.doBeginRead() -> SelectionKey.interestOps(int) to set interestOps.
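The two-step approach can be observed with plain NIO. In this sketch (a `Pipe` stands in for the socket, and all names are hypothetical), data is already waiting, yet a key registered with ops = 0 reports nothing; only after the later `interestOps(...)` call, which is the step Netty performs in doBeginRead, does the selector report the channel as ready.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class TwoStepInterestDemo {
    /** Returns {ready keys with ops = 0, ready keys after adding OP_READ}. */
    static int[] readyCounts() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            sink.write(ByteBuffer.wrap(new byte[] {42})); // data is already waiting to be read

            // Step 1: bind the channel to the selector with an empty interest set.
            SelectionKey key = source.register(selector, 0, null);
            int before = selector.selectNow(); // nothing reported: no interest yet

            // Step 2: "channel active" -> turn on read interest via SelectionKey.interestOps(int).
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            int after = selector.select(1000); // returns as soon as the source is readable
            return new int[] {before, after};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] r = readyCounts();
        System.out.println("ready with ops=0: " + r[0] + ", ready with OP_READ: " + r[1]);
    }
}
```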
Deregistration (doDeregister) uses eventLoop().cancel(selectionKey());
Following that call, the logic is:
```
+---------------------+
| SelectionKey.cancel |
+----------+----------+
           |
+----------v----------+
| cancelledKeys += 1  |
+----------+----------+
           |
+----------v----------------------------+
| cancelledKeys >= CLEANUP_INTERVAL     |
| (256, hard-coded)?                    |
+----+-----------------------------+----+
     | N                           | Y
+----v---+            +------------v--------------+
| return |            | needsToSelectAgain = true |
+--------+            +------------+--------------+
                                   |  acted on later, e.g. in task polling,
                                   |  processSelectedKeysPlain, closeAll
                      +------------v--------------+
                      | selector.selectNow()      |
                      +---------------------------+
```
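The counting scheme in the diagram can be sketched with plain NIO. This is a hypothetical class, not Netty's NioEventLoop: the interval is a constructor parameter so the demo can use 2 instead of 256, and resetting the counter to 0 once the threshold is reached is my assumption, since the diagram does not show it. The demo also shows that cancelled keys stay in `selector.keys()` until `selectNow()` flushes them.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class CancelTrackerDemo {
    static final int CLEANUP_INTERVAL = 256; // value noted above (hard-coded in Netty)

    private final Selector selector;
    private final int cleanupInterval;
    private int cancelledKeys;
    boolean needsToSelectAgain;

    CancelTrackerDemo(Selector selector, int cleanupInterval) {
        this.selector = selector;
        this.cleanupInterval = cleanupInterval;
    }

    /** Cancel the key; every cleanupInterval cancellations, request an extra selectNow(). */
    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys++;
        if (cancelledKeys >= cleanupInterval) {
            cancelledKeys = 0; // assumption: the counter restarts after each cleanup
            needsToSelectAgain = true;
        }
    }

    /** Called later from the event loop, e.g. while processing selected keys. */
    void selectAgainIfNeeded() throws IOException {
        if (needsToSelectAgain) {
            needsToSelectAgain = false;
            selector.selectNow(); // deregisters everything in the cancelled-key set
        }
    }

    static boolean demo() throws IOException {
        Pipe p1 = Pipe.open();
        Pipe p2 = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel s1 = p1.source(); Pipe.SinkChannel sink1 = p1.sink();
             Pipe.SourceChannel s2 = p2.source(); Pipe.SinkChannel sink2 = p2.sink()) {
            CancelTrackerDemo tracker = new CancelTrackerDemo(selector, 2); // small interval for the demo
            s1.configureBlocking(false);
            s2.configureBlocking(false);
            SelectionKey a = s1.register(selector, 0);
            SelectionKey b = s2.register(selector, 0);

            tracker.cancel(a);
            boolean afterOne = tracker.needsToSelectAgain;  // below the interval
            tracker.cancel(b);
            boolean afterTwo = tracker.needsToSelectAgain;  // interval reached
            int keysBefore = selector.keys().size();        // cancelled keys are still cached
            tracker.selectAgainIfNeeded();
            int keysAfter = selector.keys().size();         // flushed by selectNow()
            return !afterOne && afterTwo && keysBefore == 2 && keysAfter == 0;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("cancel tracking behaved as expected: " + demo());
    }
}
```

Batching the cleanup this way avoids an extra select call per cancelled key.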
doBeginRead():

```
+-------------------------+
| SelectionKey.isValid ?  +----N----> return
+------------+------------+
             | Y
+------------v------------+
| readPending = true      |
+------------+------------+
             |
+------------v----------------------------+
| selectionKey.interestOps(               |
|     interestOps | readInterestOp)       |
+-----------------------------------------+
```
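A minimal sketch of the flow chart above, written against plain NIO with a hypothetical class (not Netty's AbstractNioChannel). It checks both branches: on a valid key registered with ops = 0, doBeginRead adds the read interest; on a cancelled key it returns early instead of hitting CancelledKeyException.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class BeginReadDemo {
    private final SelectionKey selectionKey;
    private final int readInterestOp; // e.g. OP_READ for a byte channel, OP_ACCEPT for a server channel
    boolean readPending;

    BeginReadDemo(SelectionKey selectionKey, int readInterestOp) {
        this.selectionKey = selectionKey;
        this.readInterestOp = readInterestOp;
    }

    /** Bail out on an invalid key, otherwise mark a read as pending and add readInterestOp. */
    void doBeginRead() {
        if (!selectionKey.isValid()) {
            return;
        }
        readPending = true;
        selectionKey.interestOps(selectionKey.interestOps() | readInterestOp);
    }

    static boolean demo() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            SelectionKey key = source.register(selector, 0); // registered with ops = 0

            BeginReadDemo active = new BeginReadDemo(key, SelectionKey.OP_READ);
            active.doBeginRead();
            boolean readEnabled = key.interestOps() == SelectionKey.OP_READ && active.readPending;

            key.cancel();                                    // simulate deregistration
            BeginReadDemo stale = new BeginReadDemo(key, SelectionKey.OP_READ);
            stale.doBeginRead();                             // returns early, no exception
            return readEnabled && !stale.readPending;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("doBeginRead behaved as expected: " + demo());
    }
}
```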
On close (doClose):

```java
connectPromise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectTimeoutFuture.cancel(false);
```

This fails the pending connect promise and cancels the connect-timeout future.
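The same two steps can be sketched with JDK types, which is an assumption for illustration only: `CompletableFuture` stands in for Netty's ChannelPromise and a `ScheduledExecutorService` for the event loop's timer. `completeExceptionally` behaves like `tryFailure` in that it is a no-op returning false if the connect already completed, and `cancel(false)` does not interrupt a timeout task that is already running.

```java
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class CloseConnectDemo {
    CompletableFuture<Void> connectPromise = new CompletableFuture<>();
    ScheduledFuture<?> connectTimeoutFuture;

    /** Fail the pending connect and cancel its timeout, mirroring the two calls above. */
    void doClose() {
        if (connectPromise != null) {
            connectPromise.completeExceptionally(new ClosedChannelException()); // like tryFailure
        }
        if (connectTimeoutFuture != null) {
            connectTimeoutFuture.cancel(false); // do not interrupt a timeout task already running
        }
    }

    static boolean demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            CloseConnectDemo ch = new CloseConnectDemo();
            // A pending connect with a (long) timeout that would fail the promise if it fired.
            ch.connectTimeoutFuture = scheduler.schedule(() -> {
                ch.connectPromise.completeExceptionally(new RuntimeException("connect timed out"));
            }, 30, TimeUnit.SECONDS);
            ch.doClose();
            return ch.connectPromise.isCompletedExceptionally() && ch.connectTimeoutFuture.isCancelled();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("promise failed and timeout cancelled: " + demo());
    }
}
```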
newDirectBuffer converts a non-direct buf into a direct buf by copying its readable bytes; a PooledHeapByteBuf, for example, is not direct.
A direct buf is obtained in one of two ways: ByteBufAllocator.directBuffer, or ByteBufUtil.threadLocalDirectBuffer().
This part is rather interesting and also involves ReferenceCountUtil; to be analyzed later.
newDirectBuffer is used in the logic of AbstractNioByteChannel.filterOutboundMessage(Object).
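The heap-to-direct conversion can be illustrated with plain `java.nio.ByteBuffer` instead of Netty's ByteBuf, which is a deliberate swap: there is no pooling and no reference counting here, so the ReferenceCountUtil release step has no equivalent. The early return for an already-direct buffer mirrors the isDirect check that filterOutboundMessage does before calling newDirectBuffer; `toDirect` is a hypothetical name.

```java
import java.nio.ByteBuffer;

public class DirectCopyDemo {
    static final ByteBuffer EMPTY = ByteBuffer.allocateDirect(0);

    /** Returns a direct buffer with the remaining bytes of buf; buf's position is not moved. */
    static ByteBuffer toDirect(ByteBuffer buf) {
        if (buf.isDirect()) {
            return buf;                    // already direct: nothing to copy
        }
        int readable = buf.remaining();
        if (readable == 0) {
            return EMPTY;                  // nothing readable: hand back an empty buffer
        }
        ByteBuffer direct = ByteBuffer.allocateDirect(readable);
        direct.put(buf.duplicate());       // duplicate() keeps the source position untouched
        direct.flip();                     // switch the copy to read mode
        return direct;
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {'h', 'i'});
        ByteBuffer direct = toDirect(heap);
        System.out.println("direct=" + direct.isDirect() + ", remaining=" + direct.remaining());
    }
}
```

Copying once into a direct buffer lets the later channel write hand the memory to the OS without the JDK making its own temporary direct copy.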
In the RestExpress project, an actual call stack for this method, with brief annotations, looks like this:
```
ServerBootstrapFactory$2.newThread(Runnable) line: 90    // the worker thread starts being created
ThreadPerTaskExecutor.execute(Runnable) line: 33
NioEventLoop(SingleThreadEventExecutor).doStartThread() line: 894
NioEventLoop(SingleThreadEventExecutor).startThread() line: 865
NioEventLoop(SingleThreadEventExecutor).execute(Runnable) line: 758
NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).register(EventLoop, ChannelPromise) line: 483
NioEventLoop(SingleThreadEventLoop).register(ChannelPromise) line: 80
NioEventLoop(SingleThreadEventLoop).register(Channel) line: 74
    // return register(new DefaultChannelPromise(channel, this));
    // A DefaultChannelPromise is created from the Channel. This Channel is the message object
    // read by the boss thread above, i.e. the NioSocketChannel instance.
NioEventLoopGroup(MultithreadEventLoopGroup).register(Channel) line: 86
ServerBootstrap$ServerBootstrapAcceptor.channelRead(ChannelHandlerContext, Object) line: 255
    // Where the boss thread, after accept, hands the channel off to a worker thread:
    // childGroup.register(child).addListener(new ChannelFutureListener() {...
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340
DefaultChannelPipeline$HeadContext.channelRead(ChannelHandlerContext, Object) line: 1408
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelPipeline.fireChannelRead(Object) line: 930
AbstractNioMessageChannel$NioMessageUnsafe.read() line: 93    // ** the method being analyzed
NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 677
    // An accept or read event, or readyOps == 0, triggers a read on the boss thread.
    // Note that this is the boss thread: on the boss thread, accept is treated as read.
    // if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    //     unsafe.read();
    // }
NioEventLoop.processSelectedKeysOptimized() line: 612
NioEventLoop.processSelectedKeys() line: 529
NioEventLoop.run() line: 491
SingleThreadEventExecutor$5.run() line: 905
Thread.run() line: 748
```
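This stack also explains the whole path: the boss thread receives an accept event from I/O and hands the new channel to a worker thread, which does the subsequent reads.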
For more detail, query the trace data with SQL:

```sql
select * from trace_data where id>=15309 and THREAD_ID=13 and STACK_NUM>5 order by id
```
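Logic flow:
1. The subclass's doReadMessages reads the pending messages into readBuf (a List).
2. Iterate over the messages read and fire a channelRead event on the pipeline for each one: pipeline.fireChannelRead.
3. Clear readBuf.
4. Fire a channelReadComplete event on the pipeline: pipeline.fireChannelReadComplete.
5. On exception, close the channel via closeOnReadError and fire an exceptionCaught event on the pipeline (fireExceptionCaught).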
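The five steps can be sketched as follows. `Pipeline` and `MessageReader` are hypothetical stand-ins for Netty's ChannelPipeline and the doReadMessages hook, and Netty's limits on how many messages are read per event are not modelled; the method only returns whether the channel should be closed, leaving the actual close to the caller.

```java
import java.util.ArrayList;
import java.util.List;

public class MessageReadLoopDemo {
    /** Hypothetical stand-in for the pipeline events used above. */
    interface Pipeline {
        void fireChannelRead(Object msg);
        void fireChannelReadComplete();
        void fireExceptionCaught(Throwable t);
    }

    /** Hypothetical stand-in for the subclass hook; returns how many messages were added. */
    interface MessageReader {
        int doReadMessages(List<Object> buf) throws Exception;
    }

    private final List<Object> readBuf = new ArrayList<>();

    /** Returns true if the channel should be closed (the closeOnReadError path). */
    boolean read(MessageReader reader, Pipeline pipeline) {
        Throwable exception = null;
        try {
            // 1. Let the subclass fill readBuf until it reports nothing more to read.
            while (reader.doReadMessages(readBuf) > 0) {
                // keep reading
            }
        } catch (Throwable t) {
            exception = t;
        }
        // 2. One channelRead event per message.
        for (Object msg : readBuf) {
            pipeline.fireChannelRead(msg);
        }
        // 3. Clear the buffer for the next read.
        readBuf.clear();
        // 4. One channelReadComplete event for the whole batch.
        pipeline.fireChannelReadComplete();
        // 5. On error: report it and signal that the channel should be closed.
        if (exception != null) {
            pipeline.fireExceptionCaught(exception);
            return true;
        }
        return false;
    }
}
```

Messages collected before a failure are still delivered, so handlers see everything that was read before the error is reported.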
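Subclass implementation

The AbstractNioMessageChannel used here has a subclass, NioServerSocketChannel.
Its doReadMessages implementation performs accept and creates a NioSocketChannel instance as the message that was "read". As the call stack above shows, this Channel instance is later used in NioEventLoop(SingleThreadEventLoop).register(Channel) line: 74.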
Look at the trace data:

```sql
select * from trace_data where class_name like '%AbstractNioByteChannel%'
```

Result:
```
ID     THREAD_ID  STACK_NUM  THREAD_NAME  METHOD_ID  CLASS_NAME                                                 METHOD_NAME            LINE_NUM
15336  13         7          boss-0       14143      io/netty/channel/nio/AbstractNioByteChannel                <clinit>               45
15350  13         8          boss-0       14123      io/netty/channel/nio/AbstractNioByteChannel                <init>                 67
15374  13         15         boss-0       14144      io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe  <init>                 97
15407  13         9          boss-0       14186      io/netty/channel/nio/AbstractNioByteChannel$1              <init>                 49
15420  13         13         boss-0       14127      io/netty/channel/nio/AbstractNioByteChannel                metadata               85
20790  15         5          worker-0     14147      io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe  read                   186
20793  15         6          worker-0     14128      io/netty/channel/nio/AbstractNioByteChannel                shouldBreakReadReady   89
26019  15         30         worker-0     14133      io/netty/channel/nio/AbstractNioByteChannel                filterOutboundMessage  283
26288  15         30         worker-0     14139      io/netty/channel/nio/AbstractNioByteChannel                clearOpWrite           348
26357  15         5          worker-0     14147      io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe  read                   186
26360  15         6          worker-0     14128      io/netty/channel/nio/AbstractNioByteChannel                shouldBreakReadReady   89
26757  15         6          worker-0     14145      io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe  closeOnRead            111
26767  15         7          worker-0     14141      io/netty/channel/nio/AbstractNioByteChannel                access$000             43
26768  15         8          worker-0     14129      io/netty/channel/nio/AbstractNioByteChannel                isAllowHalfClosure     93
```
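The class is initialized and instantiated on the boss thread (<clinit>, <init>), while the actual work happens on the worker thread; the main method used there is read.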
The corresponding record in the trace data:

```
ID     THREAD_ID  STACK_NUM  THREAD_NAME  METHOD_ID  CLASS_NAME                                                 METHOD_NAME  LINE_NUM
20790  15         5          worker-0     14147      io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe  read         186
```
Call stack from NioByteUnsafe.read down to HTTP object decoding:
```
HttpRequestDecoder(HttpObjectDecoder).decode(ChannelHandlerContext, ByteBuf, List<Object>) line: 196
HttpRequestDecoder(ByteToMessageDecoder).decodeRemovalReentryProtection(ChannelHandlerContext, ByteBuf, List<Object>) line: 502
HttpRequestDecoder(ByteToMessageDecoder).callDecode(ChannelHandlerContext, ByteBuf, List<Object>) line: 441
HttpRequestDecoder(ByteToMessageDecoder).channelRead(ChannelHandlerContext, Object) line: 278
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340
ReadTimeoutHandler(IdleStateHandler).channelRead(ChannelHandlerContext, Object) line: 286
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340
DefaultChannelPipeline$HeadContext.channelRead(ChannelHandlerContext, Object) line: 1408
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelPipeline.fireChannelRead(Object) line: 930
NioSocketChannel$NioSocketChannelUnsafe(AbstractNioByteChannel$NioByteUnsafe).read() line: 163
NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 677
NioEventLoop.processSelectedKeysOptimized() line: 612
NioEventLoop.processSelectedKeys() line: 529
NioEventLoop.run() line: 491
SingleThreadEventExecutor$5.run() line: 905
Thread.run() line: 748
```
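Logic:
1. shouldBreakReadReady check: decides whether read-ready processing should be abandoned; see its code for the exact conditions.
2. Call the subclass's doReadBytes to actually read the bytes.
3. Fire the read event on the pipeline: pipeline.fireChannelRead. fireChannelRead takes an Object parameter, so both the ByteBuf passed here and the Channel passed in NioMessageUnsafe.read above are acceptable.
4. Fire the read-complete event on the pipeline: pipeline.fireChannelReadComplete().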
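Steps 2 to 4 can be sketched with plain NIO. This is a hypothetical helper, not NioByteUnsafe: `ch.read` plays the doReadBytes step, the `onRead` callback the fireChannelRead step, and `onReadComplete` the fireChannelReadComplete step. A `-1` (EOF) result is reported back so the caller can take the close path; the shouldBreakReadReady check and Netty's buffer-size prediction are not modelled.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.util.function.Consumer;

public class ByteReadLoopDemo {
    /** Reads until nothing is left (0) or EOF (-1); returns true on EOF so the caller can close. */
    static boolean read(ReadableByteChannel ch, Consumer<ByteBuffer> onRead, Runnable onReadComplete)
            throws IOException {
        boolean eof = false;
        for (;;) {
            ByteBuffer buf = ByteBuffer.allocate(16); // small buffer, so larger inputs fire several chunks
            int n = ch.read(buf);                     // the doReadBytes step
            if (n <= 0) {
                eof = n < 0;
                break;
            }
            buf.flip();
            onRead.accept(buf);                       // the fireChannelRead step
        }
        onReadComplete.run();                         // the fireChannelReadComplete step
        return eof;
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SourceChannel source = pipe.source(); Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false); // read() returns 0 instead of blocking when drained
            sink.write(ByteBuffer.wrap("hello".getBytes("US-ASCII")));
            read(source, b -> System.out.println("chunk of " + b.remaining() + " bytes"),
                    () -> System.out.println("read complete"));
        }
    }
}
```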
DefaultChannelHandlerContext will be analyzed later, together with the handlers.
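With NIO select(), as long as the channel can accept writes, every select() reports OP_WRITE as ready. So when writing with NIO, before each write we put the data into a buffer, register interest in OP_WRITE, and call wakeup() on the selector. Once the current select() round reports OP_WRITE, the buffered data is written to the channel, the buffer is cleared, and interest in OP_WRITE is removed again, which completes the write.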
```java
protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
        // Remove OP_WRITE from the interest set, i.e. stop listening for writability
        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
    }
}
```
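The write pattern described above can be sketched end to end with a `Pipe` sink (an assumption in place of a socket; names are hypothetical). The sketch is single-threaded, so the `wakeup()` call is not needed here; it only matters when another thread queues data while the loop is blocked in `select()`. Once the pending bytes are flushed, OP_WRITE is cleared with the same check-valid-then-mask logic as clearOpWrite.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class OpWriteDemo {
    /** Same logic as clearOpWrite(): drop OP_WRITE from the interest set if it is set. */
    static void clearOpWrite(SelectionKey key) {
        if (!key.isValid()) {
            return;
        }
        int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) != 0) {
            key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
        }
    }

    /** Registers OP_WRITE, flushes pending data when writable, then clears OP_WRITE. */
    static int writeOnce(Pipe.SinkChannel sink, Selector selector, ByteBuffer pending) throws IOException {
        SelectionKey key = sink.register(selector, 0);
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); // start listening for writability
        int written = 0;
        while (pending.hasRemaining() && selector.select(1000) > 0) {
            selector.selectedKeys().clear();
            written += sink.write(pending); // the channel is writable: flush what it accepts
        }
        if (!pending.hasRemaining()) {
            clearOpWrite(key); // otherwise every select() would keep reporting OP_WRITE
        }
        return written;
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            sink.configureBlocking(false);
            int n = writeOnce(sink, selector, ByteBuffer.wrap(new byte[] {1, 2, 3}));
            System.out.println("wrote " + n + " bytes, interestOps now " + sink.keyFor(selector).interestOps());
        }
    }
}
```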
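The constructor of NioServerSocketChannel specifies that the I/O event this class is interested in is OP_ACCEPT.
This class implements the details of doBind.
It uses a different bind method depending on the JDK version:
- JDK 7 and above: java.nio.channels.ServerSocketChannel.bind(SocketAddress, int)
- Below JDK 7: java.net.ServerSocket.bind(SocketAddress, int)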
```java
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
```
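Both bind routes can be tried directly with the JDK. In this sketch the `PlatformDependent.javaVersion()` switch is replaced by a boolean parameter (a hypothetical simplification), and since it has to run on JDK 7+ anyway, both branches are available: the channel's own `bind(SocketAddress, int)` and the older route through the channel's `ServerSocket` adaptor. It binds to an ephemeral loopback port so it does not collide with anything, and also confirms that a server channel's `validOps()` is exactly OP_ACCEPT.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;

public class BindDemo {
    /** Binds to an ephemeral loopback port via one of the two routes; returns the chosen port. */
    static int bind(boolean jdk7OrLater, int backlog) throws IOException {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            InetSocketAddress addr = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
            if (jdk7OrLater) {
                ch.bind(addr, backlog);          // ServerSocketChannel.bind(SocketAddress, int), JDK 7+
            } else {
                ch.socket().bind(addr, backlog); // ServerSocket.bind(SocketAddress, int) via the adaptor
            }
            return ch.socket().getLocalPort();
        }
    }

    /** A server socket channel only supports the accept operation. */
    static boolean onlyAccept() throws IOException {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            return ch.validOps() == SelectionKey.OP_ACCEPT;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("JDK 7+ bind -> port " + bind(true, 128));
        System.out.println("legacy bind -> port " + bind(false, 128));
        System.out.println("validOps == OP_ACCEPT: " + onlyAccept());
    }
}
```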
doClose simply calls the close method of the underlying ServerSocketChannel, with no extra logic.
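doReadMessages uses the NIO ServerSocketChannel instance to accept an NIO SocketChannel instance, wraps that SocketChannel in Netty's NioSocketChannel, and returns the new instance as a message to the surrounding processing logic. (This is for the sake of a uniform abstraction: a "message" does not have to be a protocol message; here it is the chain ServerSocketChannel --> SocketChannel --> NioSocketChannel.)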
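The "accepted channel is the message" idea can be sketched with plain NIO. `WrappedChannel` is a hypothetical stand-in for NioSocketChannel and `doReadMessages` only mirrors the contract described above (return the number of messages added, 0 when nothing is pending); Netty's error handling around accept is left out.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

public class AcceptAsReadDemo {
    /** Hypothetical wrapper standing in for Netty's NioSocketChannel. */
    static final class WrappedChannel {
        final SocketChannel javaChannel;

        WrappedChannel(SocketChannel javaChannel) {
            this.javaChannel = javaChannel;
        }
    }

    /** Accept one connection and add it to buf as a "message"; 0 if nothing is pending. */
    static int doReadMessages(ServerSocketChannel server, List<Object> buf) throws IOException {
        SocketChannel ch = server.accept(); // null when non-blocking and no connection is queued
        if (ch == null) {
            return 0;
        }
        buf.add(new WrappedChannel(ch));    // the accepted channel itself is the message
        return 1;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            List<Object> buf = new ArrayList<>();
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                System.out.println("messages read: " + doReadMessages(server, buf));
            }
        }
    }
}
```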
Creating and initializing a NioSocketChannel instance in turn involves a large amount of additional logic.
On the NioServerSocketChannel side, the following operations are not supported: doConnect, doFinishConnect, doDisconnect, doWriteMessage, and filterOutboundMessage.
The implementation logic of AbstractNioUnsafe, such as connect.
The implementation of io.netty.channel.nio.AbstractNioMessageChannel.doWrite(ChannelOutboundBuffer) is still to be analyzed; it is not used in this REST server project.
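ServerSocketChannel has existed since JDK 1.4, but its bind method was only added in 1.7 (see its Javadoc for details). Why did JDK 1.7 add this method, and what problem does it solve? // TODO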