几种线程池的实现算法分析

阅读数:26759 2014 年 7 月 22 日

话题:语言 & 开发架构算法

在阅读研究线程池的源码之前,一直感觉线程池是一个框架中最高深的技术。研究后才发现,线程池的实现是如此精巧。本文从技术角度分析了线程池的本质原理和组成,同时分析了 JDK、Jetty6、Jetty8、Tomcat 的源码实现,对于想了解线程池本质、更好的使用线程池或者定制实现自己的线程池的业务场景具有一定指导意义。

  • 工作者 workers 与待处理工作队列实现方式举例:

    实现

    工作者 workers结构与并发保护

    待处理工作队列结构

    JDK

    使用了 HashSet 来存储工作者 workers,通过可重入锁 ReentrantLock 对其进行并发保护。每个 worker 都是一个 Runnable 接口。

    使用了实现接口 BlockingQueue 的阻塞队列来存储待处理工作 job,并把队列作为构造函数参数,从而实现业务可以灵活的扩展定制线程池的队列。业务也可使用 JDK 自身的同步阻塞队列 SynchronousQueue、有界队列 ArrayBlockingQueue、无界队列 LinkedBlockingQueue、优先级队列 PriorityBlockingQueue。

    Jetty6

    同样使用了 HashSet 存储工作者 workers,通过 synchronized 一个对象进行 HashSet 的并发保护。每个工作者实际上是一个 Thread 的扩展。

    使用了数组存储待处理的 job 对象 Runnable。数组初始化容量为 _maxThreads 个,使用变量 _queued 计算保存当前内部待处理 job 的个数即数组 length。超过数组最大值时,扩大 _maxThreads 个容量,因此数组永远够用够大,容量无界。同样是用 synchronized 一个对象的方式实现同步。

    Jetty8

    使用了 ConcurrentLinkedQueue 存储工作者 workers,利用 JDK 基于 CAS 算法的实现提高了并发效率,同时也降低了线程池并发保护的复杂程度。针对队列 ConcurrentLinkedQueue 无法保证 size() 实时性问题引入原子变量 AtomicInteger 统计工作者数量。

    与 JDK 相同实现,使用了基于接口 BlockingQueue 的阻塞队列来存储待处理工作 job,也支持在线程池构造函数的参数中传入队列类型。同时,Jetty8 内部默认未设置队列类型场景可自动设置使用 2 种队列:有界无法扩容的 ArrayBlockingQueue 及 Jetty 自身定制扩展实现的可扩容队列 BlockingArrayQueue。

    Tomcat

    基于 JDK 的 ThreadPoolExecutors 实现,复用 JDK 业务

    复用 JDK 业务

  • 线程池初始化与处理业务 job 算法举例:

    实现

    线程池构造与工作者初始化

    处理业务 job的算法

    JDK

    1. 基于多个构造参数实现灵活初始化,几个核心参数如下:

    corePoolSize:核心工作者数

    maximumPoolSize:最大工作者数

    keepAliveTime:超过核心工作者数时闲置工作者的存活时间。

    workQueue:待处理 job 队列,即前面提到的 BlockingQueue 接口。

    2. 默认初始化后不启动工作者,等待有请求时才启动。可以通过调用线程池接口提前启动核心工作数个工作者线程,也可以启动业务期望的多个工作者线程。

    1. 工作者 workers 数量低于核心工作者数 corePoolSize 时会优先创建一个工作者 worker 处理 job,处理成功则返回。

    2. 工作者 workers 数量高于核心工作者数时会优先把 job 放入到待处理队列,放入队列成功时处理结束。

    3. 步骤 2 中入队失败会识别工作者数是否还小于最大工作者数 maximumPoolsize,小于的话也会新创建一个工作者 worker 处理 job。

    4. 拒绝处理

    Jetty6

    1. 同样支持设置多个参数:

    _spawnOrShrinkAt:扩容 / 缩容阀值

    _minThreads:最小工作者数

    _maxThreads:最大工作者数

    _maxIdleTimeMs:闲置工作者最大闲置超时时间

    2. 初始化后直接启动 _minThreads 个工作者线程

    1. 查找闲置的工作者 worker,找到则派发 job。

    2. 没有闲置的工作者,将 job 存入待处理数组。

    3. 当识别到数组中待处理 job 超过扩容阀值参数时,扩容增加工作者处理 job

    4. 否则不处理

    Jetty8

    1. 配置参数类似 Jetty6,去除了 _spawnOrShrinkAt 阀值参数。

    2. 初始化后直接启动 _minThreads 个工作者线程

    非常简单,直接将待处理 job 入队。

    Tomcat

    1. 基于 JDK 线程池的构造方法

    2. 来请求时启动工作者

    处理方法复用 JDK 的,但是在开始提交前扩展了 JDK 的功能,实现了可以统计提交数 submittedCount 的能力

  • 线程池工作者 worker 的增减机制举例:

    实现

    工作者增加算法

    工作者减少算法

    JDK

    1. 待处理 job 来时,工作者 workers 数量低于核心工作者数 corePoolSize 时。

    2. 待处理 job 来时,workers 数超过核心数小于最大工作者数且入待处理队列失败场景。

    3. 业务调用线程池的更新核心工作者数接口时,若发现扩容,会增加工作者数。

    1. 待处理任务队列里没有 job 并且工作者 workers 数量超过了核心工作者数 corePoolSize。

    2. 待处理任务队列里没有 job 并且允许工作者数量小于核心工作者参数为 true,此场景会至少保留一个工作者线程。

    Jetty6

    1. 启动线程池时会启动 _minThreads 个工作者线程

    2. 待处理的 job 数量高于了阀值参数且工作者数没有达到最大值时会增加工作者。

    3. 调用线程池接口 setMinThreads 更新最小工作者数时会根据需要增加工作者。

    如下三个条件同时满足时会减少工作者:

    1. 待处理任务数组中没有待处理 job

    2. 工作者 workers 数量超过了最小工作者数 _minThreads

    3. 闲置工作者线程数高于了阀值参数

    Jetty8

    1. 启动线程池时启动最小工作者参数个工作者线程

    2. 已经没有闲置工作者或者闲置工作者的数量已经小于待处理的 job 的总数

    3. 调用线程池接口 setMinThreads 更新最小工作者数时

    如下三个条件同时满足时会减少工作者:

    1. 待处理任务队列里没有待处理的 job

    2. 工作者 workers 总数超过了最小工作者参数配置 _minThreads

    3. 工作者线程的闲置时间超时

    Tomcat

    同 JDK 增加工作者算法

    复用 JDK 减少算法,同时定制扩展延迟参数,超过参数时,直接抛出异常到外面来终止线程池工作者。

  • 对比几种线程池实现,JDK 的实现是最为灵活、功能最强且扩展性最好的,Tomcat 即基于 JDK 线程池功能扩展实现,复用原有业务的同时扩充了自己的业务。Jetty6 是完全自己定制的线程池业务,耦合线程池众多复杂的业务逻辑到线程池类里面,逻辑相对最为复杂,扩展性也非常差。Jetty8 相对 Jetty6 的实现简化了很多,其中利用了 JDK 中的同步容器和原子变量,同时实现方式也越来越接近 JDK。