Java 并发编程 | 线程池详解

栏目: Java · 发布时间: 4年前

内容简介:线程池用来处理异步任务或者并发执行的任务优点:

线程池用来处理异步任务或者并发执行的任务

优点:

  1. 重复利用已创建的线程,减少创建和销毁线程造成的资源消耗
  2. 直接使用线程池中的线程,提高响应速度
  3. 提高线程的可管理性,由线程池同一管理

ThreadPoolExecutor

java 中线程池使用 ThreadPoolExecutor 实现

构造函数

ThreadPoolExecutor 提供了四个构造函数,其他三个构造函数最终调用的都是下面这个构造函数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
复制代码

入参:

  1. corePoolSize :线程池的核心线程数量

    线程池维护的核心线程数量,当线程池初始化后,核心线程数量为零,当有任务来到的时候才会创建线程去执行任务,当线程池中的工作线程数量等于核心线程数量时,新到的任务就会放到缓存队列中

  2. maximumPoolSize :线程池允许创建的最大线程数量

    当阻塞队列满了的时候,并且线程池中创建的线程数量小于 maximumPoolSize ,此时会创建新的线程执行任务

  3. keepAliveTime :线程活动保持时间

    只有当线程池数量大于核心线程数量时, keepAliveTime 才会有效,如果当前线程数量大于核心线程数量时,并且线程的空闲时间达到 keepAliveTime ,当前线程终止,直到线程池数量等于核心线程数

  4. unit :线程活动保持时间的单位

    keepAliveTime 的单位,包括: TimeUnit.DAYS 天, TimeUnit.HOURS 小时, TimeUnit.MINUTES 分钟, TimeUnit.SECONDS 秒, TimeUnit.MILLISECONDS 毫秒, TimeUnit.MICROSECONDS 微秒, TimeUnit.NANOSECONDS 纳秒

  5. workQueue :任务队列,用来保存等待执行任务的阻塞队列

    ArrayBlockingQueue :是一个基于数组结构的有界队列

    LinkedBlockingQueue :是一个基于链表结构的阻塞队列

    SynchronousQueue :不存储元素的阻塞队列,每一个插入操作必须等到下一个线程调用移除操作,否则插入操作一直阻塞

    PriorityBlockingQueue :一个具有优先级的无线阻塞队列

  6. threadFactory :用来创建线程的工厂

  7. handler :饱和策略,当线程池和队列都满了的时候,必须要采取一种策略处理新的任务,默认策略是 AbortPolicy ,根据自己需求选择合适的饱和策略

    AbortPolicy :直接抛出异常

    CallerRunsPolicy :用调用者所在的线程来运行当前任务

    DiscardOldestPolicy :丢弃队列里面最近的一个任务,并执行当前任务

    DiscardPolicy :不处理,丢弃掉

    当然我们也可以通过实现 RejectedExecutionHandler 去自定义实现处理策略

入参不同,线程池的运行机制也不同,了解每个入参的含义由于我们更透传的理解线程池的实现原理

提交任务

线程池处理提交任务流程如下

Java 并发编程 | 线程池详解

处理流程:

  1. 如果核心线程数量未满,创建线程执行任务,否则添加到阻塞队列中
  2. 如果阻塞队列中未满,将任务存到队列里
  3. 如果阻塞队列满了,看线程池数量是否达到了线程池最大数量,如果没达到,创建线程执行任务
  4. 如果已经达到线程池最大数量,根据饱和策略进行处理

ThreadPoolExecutor 使用 execute(Runnable command)submit(Runnable task) 向线程池中提交任务,在 submit(Runnable task) 方法中调用了 execute(Runnable command) ,所以我们只要了解 execute(Runnable command)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获取线程池状态,并且可以通过ctl获取到当前线程池数量及线程池状态
    int c = ctl.get();
    // 如果工作线程数小于核心线程数量,则创建一个新线程执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 如果不符合上面条件,当前线程处于运行状态并且写入阻塞队列成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 双重检查,再次获取线程状态,如果当前线程状态变为非运行状态,则从队列中移除任务,执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 检查工作线程数量是否为0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //创建线程执行任务,如果添加失败则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}
复制代码

