java并发专题-线程池介绍


今天介绍下java中的线程池应用,主要介绍java中的Excutor框架

线程池的好处

  • 提高响应速度,处理任务时无需等待线程创建;
  • 提高线程管理性,对线程池内线程统一分配、调度、管理;
  • 降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗;

线程池的处理流程

  • 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程
  • 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程
  • 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行

ThreadPoolExecutor详解

  • ThreadPoolExecutor:
    new ThreadPoolExecutor(int corePoolSize,
                    int maximumPoolSize,
                    long keepAliveTime,
                    TimeUnit unit,
                    BlockingQueue<Runnable> workQueue,
                    ThreadFactory threadFactory,
                    RejectedExecutionHandler handler)
    
    • corePoolSize: 当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池corePoolSize大小时就不再创建;
    • maximumPoolSize: 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果;
    • keepAliveTime:线程最大空闲时间
    • TimeUnit:时间单位
    • workQueue:用于保存等待执行的任务队列;(下方详细介绍)
    • threadFactory: 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字;
    • handler:当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务(下方详细介绍)
  • FixedThreadPool
       new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
    
    • FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads
    • 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务
    • 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
    • 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行
    • FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE),所以maximumPoolSize是一个无效参数,线程池的线程数量不超过corePoolSize,有oom风险
  • SingleThreadExecutor
    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
    
    • SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1
    • 其他参数与FixedThreadPool相同
  • CachedThreadPool
    new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())
    
    • CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的
    • keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止
    • CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程(极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源)
  • 线程等待队列
    • 直接提交: 工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;
    • 无界队列: 使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性
    • 有界队列:当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量
  • 决绝策略
    • AbortPolicy:直接抛出异常;
    • CallerRunsPolicy:只用调用者所在线程来运行任务;
    • DiscardPolicy:不处理,丢弃掉;
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务;

ScheduledThreadPoolExecutor介绍

  • ScheduledThreadPoolExecutor运动介绍
    • 使用DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在Scheduled-ThreadPoolExecutor中没有意义;
    • 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWith-FixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask
    • 线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务
  • ScheduledThreadPoolExecutor实现
    • DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的Scheduled-FutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面
    • 线程执行周期任务
      • 线程从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue. take())。到期任务是指ScheduledFutureTask的time大于等于当前时间
      • 线程执行这个ScheduledFutureTask
      • 线程修改ScheduledFutureTask的time变量为下次将要被执行的时间
      • 线程把这个修改time之后的ScheduledFutureTask放回DelayQueue中(Delay-Queue.add())
    • 获取任务
      • 获取Lock
      • 获取周期任务
      • 释放Lock
    • 添加任务 类似获取任务

特别提醒

  • 使用new ThreadPoolExecutor自行创建线程池,更好的管理线程数,队列,避免使用中存在无界问题导致oom;
  • 线程池消耗一定的系统资源,结合实际避免创建过多线程池,如创建static 线程池

文章作者: Xudong Jiang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Xudong Jiang !
 上一篇
java虚拟机内存区域介绍 java虚拟机内存区域介绍
本文将简单介绍下java内存区域,借此加深对java虚拟机的理解;希望对入门者有一定的引导作用,同时也作为记录帮助自己记忆,若有错误,希望大牛指点一二; 简述java虚拟机所管理的内存包括这几个运行时数据区:程序计数器、java虚拟机栈、本
2019-12-22
下一篇 
记一次线上问题-java内存溢出 记一次线上问题-java内存溢出
这里分享一个线上问题案例及解决过程。小k来到新公司不久,一天早上,小K收到一封报警邮件,提示某项目出现较多异常。小k点击邮件查看详情,发现提示out of menery错误。此时同事们还没来,小k暗自吐槽:“怎么都没来,这个项目代码自己也还
2019-11-08
  目录