• 欢迎访问 winrains 的个人网站!
  • 本网站主要从互联网整理和收集了与Java、网络安全、Linux等技术相关的文章,供学习和研究使用。如有侵权,请留言告知,谢谢!

Netty源码分析(3):线程池

Netty winrains 来源:史圣杰 12个月前 (11-09) 41次浏览

线程池是一个在多线程场景中运用很广泛的并发框架,需要异步执行或并发执行任务的程序都可以使用线程池。有任务到来时,如果不使用线程池,我们需要不断的创建/销毁线程,还需要对线程进行管理;而使用线程池,直接将任务提交到线程池即可。使用线程池有几个好处:无需重复创建/销毁线程,降低资源消耗;提高程序响应速度;提高线程的可管理性。

3.1 实现原理

线程池内部一般包含一个核心线程池,其内部的线程在创建之后一般不会销毁,执行完任务后线程会阻塞等待新任务到来。
当向线程池提交任务时,线程池会做如下判断:

  • 核心线程池未满,创建线程执行任务
  • 核心线程池已满,若等待队列未满,则加入到等待队列;若等待队列已满但线程池未满,创建新线程执行任务;若等待队列和线程池均已满,则按照指定策略退出/拒绝任务/丢弃任务等。
线程池执行流程

了解了实现原理,我们先来自己实现一个线程池,首先定义线程池的接口

ThreadPool

线程池的接口里面最重要的方法是execute执行任务

public interface ThreadPool<Job extends Runnable> {
    //提交一个Job,这个Job需要实现Runnable接口
    void execute(Job job);
    //关闭线程池
    void shutdown();
    //增加工作者线程
    void addWorkers(int num);
    //减少工作者线程
    void removeWorker(int num);
    //得到正在等待执行的任务数量
    int getJobSize();
}

CommonThreadPool

在实现线程池时,我们需要定义线程池的大小,以及保存任务的列表jobs,下面是变量定义:

// 线程池最大限制数
private static final int MAX_WORKER_NUMBERS = 100;
// 线程池默认的数量
private static final int DEFAULT_WORKER_NUMBERS = 1;
// 线程池最小数量
private static final int MIN_WORKER_NUMBERS = 1;
// 工作列表
private final LinkedList<Job> jobs = new LinkedList<Job>();

在线程池初始化时,我们要将核心线程池进行初始化,创建多个Worker线程,然后启动Worker线程。

// num 为DEFAULT_WORKER_NUMBERS 默认线程池大小
private void initializeWokers(int num) {
    // 创建多个线程,加入workers中,并启动
    for (int i = 0; i < num; i++) {
        Worker worker = new Worker();
        workers.add(worker);
        Thread thread = new Thread(worker, "ThreadPool-Worker-"
                + threadNum.getAndIncrement());
        thread.start();
    }
}

Worker启动后,一直没有任务,需要阻塞在jobs上(jobs是上面定义的任务列表),Worker等待任务到来后唤醒获取队列中的任务并执行。下面的代码中,如果jobs为空,则线程等待;

// worker的代码,首先要获取jobs的锁,
synchronized (jobs) {
    while (jobs.isEmpty()) {// 如果jobs是空的,则执行jobs.wait,使用while而不是if,因为wait后可能已经为空了,需要继续等待
        try {
            jobs.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();// 中断
            return;// 结束
        }
    }
    job = jobs.removeFirst();// 第一个job
    if (job != null) {
        try {
            job.run();//注意,这里是run而不是start,传入的Job
        } catch (Exception e) {
            // 忽略Job执行中的Exception
            e.printStackTrace();
        }
    }
}

提交任务时,只需要将任务加入jobs中,然后通知worker线程即可。worker线程获得锁后会取第一个任务执行。执行完毕,若jobs为空,worker线程继续进行休眠等待任务到来。

@Override
public void execute(Job job) {
    if (job == null)
        return;
    synchronized (jobs) {
        jobs.addLast(job);
        jobs.notify();
    }
}

完整的代码可以查看https://github.com/ssj234/JavaStudy_IO/tree/master/IOResearch/src/net/ssj/pool

3.2 Java的Executor框架

Java平台本身提供了Executor框架用来帮助我们使用线程池。

Executor框架

Executor框架最核心的类是ThreadPoolExecutor,这是各个线程池的实现类,有如下几个属性:

  • corePool:核心线程池的大小 m
  • maximumPool:最大线程池的大小
  • keepAliveTime: 休眠等待时间
  • TimeUnit unit : 休眠等待时间单位,如微秒/纳秒等
  • BlockingQueue workQueue:用来保存任务的工作队列
  • ThreadFactory: 创建线程的工厂
  • RejectedExecutionHandler:当线程池已经关闭或线程池Executor已经饱和,execute()方法将要调用的Handler

