点击上方“互联网平头哥”,选择“关注公众号”
技术文章第一时间送达!
在生产环境,我们经常面临的情况是:处理某次请求的时间非常短暂,但是请求量很大。
在这种情况下,如果为每个请求单独创建一个线程,有限的硬件资源有可能会被OS创建线程,切换线程状态、销毁线程这些操作所占用,用于业务处理的资源反而减少了。
所以理想的处理方式是:将请求的线程数量控制在一个范围内,既保证后续的请求不会等待太长时间,又保证物理机将足够的资源用于请求处理本身。
开发者通常利用 ThreadPoolExecutor 类的构造函数来创建不同配置的线程池:
ThreadPoolExecutor(int corePoolSize,
int maximußmPoolSize,
long keepAliveTime,
TimeUnit unit,
workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
透过这个构造方法,可以大致勾勒出线程池的基本组成,其大致的结构如下图所示。
这里一定要注意:存在于线程池中的一定是Thread对象,而不是你要处理的任务。因此才叫线程池而不是任务池,线程池会分配池中的一个空闲线程对象来运行提交的任务。
构成线程池的几个要素:
等待队列:
即将要执行的任务队列,因为某些原因,线程池并没有马上运行这些任务
核心线程:
执行任务的线程对象,其数量由 corePoolSize 指定
非核心线程:
一旦任务数量过多,线程池将创建非核心线程临时帮助运行任务
注意:实际上,并没有「核心线程」与「非核心线程」这样的概念,只是为了方便大家理解,而所有区分而已。因此,全文对这两个概念均加了引号。
线程池的大致工作流程:
1、开发者提交待执行任务,线程池收到这个任务请求后,有以下几种处理情况:
当前线程池中运行的线程数量还没有达到 corePoolSize 大小时,线程池会创建一个新线程执行提交的任务,无论之前创建的线程是否处于空闲状态。
当前线程池中运行的线程数量已经达到 corePoolSize 大小时,线程池会把任务加入到等待队列中,直到某一个线程空闲了,线程池会根据我们设置的等待队列规则,从队列中取出一个新的任务执行。
根据队列规则,这个任务无法加入到等待队列,这时线程池就会创建一个“非核心线程”直接运行这个任务。
注意,如果这种情况下任务执行成功,那么当前线程池中的线程数量一定大于 corePoolSize。
如果这个任务,无法被”核心线程”直接执行,又无法加入等待队列,又无法创建“非核心线程”直接执行,线程池将根据拒绝处理器定义的策略处理这个任务。
比如在 ThreadPoolExecutor 中,如果你没有为线程池设置 RejectedExecutionHandler。
这时线程池会抛出 RejectedExecutionException 异常,即线程池拒绝接受这个任务。
实际上抛出 RejectedExecutionException异常的操作,是 ThreadPoolExecutor 线程池中一个默认的RejectedExecutionHandler 实现。
2、一旦线程池中某个线程完成了任务的执行,它就会试图到任务等待队列中拿去下一个等待任务 ( 所有的等待任务都实现了 BlockingQueue 接口,这是一个可阻塞的队列接口 ) ,它会调用等待队列的 poll 方法,并停留在哪里。
3、当线程池中的线程超过您设置的 corePoolSize 参数,说明当前线程池中有所谓的“非核心线程”。那么当某个线程处理完任务后,如果等待 keepAliveTime 时间后仍然没有新的任务分配给它,那么这个线程将会被回收。线程池回收线程时,并不是所谓的“非核心线程”才会被回收,而是谁的空闲时间达到 keepAliveTime 这个阀值,就会被回收,直到线程池中线程的数量等于您设置的corePoolSize参数时,回收过程才会停止。
对所谓的“核心线程”和“非核心线程”是一视同仁的,直到线程池中线程的数量等于您设置的corePoolSize参数时,回收过程才会停止。
使用 ThreadPoolExecutor 类的构造方法可以创建不同配置的线程池,但平时却很少使用。大多数应用场景下,使用Java并发包中的 Executors 提供的5个静态工厂方法就足够了。
首先,来看看 Executor 框架的基本组成:
Executor 是一个基础接口,只有一个 execute(Runnable) 方法,用于任务执行,它屏蔽了许多任务提交、线程创建和调度等不相关细节。
ExecutorService 接口则更加完善,提供了一些管理功能,比如 shutdown 方法用于关闭线程池;还提供了更加全面的任务提交机制,比如用 submit 方法提交任务,返回的是一个 Future 对象,用于获取任务执行结果;甚至还有批量处理任务的功能,比如 invokeAll 或者 invokeAny 等方法。
ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool 则是 Java 提供的几种基础线程池实现,以满足复杂多变的应用场景。
Executors 是一个工具类,从简化使用的角度,提供了各种静态工厂方法来创建不同配置的线程池。
前面的内容已经给出 ThreadPoolExecutor 的构造方法,这里对构造方法的后3个参数作详细的说明,帮你更好的理解和使用线程池。
只要实现了 BlockingQueue 接口的队列,都可以作为线程池的等待队列,常见的比如:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue、LinkedTransferQueue等,至于每种队列的区别以及原理,不再本文讨论范文内。
但网上有些内容真的很容易误导人,这里提两句。
SynchronousQueue 也可以存储数据,且是无锁实现,只是size()方法直接返回0而已。因此,网上的很多内容在把 SynchronousQueue 和 LinkedBlockingQueue 与 LinkedTransferQueue 作对比时,部分内容的正确性是有待确认的,这点需要读者自己注意。
PriorityBlockingQueue 会按照优先级进行对内部元素进行排序,优先级最高的元素将始终排在队列的头部。但它不会保证优先级一样的元素的排序,也不保证当前队列中除了优先级最高的元素以外的元素,处于正确排序的位置。因此,它并不是真正意义的排序。
线程池最主要的一项工作,就是在满足某些条件的情况下创建线程。而在 ThreadPoolExecutor 线程池中,创建线程的工作交给 ThreadFactory 来完成。要使用线程池,就必须要指定 ThreadFactory。如果没有指定,会使用默认的 ThreadFactory:DefaultThreadFactory (这个类在Executors工具类中)。
当然,在某些特殊业务场景下,您还可以使用一个自定义的 ThreadFactory 线程工厂,如下代码片段:
public class CustomThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
return new Thread(r);
}
}
在 ThreadPoolExecutor 线程池中还有一个重要的接口:RejectedExecutionHandler。当提交任务给线程池时,出现以下情况时,线程池会拒绝处理这个任务,并触发线程池创建时定义的拒绝策略:
新任务无法直接被线程池中“核心线程”直接处理,又无法加入等待队列,也无法创建新的线程执行
线程池已经调用 shutdown 方法停止了工作
线程池不是处于正常的工作状态
实际上,在 ThreadPoolExecutor 中已经提供了四种可以直接使用的 RejectedExecutionHandler 接口的实现:
CallerRunsPolicy:
在非线程池以外直接调用这个任务的run方法
DiscardPolicy:
直接丢弃这个被拒绝的任务,且没有任何提示
DiscardOldestPolicy:
丢弃等待队列队首的任务,将当前被拒绝的任务提交到线程池执行
AbortPolicy:
拒绝任务并抛出 RejectedExecutionException 异常
其中,CallerRunsPolicy 直接调用任务的 run 方法,可能会造成线程安全问题;DiscardPolicy 默默的忽略掉被拒绝任务,也没有输出日志或者任何提示,开发者就无法得知线程池在处理过程出现的错误;DiscardOldestPolicy 貌似是科学的,但如果等待队列出现了容量问题,很多任务会被直接丢弃,这时,业务会出现BUG,但开发者却很难定位到。
因此,比较科学的还是 AbortPolicy 提供的处理方式:抛出异常,由开发人员进行处理。当然,特殊情况下,我也建议使用自定义拒绝策略,可以缓存任务到Redis,或者发送消息到MQ通知业务方。
在 ThreadPoolExecutor 中提供了3个方法供子类重写,它们可以帮助开发者在线程池处理任务的不同阶段,进行额外的业务处理操作:
beforeExecute:
当线程池正要开始执行某个任务时,线程池会触发这个方法的调用。
afterExecute:
当线程池完成了某个任务的执行后,线程池就会触发这个方法。
terminated:
当线程池本身停止执行的时候,该方法就会被调用。
ThreadPoolExecutor 提供 execute 和 submit 两个方法用于提交任务,其中:
execute:提交的任务实现 Runnable 接口,任务没有任何返回值,因此,无法获取任何执行结果。
submit:提交的任务实现 Callable 接口,在任务运行完成后,会返回执行结果。
当然 submit 方法也可以提交实现 Runnable 接口的任务,但其处理方式与 execute 方法的处理方式完全不同:使用 submit 方法提交的实现了 Runnable 接口的任务,将会被封装到线程池内部使用 Executors.callable 方法创建的 RunnableAdapter 对象中,而 RunnableAdapter 又继承自 Callable。
如果使用 Executors 创建线程池,那么一定要理解每个方法创建出来的线程池的配置是什么。
比如,newCachedThreadPool 方法创建的线程池,其 corePoolSize = 0,而 maximumPoolSize = Integer.MAX,而其等待队列是 SynchronousQueue,不能缓冲数据。它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;如果线程闲置的时间超过 60 秒,则被终止并移出缓存;长时间闲置时,这种线程池,不会消耗什么资源。但需要着重注意的是,过快的任务提交速度,不但会导致线程数的急剧增加,也增加了程序OOM的风险。
而newFixedThreadPool ( int nThreads) 方法中,corePoolSize=maximumPoolSize=nThreads,任何时候最多有nThreads 个工作线程是活动的,这也意味着如果任务数量超过了活动队列数目,将在工作队列中等待空闲线程出现;如果有工作线程退出,将会有新的工作线程被创建,以补足指定的数目 nThreads。
实际应用场景中,可能有很多人滥用了这些方法,所以,阿里Java规约建议使用 ThreadPoolExecutor 构造方法来代替 Executors。
站在应用或者服务的角度,对整个服务中线程的用途归类,每个分类创建合适的线程池即可。
看到过挺多代码,只要用到线程,就是用 Executors 在类中创建了一个线程池,这样挺不好的。
记住一点:服务中创建的线程池越多,就越不好监控,也越不好配置合适的线程池数量。
也许你已经看到过无数计算最佳线程数的公式,首先,随便选一个,真的无所谓。
然后,随时监控线程池状态。最简单的方式,每次提交任务时,在日志中记录下线程池的几个关键参数:corePoolSize、maximumPoolSize、线程池当前线程数量、工作队列长度。
最后,相信我,有上面4个参数,再结合硬件资源,你能够很清楚的知道,自己的线程池配置是否合适。
开发者要随时注意,任务的耗时,以免线程池被耗尽,等待队列被填满。
如果任务依赖于第三方服务,一定要设置超时时间。
在一些特殊的情况下,线程池的负载短时间内快速升高,有可能会触发拒绝策略,如果提交的任务不允许丢失,那么需要自定义拒绝策略,将任务暂存到数据库或者缓存中。
也就是在使用线程池时,要尽量兼顾到线程池的所有使用场景。
调用线程池的 shutdown 方法后,线程池不再接受新任务,如果继续提交,线程池会使用拒绝策略响应。
调用线程池的 shutdownNow 方法,会中断所有线程,然后取出工作队列中所有未完成的任务返回给调用者。
调用这两个方法都不会主动等待任务执行结束,在某些场景下,在关闭线程池时,需要保证所有的任务均执行完毕,可以调用 awaitTermination 方法判断。
如果线程池任务执行结束,awaitTermination 方法将会返回 true,否则在等待指定时间后将会返回 false。
// 阻止接收新任务
threadPool.shutdown();
// 等待线程池中的任务执行完成
while (!threadPool.awaitTermination(60,TimeUnit.SECONDS)){
// do nothing
}
// 示例代码,不建议生产环境直接使用
// 如果有任务卡住,会导致线程池不能关闭