• 欢迎访问 winrains 的个人网站!
  • 本网站主要从互联网整理和收集了与Java、网络安全、Linux等技术相关的文章,供学习和研究使用。如有侵权,请留言告知,谢谢!

Netty源码分析(5):服务器启动过程

Netty winrains 来源:史圣杰 12个月前 (11-09) 47次浏览

服务器的启动过程大量使用了EventLoop和Future/Promise,在阅读源码之前,建议首先要对Netty的这两种机制进行了解。由于Netty更多是在服务器端使用,因此以服务器的启动过程为例进行学习。

5.1 阶段:配置config

配置阶段的工作很简单,主要就是初始化启动类,设置相关参数。
Bootstrap启动类主要功能是初始化启动器,为启动器设置相关属性。我们先来看一下Bootstrap的类结构,启动类有一个AbstractBootstrap基类,有两个实现类Bootstrap和ServerBootstrap,分别用于客户端和服务器的启动。

AbstractBootstrap

属性

EventLoopGroup group; //线程组,对于ServerBootstrap来说,group为ServerSocketChannel服务
ChannelFactory<? extends C> channelFactory; //用于获取channel的工厂类
SocketAddress localAddress;//绑定的地址
Map<ChannelOption<?>, Object> options;//channel可设置的选项,包含java-channel和netty-channel
Map<AttributeKey<?>, Object> attrs;//channel属性,便于保存用户自定义数据
ChannelHandler handler;//Channel处理器

方法

group() //设置线程组
channelFactory()及channel() 设置channel工厂和channel类型
localAddress() 设置地址
option() 添加channel选项
attr() 添加属性
handler() 设置channelHander

上面这些方法主要用于设置启动器的相关参数,除此之外,还有一些启动时调用的方法

register() 内部调用initAndRegister() 用来初始化channel并注册到线程组
bind() 首先会调用initAndRegister(),之后绑定IP地址,使用Promise保证先initAndRegister()在bind()
initAndRegister(),主要是创建netty的channel,设置options和attrs,注册到线程组

ServerBootstrap

ServerBootstrap在AbstractBootstrap的基础上添加了如下属性,用来设置子Channel,也就是客户端连接后创建的Channel的属性。另外,还实现了抽象类中定义的init()方法。

 Map<ChannelOption<?>, Object> childOptions  子channel的配置
 Map<AttributeKey<?>, Object> childAttrs  子channel的属性
EventLoopGroup childGroup;    处理子channel的事件循环组
ChannelHandler childHandler;  处理子channel事件的handler

5.2 阶段:初始化init

初始化init阶段的主要功能是:创建并初始化服务器的Netty-Channel;分为两个步骤:创建和初始化。
创建NettyChannel

  • 使用SelectorProvider打开java通道
  • 为Channel分配全局唯一的ChannelID
  • 创建NioMessageUnsafe,用于netty底层的读写操作
  • 创建ChannelPipeline,默认的是DefaultChannelPipeline

下面是初始init阶段的主要代码:

Channel channel = null;
try {
    channel = channelFactory.newChannel();// 创建NettyChannel
    init(channel);//初始化NettyChannel
} catch (Throwable t) {
    if (channel != null) {
        // channel can be null if newChannel crashed (eg SocketException("too many open files"))
        channel.unsafe().closeForcibly();
    }
    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}

channelFactory用于获取Channel实例,启动时,channelFactory在调用channel(NioServerSocketChannel.class)设置channel类型时创建,由于我们使用的是设置class的方法,会使用ReflectiveChannelFactory作为工厂类,其会直接调用class的newInstance获取Channel实例。Netty中,服务器端的Channel为NioServerSocketChannel,客户端为NioSocketChannel。
Channel的创建过程如下:

  1. 打开java通道:NioServerSocketChannel创建时,首先使用SelectorProvider的openServerSocketChannel打开服务器套接字通道。SelectorProvider是Java的NIO提供的抽象类,是选择器和可选择通道的服务提供者。具体的实现类有SelectorProviderImpl,EPollSelectorProvide,PollSelectorProvider。选择器的主要工作是根据操作系统类型和版本选择合适的Provider:如果LInux内核版本>=2.6则,具体的SelectorProvider为EPollSelectorProvider,否则为默认的PollSelectorProvider。至此,底层的Java ServerSocketChannel创建完毕。
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    return provider.openServerSocketChannel();
}
  1. Java ServerSocketChannel创建完毕后,会进入netty-Channel的构造方法,首先初始化ChannelId,ChannelId是一个全局唯一的值;
  2. 之后,创建NioMessageUnsafe实例,该类为Channel提供了用于完成网络通讯相关的底层操作,如connect(),read(),register(),bind(),close()等;
  3. 为Channel创建DefaultChannelPipeline,初始化双向链表;
  4. 讲java-channel设置为非阻塞,将关注的操作设置为SelectionKey.OP_ACCEPT(服务器)
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    // 初始化双向链表
    tail = new TailContext(this); // 创建head
    head = new HeadContext(this); // 创建tail
    head.next = tail;
    tail.prev = head;
}

初始化NettyChannel
创建NettyChannel后,下一步需要进行初始化,由于服务器端和客户端的Channel不一样,因此init方法被分别实现到了ServerBootstrap和Bootstrap中,我们主要分析服务器的init。服务器的init分为几个步骤:

  • 将启动器设置的选项和属性设置到NettyChannel上面
  • 向Pipeline添加初始化Handler,供注册后使用

