Netty服务端启动(一)——创建和初始化channel

以netty的一个小demo为例(使用的源码版本为4.1.50.Final)

b.bind(PORT)跟进

最终调用到AbstractBootstrap#doBind这个方法

private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();  final Channel channel = regFuture.channel();  if (regFuture.cause() != null) {  return regFuture;  }   if (regFuture.isDone()) {  // At this point we know that the registration was complete and successful.  ChannelPromise promise = channel.newPromise();  doBind0(regFuture, channel, localAddress, promise);  return promise;  } else {  // Registration future is almost always fulfilled already, but just in case it‘s not.  final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);  regFuture.addListener(new ChannelFutureListener() {  @Override  public void operationComplete(ChannelFuture future) throws Exception {  Throwable cause = future.cause();  if (cause != null) {  // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an  // IllegalStateException once we try to access the EventLoop of the Channel.  promise.setFailure(cause);  } else {  // Registration was successful, so set the correct executor to use.  // See https://github.com/netty/netty/issues/2586  promise.registered();   doBind0(regFuture, channel, localAddress, promise);  }  }  });  return promise;  } } 

这里就是服务端创建的主要代码

创建Channel

主要概括如下

1.通过demo里传入的NioServerSocketChannel类反射创建channel

2.在NioServerSocketChannel的构造函数里

  • 通过java底层的SelectorProvider创建ServerSocketChannel
  • 调用父类构造函数设置阻塞模式,创建id,unsafe,pipeline等
  • 创建NioServerSocketChannelConfig,便于进行参数配置

方法体里第一行代码的initAndRegister方法就是在创建和初始化channel

final ChannelFuture initAndRegister() { Channel channel = null;  try {  channel = channelFactory.newChannel(); //创建channel,本节分析  init(channel);  } catch (Throwable t) {  //省略  }   ChannelFuture regFuture = config().group().register(channel);  if (regFuture.cause() != null) {  if (channel.isRegistered()) {  channel.close();  } else {  channel.unsafe().closeForcibly();  }  }  return regFuture; } 

创建channel就在channelFactory.newChannel()这行代码里,这里会调用到ReflectiveChannelFactory#newChannel这个方法

public T newChannel() { try {  return constructor.newInstance();  } catch (Throwable t) {  //省略  } } 

可以看到这里用反射创建一个channel,也就是说channel是通过channelFactoryconstructor来反射创建的,而channelFactory是在demo里调用channel方法时初始化的,如下

ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)  .channel(NioServerSocketChannel.class) //在这里初始化了`channelFactory  //省略 

AbstractBootstrap#channel方法里

public B channel(Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory<C>(  ObjectUtil.checkNotNull(channelClass, "channelClass")  )); } 

最终调用到AbstractBootstrap#channelFactory方法里

public B channelFactory(ChannelFactory<? extends C> channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory");  if (this.channelFactory != null) {  throw new IllegalStateException("channelFactory set already");  }   this.channelFactory = channelFactory;  return self(); } 

既然这里是通过反射创建了一个NioServerSocketChannel,接下来看看NioServerSocketChannel这个类的构造函数

public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } 

先看看NioServerSocketChannel#newSocket这个方法

private static ServerSocketChannel newSocket(SelectorProvider provider) { try {  return provider.openServerSocketChannel(); //provider就是DEFAULT_SELECTOR_PROVIDER,为SelectorProvider.provider()  } catch (IOException e) {  //省略  } } 

上面的构造函数调用到了另一个构造函数

public NioServerSocketChannel(ServerSocketChannel channel) { //注意这里传入SelectionKey.OP_ACCEPT,用于之后注册accept事件  super(null, channel, SelectionKey.OP_ACCEPT);  config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } 

super方法调用父类做一些简单的初始化,主要在AbstractNioChannelAbstractChannel这两个类这种

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent);  this.ch = ch;  this.readInterestOp = readInterestOp;  try {  ch.configureBlocking(false);  } catch (IOException e) {  //省略  } } 
protected AbstractChannel(Channel parent) { this.parent = parent;  id = newId();  unsafe = newUnsafe();  pipeline = newChannelPipeline(); } 

NioServerSocketChannelConfig则是传入java底层的ServerSocketChannel方便以后对做一些配置

初始化channel

先概括一下,主要做了这几件事

  • 将用户设置的options和attrs设置到channel的config里
  • 向channel的pipeline里添加用户传入的handler
  • 创建一个特殊的handler即ServerBootstrapAcceptor(用于处理新连接接入)并添加进pipeline里

接下来分析初始化channel的过程,即init(channel)这行代码

这里会调用到ServerBootstrap#init这个方法

void init(Channel channel) { //newOptionsArray这个方法拿到用户设置的options设置到channel的config里  setChannelOptions(channel, newOptionsArray(), logger);  //attrs0这个方法拿到用户设置的attrs设置到channel里  setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));   ChannelPipeline p = channel.pipeline();   final EventLoopGroup currentChildGroup = childGroup;  final ChannelHandler currentChildHandler = childHandler;  final Entry<ChannelOption<?>, Object>[] currentChildOptions;  //拿到用于配置childHandler的childOptions  synchronized (childOptions) {  currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);  }  //拿到用于配置childHandler的childAttrs  final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);   p.addLast(new ChannelInitializer<Channel>() {  @Override  public void initChannel(final Channel ch) {  final ChannelPipeline pipeline = ch.pipeline();  //拿到demo里调用childHandler方法时添加的handler  ChannelHandler handler = config.handler();  if (handler != null) {  pipeline.addLast(handler);  }   ch.eventLoop().execute(new Runnable() {  @Override  public void run() {  //ServerBootstrapAcceptor是一个特殊的handler,用于处理新连接接入  pipeline.addLast(new ServerBootstrapAcceptor(  ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));  }  });  }  }); } 

下篇 Netty服务端启动(二)——注册selector和端口绑定

相关文章