通过Executor框架的根据类Executors,可以创建三种基本的线程池:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool

FixedThreadPool

FixedThreadPool被称为可重用固定线程数的线程池。

// 获取fixedThreadPool
ExecutorService fixedThreadPool=Executors.newFixedThreadPool(paramInt);
//内部会调用下面的方法,参数 corePoolSize、maximumPoolSize、keepAliveTime、workQueue
return new ThreadPoolExecutor(paramInt, paramInt, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());

FixedTheadPool设置的线程池大小和最大数量一样;keepAliveTime为0,代表多余的空闲线程会立刻终止;保存任务的队列使用LinkedBlockingQueue,当线程池中的线程执行完任务后,会循环反复从队列中获取任务来执行。
FixedThreadPool适用于限制当前线程数量的应用场景,适用于负载比较重的服务器。

SingleThreadExecutor

SingleThreadExecutor的核心线程池数量corePoolSize和最大数量maximumPoolSize都设置为1,适用于需要保证顺序执行的场景

ExecutorService singleThreadExecutor=Executors.newSingleThreadExecutor();
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池,适用于短期异步的小任务,或负载教轻的服务器。

ExecutorService cachedThreadPool=Executors.newCachedThreadPool();
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());

SynchronousQueue是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。corePoolSize是0,maximumPoolSize都最大,无界的。keepAliveTime为60秒,空闲线程超过60S会被终止。

ScheduleThreadPoolExecutor

ScheduleThreadPoolExecutor和Timer类似,可以设置延时执行或周期执行,但比Timer有更多的功能。Timer和TimerTask只创建一个线程,任务执行时间超过周期会产生一些问题。Timer创建的线程没有处理异常,因此一旦抛出非受检异常,会立刻终止。

ScheduledThreadPoolExecutor executor=new ScheduledThreadPoolExecutor(5);
//可以直接执行
executor.execute(new JobTaskR("executor", 0));
executor.execute(new JobTaskR("executor", 1));
System.out.println("5S后执行executor3");
//隔5秒后执行一次,但只会执行一次。
executor.schedule(new JobTaskR("executor", 3), 5, TimeUnit.SECONDS);
System.out.println("开始周期调度");
//设置周期执行,初始时6S后执行,之后每2s执行一次
executor.scheduleAtFixedRate(new JobTaskR("executor", 4), 6, 2, TimeUnit.SECONDS);

scheduleAtFixedRate或者scheduleWithFixedDelay方法,它们不同的是前者以固定频率执行,后者以相对固定延迟之后执行。

3.3 Netty的EventLoop与线程池

Netty的事件循环和事件循环组的实现中,类的层级关系比较复杂,其底层是Java线程池的实现,不过在netty的实际使用中还是比较简单的,我们只需要使用如下的代码即可,

EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
ServerBootstrap b=new ServerBootstrap();
b.group(bossGroup,workGroup)//设置事件循环组

Netty的事件循环机制有两个基本接口:EventLoop和EventLoopGroup。前者是事件循环,后者是由多个事件循环组成的组。
EventLoop自身是一个不断循环执行的线程,以NioEventLoop为例,其继承了SingleThreadEventExecutor,内部的executor是创建NioEventLoop时传入的线程池,用来将run方法放入线程池中执行;此外还包含为一个TaskQueue,netty在处理io过程中的task可以提交到这个队列中,事件循环会不断获取task并执行,因此但其本身也可以看做一个线程池。
NioEventLoop的run方法中,Nio的事件循环会不断select后获取任务并执行,然后根据ioRatio的设置执行TaskQueue的任务。NioEventLoop的execute方法中,其会将task加入到taskQueue等待事件循环执行。因此,我们可以将NioEventLoop当做一个不断执行的线程池,EventLoopGroup作为线程池组,线程池组的意义是采用给的的策略选取一个EventLoop并提交任务。
EventLoop的定义如下,其继承了一个顺序执行的线程池接口和EventLoopGroup,也就是说EventLoop之间有父子关系,通过parent();返回任务循环组,通过next()选取一个事件循环。线程池组的register用于将Netty的Channel注册到事件循环中。

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}
public interface EventLoopGroup extends EventExecutorGroup {
    EventLoop next();
    ChannelFuture register(Channel channel);
}

NioEventLoopGroup