具体实现在ServerBootstrap类的init方法中,程序比较简单。每个NettyChannel对象保护一个ChannelConfig类保存相关配置,还有Map<AttributeKey<?>, Object> attrs用来保存自定义属性。至于初始化Handler,我们先记住,在bind中会说明其作用。
在addLast时,由于还未注册,因此会加入到Pipeline的一个等待链表中,待注册后执行。

if (!registered) {
    newCtx.setAddPending();
    callHandlerCallbackLater(newCtx, true);
    return this;
}

总结一下,这个阶段的代码我们可以看出,channel内部包含几个重要对象:

ChannelID 全局唯一ID
ChannelConfig  保存配置
ChannelPipeline  通道的流水线
Unsafe  Netty底层封装的网络I/O操作

5.3 阶段:注册register

这个阶段的主要工作是将创建并初始化后的NettyChannel注册到selector上面。具体过程:

  • 将打开NettyChannel注册到线程池组的selector上;
  • 触发Pipeline上面ChannelHandler的channelRegistered,
// AbstractBootstrap类 initAndRegister()
 ChannelFuture regFuture = config().group().register(channel);

上面的程序会使用传入的线程池组的register(channel);注册NettyChannel,具体方法定义在SingleThreadEventLoop中,其会使用NettyChannel的unsafe的register方法,该方法首先会判断当前线程是否是指定线程池正在运行的线程,如果不是提交到要注册的线程池中执行。执行时调用下面的程序。

// AbstractUnsafe,删去了部分校验代码
private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;// 是否为首次注册
        doRegister(); // 1. 注册
        neverRegistered = false;
        registered = true;
        pipeline.invokeHandlerAddedIfNeeded();// 2. 将注册之前加入的handler加入进来
        safeSetSuccess(promise); // 注册成功,通知promise
        pipeline.fireChannelRegistered();// 4. Pipeline通知触发注册成功
        if (isActive()) { // 是否已经绑定 因为register和bind阶段是异步的
            if (firstRegistration) {
                pipeline.fireChannelActive(); // 5.首次注册,通知
            } else if (config().isAutoRead()) {// Channel会deregister后重新注册到线程组时,且配置了AutoRead
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
  1. 注册:将NettyChannel内部的javaChannel注册到线程池的selector上面,由线程池不断执行select()查询准备就绪的文件描述符。具体实现在AbstractNioChannel中的doRegister()
  2. invokeHandlerAddedIfNeeded: 注册成功后,找到初始化阶段通过pipeline.addLast()加入的ChannelInitializer,执行其ChannelInitializer的initChannel方法,之后将其删除(在ChannelInitializer的initChannel方法中);初始化NettyChannel阶段,我们addLast了一个初始化Handler,现在来看看其作用
// init初始化阶段添加了一个ChannelInitializer
p.addLast(new ChannelInitializer<Channel>() {
   @Override
   public void initChannel(Channel ch) throws Exception {
       final ChannelPipeline pipeline = ch.pipeline();
       ChannelHandler handler = config.handler();// 获取config时设置的handler
       if (handler != null) {
           pipeline.addLast(handler); // 将其添加到链表尾部
       }
        // 加入一个ServerBootstrapAcceptor处理器,用于处理Accept
           ch.eventLoop().execute(new Runnable() {
               @Override
               public void run() {
                   pipeline.addLast(new ServerBootstrapAcceptor(
                           currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
               }
           });
       }
   });

从上面的程序中可以看到,由于初始化时,还未将javaChannel注册到线程池的selector上,此时还无法设置Channel将Accept注册到选择器上,因此先加入了一个ChannelInitializer,等待register后向Pipeline加入ServerBootstrapAcceptor。此时,NettyChannel的Pipeline的链表结构为:

Head <--> InitialHandler <--> ServerBootstrapAcceptor <--> Tail

在initChannel执行的最后会将InitialHandler从Pipeline移除,此时NioServerSocketChannel的链表结构为

Head <--> ServerBootstrapAcceptor <--> Tail
  1. fireChannelRegistered,沿着pipeline的head到tail,调用ChannelHandler的channelRegistered方法,
public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}
private void invokeChannelRegistered() {
    if (invokeHandler()) { // 状态是否正确
        try {
            ((ChannelInboundHandler) handler()).channelRegistered(this); // 触发
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();// 状态不正确,通知下一个Handler
    }
}
  1. fireChannelActive 由于注册阶段和绑定bind阶段都是异步的,如果此时注册完成时bind阶段已经绑定了本地端口,会沿着pipeline的head到tail,调用各个Handler的channelActive方法

5.4 阶段:绑定bind

本阶段的主要内容是:将NettyChannel内部的java的ServerSocketChannel绑定到本地的端口上面,结束后使用fireChannelActive通知Pipeline里的ChannelHandle,执行其channelActive方法。
bind的入口为AbstractBootstrap的doBind0(),内部会调用pipeline中的bind方法,逻辑为从tail出发,调用outbound的ChannelHandler的bind方法,从上面我们可以看到当前的链表如下:

Head[I/O] <--> ServerBootstrapAcceptor[IN] <--> Tail[IN]

只有Head可以用来处理Outbound,Head的bind方法调用了channel创建过程中生成的unsafe对象NioMessageUnsafe的实例,该实例的bind方法首先java的channel bind本地地址,然后触发fireChannelActive。

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}

至此,Netty的服务器段已经启动,Channel和ChannelPipeline已经建立。EventLoop也在不断的select()查找准备好的I/O。

作者:史圣杰

来源:https://www.jianshu.com/p/ef6b794a2876


版权声明:文末如注明作者和来源,则表示本文系转载,版权为原作者所有 | 本文如有侵权,请及时联系,承诺在收到消息后第一时间删除 | 如转载本文,请注明原文链接。
喜欢 (0)