execute(Runnable command) 方法中我们比较关心的就是如何创建新的线程执行任务,就 addWorker(command, true) 方法

workQueue.offer(command) 方法是用来向阻塞队列中添加任务的

reject(command) 方法会根据创建线程池时传入的饱和策略对任务进行处理,例如默认的 AbortPolicy ,查看源码后知道就是直接抛了个 RejectedExecutionException 异常,其他的饱和策略的源码也是特别简单

关于线程池状态与工作线程的数量是如何表示的

ThreadPoolExecutor 中使用一个 AtomicInteger 类型变量表示

/**
 * ctl表示两个信息,一个是线程池的状态(高3位表示),一个是当前线程池的数量(低29位表示),这个跟我们前面  	* 说过的读写锁的state变量是一样的,以一个变量记录两个信息,都是以利用int的32个字节,高十六位表述读,低十 	* 六位表示写锁
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//低29位保存线程池数量
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池最大容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 运行状态存储在高3位
// 运行状态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
复制代码

addWorker(command, boolean) 创建工作线程,执行任务

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 线程池状态
        int rs = runStateOf(c);
        // 判断线程池状态,以及阻塞队列是否为空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取线程工作线程数量
            int wc = workerCountOf(c);
            // 判断是否大于最大容量,以及根据传入的core判断是否大于核心线程数量还是最大线程数量
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 增加工作线程数量
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //如果线程池状态改变,则重试
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建Worker,内部创建了一个新的线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
				// 线程池状态判断
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将创建的线程添加到线程池
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //执行任务,首先会执行Worker对象的firstTask
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果任务执行失败
        if (! workerStarted)
            //移除worker
            addWorkerFailed(w);
    }
    return workerStarted;
}
复制代码

关闭线程池

ThreadPoolExecutor 中关闭线程池使用 shutdown()shutdownNow() 方法,原理都是通过遍历线程池中的线程,对线程进行中断

for (Worker w : workers) {
	Thread t = w.thread;
	if (!t.isInterrupted() && w.tryLock()) {
		try {
			t.interrupt();
		} catch (SecurityException ignore) {
		} finally {
			w.unlock();
		}
	}
	if (onlyOne)
		break;
	}
复制代码

Executor框架

Executor 框架将任务的提交与任务的执行进行分离

Executors 提供了一系列工厂方法用于创先线程池,返回的线程池都实现了 ExecutorService 接口

工厂方法:

newFixedThreadPool
newCachedThreadPool
newSingleThreadExecutor
newScheduledThreadPool

在阿里巴巴手册中强制要求禁止使用 Executors 提供的工厂方法创建线程池

Java 并发编程 | 线程池详解

这个确实是一个很严重的问题,我们部门曾经就出现过使用 FixedThreadPool 线程池,导致OOM,这是因为线程执行任务的时候被阻塞或耗时很长时间,导致阻塞队列一直在添加任务,直到内存被打满,报OOM

所以我们在使用线程池的时候要使用 ThreadPoolExecutor 的构造函数去创建线程池,根据自己的任务类型来确定核心线程数和最大线程数,选择适合阻塞队列和阻塞队列的长度

合理配置线程池

合理的配置线程池需要分析一下任务的性质(使用 ThreadPoolExecutor 创建线程池):

  1. CPU密集型任务应配置竟可能小的线程,比如 cpu数量+1

  2. IO密集型任务并不是一直在执行任务,应该配置尽可能多的线程,比如 cpu数量x2

    可通过 Runtime.getRuntime().availableProcessors() 获取cpu数量

  3. 执行的任务有调用外部接口比较费时的时候,这时cup空闲的时间就越长,可以将线程池数量设置大一些,这样cup空闲的时间就可以去执行别的任务

  4. 建议使用有界队列,可根据需要将长度设置大一些,防止OOM

参考:java并发编程的艺术


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Thinking Recursively

Thinking Recursively

Eric S. Roberts / Wiley / 1986-1-17 / USD 85.67

The process of solving large problems by breaking them down into smaller, more simple problems that have identical forms. Thinking Recursively: A small text to solve large problems. Concentrating on t......一起来看看 《Thinking Recursively》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具