NioEventLoopGroup除了处理网络的异步I/O任务,还用于完成异步提交的系统任务。NioEventLoopGroup初始化时,有如下几个参数可以配置,主要用于设置线程池的相关配置。

  • nThreads 子线程池数量
  • Executor executor 用来执行事件循环的线程池
  • chooserFactory :next()时选择线程池的策略
  • selectorProvider 用于打开selector
  • selectStrategyFactory 用来控制select循环行为的策略
  • RejectedExecutionHandlers 线程池执行的异常处理策略
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
        final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
            RejectedExecutionHandlers.reject());
}

NioEventLoopGroup初始化过程为:

  1. 如果传入的executor 为空,会默认使用ThreadPerTaskExecutor,该线程池针对每个任务会创建一个线程,创建线程方式使用DefaultThreadFactory提供的newThread方法。
  2. 初始化开始,首先会根据创建nThread个子线程池,保存在childrens变量中,创建逻辑比较简单,将初始化NioEventLoopGroup时设置的参数传递给NioEventLoop对象。在创建子线程池NioEventLoop的过程中,如果一旦有失败的,就需要关闭已经创建的所有子线程池并等待这些线程池结束。
  3. 之后,使用chooserFactory创建chooser,用来在next()选择事件循环时从childrens变量选择一个返回。默认使用2的倍数的策略,也可以设置为顺序依次选择。
  4. 向组中所有的事件循环的terminationFuture注册事件,目的是等待所有事件循环结束后将事件循环组的terminatedChildren设置为成功完成。
  5. 最后,将children复制保存为一个只读的集合,保存在变量readonlyChildren中。

至此,NioEventLoopGroup的初始化过程就结束了。我们可以看到,NioEventLoopGroup主要的用来聚合多个EventLoop,对其进行调度。

NioEventLoop

在NioEventLoopGroup的初始化过程中,会创建多个NioEventLoop,NioEventLoop用来执行实际的事件循环,初始化时有如下几个属性:

  • NioEventLoopGroup parent 线程池所在的Group
  • Executor executor 执行任务的线程池,默认是ThreadPerTaskExecutor
  • SelectorProvider selectorProvider 用来打开selector
  • SelectStrategy strategy 用来控制select循环行为的策略
  • RejectedExecutionHandlers 线程池执行的异常处理策略
  • addTaskWakesUp addTask(Runnable)添加任务时是否唤醒线程池,默认是false
  • maxPendingTasks 线程池中等待任务的最大数量
  • scheduledTaskQueue 保存定时任务的QUeue
  • tailTasks :保存任务的Queue,netty选择使用jctools的MpscChunkedArrayQueue,原因是为了提高效率,因为Nio线程池的线程消费者只有一个,就是一直进行的select循环,而生产者可能有多个。具体实现参见 http://blog.csdn.net/youaremoon/article/details/50351929

事件循环

  • NioEventLoop初始化时,会根据配置参数sun.nio.ch.bugLevelio.netty.selectorAutoRebuildThreshold设置重建selector的阈值,这是为了解决jvm空轮询导致cpu利用率100%的问题。
  • openSelector的目的是打开选择描述符Selector,并对sun.nio.ch.SelectorImpl的实现进行优化,将selectedKeys和publicSelectedKeys属性都修改为SelectedSelectionKeySet类,这个类使用了两个数组,使用空间换时间的方法,设置了两个数组,每次使用其中的一个。
  • 打开Selector之后,在服务器启动后会调用register将选择描述符注册到EventLoopGroup,NioEventLoopGroup中会调用NioEventLoop的register,这样,事件循环中的Selector就注册到了channel上。
  • 在run方法中,会根据selectStrategy调用select方法,收到io事件后使用processSelectedKeys处理,处理完成后执行TaskQueue中的方法。

提交任务

NioEventLoop初始化时,会创建/设置其包含的属性,最重要的是打开selector和创建tailTasks两个步骤;这时,由于没有任何任务,NioEventLoop不会启动线程。在netty中,向线程池提交任务可以使用下面的方法:

EventLoopGroup loop = new NioEventLoopGroup();
loop.next().submit(Callable<T> task)
loop.next().submit(Runnable task)
loop.next().execute(Runnable command);

也可以直接通过EventLoopGroup提交任务,只是EventLoopGroup内部会调用next()后再执行相关的方法。

EventLoopGroup loop = new NioEventLoopGroup();
loop.submit(Callable<T> task)
loop.submit(Runnable task)
loop.execute(Runnable command);

submit方法的内部会将Callable或Runnable包装后交给execute方法执行。

// AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
    if (task == null)
        throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task); // 包装task为 ftask
    execute(ftask);
    return ftask;
}

execute方法被NioEventLoop的父类SingleThreadEventExecutor覆盖,程序如下:

public void execute(Runnable task) {
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task); // 添加到任务队列
    } else {
        startThread(); // 启动线程,向EventLoop内部的线程池提交任务,会执行NioEventLoop run
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
  1. 判断当前线程(提交任务的线程)与当前线程池是同一个线程,也就是说是如果是当前线程池提交的任务,则直接将任务加入线程池队列即可;
  2. 如果不是,则需要启动线程后添加任务。启动线程的过程是,如果内部线程没有启动则启动,向NioEventLoop内部包含的executor提交一个任务,任务内部执行NioEventLoop的run方法也就是事件循环(executor是实际使用的线程池,初始化是传入,默认是ThreadPerTaskExecutor)。
  3. 最后根据addTaskWakesUp标志和任务是否实现了NonWakeupRunnable判断是否需要唤醒,唤醒的方法是提交一个默认的空任务WAKEUP_TASK。

3.4 事件循环解析

Nio事件循环在NioEventLoop中,主要功能:

  • 处理网络I/O读写事件
  • 执行系统任务和定时任务

在主循环中我们可以看到netty对I/O任务和提交到事件循环中的系统任务的调度。

EventLoop事件循环

3.4.1 I/O事件

  1. 由于NIO的I/O读写需要使用选择符,因此,netty在NioEventLoop初始化时,会使用SelectorProvider打开selector。在类加载时,netty会从系统设置中读取相关配置参数:
  • sun.nio.ch.bugLevel 用来修复JDK的NIO在Selector.open()的一个BUG
  • io.netty.selectorAutoRebuildThreshold select()多少次数后重建selector
static {
    int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
        selectorAutoRebuildThreshold = 0;
    }
    SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
}
  1. NioEventLoop的构造方法中,会调用provider.openSelector()打开Selector;如果设置io.netty.noKeySetOptimization为true,则会启动优化,优化内容是将Selector的selectedKeys和publicSelectedKeys属性设置为可写并替换为Netty实现的集合以提供效率。
private Selector openSelector() {
    final Selector selector;
    try {
        selector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
    if (DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    }
   //  下面是优化程序,此处省略
   ...
    return selector;
}
  1. NioEventLoop最核心的地方在于事件循环,具体代码在NioEventLoop.java在run方法中
  • 首先根据默认的选择策略DefaultSelectStrategy判断本次循环是否select,具体逻辑为:如果当前有任务则使用selectNow立刻查询是否有准备就绪的I/O;如果当前没有任务则返回SelectStrategy.SELECT,并将wakenUp设置为false,并调用select()进行查询。
protected void run() {
    for (;;) {  // 事件循环
        try {
            // select策略
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));  // select()
                    if (wakenUp.get()) {
                        selector.wakeup(); // 唤醒select()的线程
                    }
                default:
                    // fallthrough
            }
        .... 后续处理
  • select()时需要判断当前是否有scheduledTask(定时任务),如果有则需要计算任务delay的时间,如果定时任务需要立刻执行了,那么必须马上selectNow()并返回,之后执行任务。如果没有scheduledTask,会判断当前是否有任务在等待列表,如果有任务时将wakenUp设置为true并selectNow();如果没有任务,那么会 selector.select(1000); 阻塞等待1s,直到有I/O就绪,或者有任务等待,或需要唤醒时退出,否则,会继续循环,直到前面的几种情况发生后退出。
  • 之后,事件循环开始处理IO和任务。如果查询到有IO事件,会调用processSelectedKeysOptimized(优化的情况下),对SelectionKey进行处理。
if (ioRatio == 100) {
    try {
        processSelectedKeys();
    } finally {
        runAllTasks();
    }
} else {
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        final long ioTime = System.nanoTime() - ioStartTime; // io花费的时间
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 按照iorate计算task的时间
    }
}
  • processSelectedKeysOptimized处理I/O,主要是NIO的select操作,处理相关的事件。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
   ......
    try {
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

3.4.2 任务处理

  • runAllTasks执行提交到EventLoop的任务,首先从scheduledTaskQueue获取需要执行的任务,加入到taskQueue,然后依次执行taskQueue的任务。
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;
    do {
        fetchedAll = fetchFromScheduledTaskQueue(); // 获取定时任务
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAtLeastOne;
}
  • ioRatio不为100时,会调用runAllTasks(ioTime * (100 – ioRatio) / ioRatio),首先计算出I/O处理的事件,然后按照比例为执行task分配事件,内部主要逻辑与runAllTasks()主要逻辑相同。

作者:史圣杰

来源:https://www.jianshu.com/p/3a770d58797f


版权声明:文末如注明作者和来源,则表示本文系转载,版权为原作者所有 | 本文如有侵权,请及时联系,承诺在收到消息后第一时间删除 | 如转载本文,请注明原文链接。
喜欢 (0)