今天介绍下java中的线程池应用,主要介绍java中的Excutor框架
线程池的好处
- 提高响应速度,处理任务时无需等待线程创建;
- 提高线程管理性,对线程池内线程统一分配、调度、管理;
- 降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
线程池的处理流程
- 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程
- 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程
- 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行
ThreadPoolExecutor详解
- ThreadPoolExecutor:
new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- corePoolSize: 当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池corePoolSize大小时就不再创建;
- maximumPoolSize: 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果;
- keepAliveTime:线程最大空闲时间
- TimeUnit:时间单位
- workQueue:用于保存等待执行的任务队列;(下方详细介绍)
- threadFactory: 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字;
- handler:当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务(下方详细介绍)
- FixedThreadPool
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
- FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads
- 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务
- 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
- 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行
- FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE),所以maximumPoolSize是一个无效参数,线程池的线程数量不超过corePoolSize,有oom风险
- SingleThreadExecutor
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
- SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1
- 其他参数与FixedThreadPool相同
- CachedThreadPool
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())
- CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的
- keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止
- CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程(极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源)
- 线程等待队列
- 直接提交: 工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;
- 无界队列: 使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性
- 有界队列:当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量
- 决绝策略
- AbortPolicy:直接抛出异常;
- CallerRunsPolicy:只用调用者所在线程来运行任务;
- DiscardPolicy:不处理,丢弃掉;
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务;
ScheduledThreadPoolExecutor介绍
- ScheduledThreadPoolExecutor运动介绍
- 使用DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在Scheduled-ThreadPoolExecutor中没有意义;
- 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWith-FixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask
- 线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务
- ScheduledThreadPoolExecutor实现
- DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的Scheduled-FutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面
- 线程执行周期任务
- 线程从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue. take())。到期任务是指ScheduledFutureTask的time大于等于当前时间
- 线程执行这个ScheduledFutureTask
- 线程修改ScheduledFutureTask的time变量为下次将要被执行的时间
- 线程把这个修改time之后的ScheduledFutureTask放回DelayQueue中(Delay-Queue.add())
- 获取任务
- 获取Lock
- 获取周期任务
- 释放Lock
- 添加任务 类似获取任务
特别提醒
- 使用new ThreadPoolExecutor自行创建线程池,更好的管理线程数,队列,避免使用中存在无界问题导致oom;
- 线程池消耗一定的系统资源,结合实际避免创建过多线程池,如创建static 线程池