追踪解析 ThreadPoolExecutor 源码

栏目: Java · 发布时间: 5年前

内容简介:文章异常啰嗦且绕弯。JDK 版本 : OpenJDK 11.0.1IDE : idea 2018.3

零 前期准备

0 FBI WARNING

文章异常啰嗦且绕弯。

1 版本

JDK 版本 : OpenJDK 11.0.1

IDE : idea 2018.3

2 ThreadPoolExecutor 简介

ThreadPoolExecutor 是 jdk4 中加入的工具,被封装在 jdk 自带的 Executors 框架中,是 java 中最经典的线程池技术。

ThreadPoolExecutor 类在 concurrent 包下,和其它线程 工具 类一样都由 Doug Lea 大神操刀完成。

[ 在看完 Spring ioc 和 Gson 之后有点乏了,换换口味看一些 jdk 的源码 ]

3 Demo

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {

    public static void main(String[] args){
        //创建线程池
        //这里使用固定线程数的线程池,线程数为 5
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for(int i = 0 ; i < 100 ; i ++){
            final int ii = i;
            //创建 Runnable 作为线程池的任务
            Runnable r = () -> System.out.println(ii);
            //执行
            executorService.execute(r);
        }
    }
}

一 线程池的初始化

线程池的初始化调用的 Executors 框架的静态方法:

//Executors.class
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

继续追踪这个构造方法:

//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

继续追踪:

//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler) {

    //验证参数的有效性                        
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();

    //本例中不涉及权限
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();

    //线程数
    this.corePoolSize = corePoolSize;
    //最大线程数
    //本例中使用固定线程数的线程池,所以线程数和最大线程数相等
    this.maximumPoolSize = maximumPoolSize;
    //用于存储任务的队列
    //此处使用 LinkedBlockingQueue 来储存任务,其线程安全
    this.workQueue = workQueue;
    //keepAliveTime 参数用于表示:
    //对于超出线程和队列缓存总和的任务,是否要临时增加线程来处理
    //超出的线程的存在时间是多少
    //这里使用的是定长线程池,所以 keepAliveTime = 0,即不增加线程
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    //用于创建线程的工厂类
    this.threadFactory = threadFactory;
    //handler 用来处理 task 太多时候的拒绝策略
    //此例中使用的是默认的,即定义在 ThreadPoolExecutor 中的 defaultHandler 对象
    this.handler = handler;
}

二 Worker

Worker 是 ThreadPoolExecutor 的内部类,可以看做是 Runnable 的代理类:

//ThreadPoolExecutor.class
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    
    private static final long serialVersionUID = 6138294804551838833L;
    final Thread thread;
    Runnable firstTask;
    //完成 task 数量的计数器
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        //这个方法是 AbstractQueuedSynchronizer 中的方法,功能相当于加锁
        //-1 的意思是后续的任务会处于阻塞状态,即为已经加锁
        setState(-1);
        //在创建的时候存入一个要处理的 task
        //需要注意的是每个 worker 对象被创建出来之后是可以重复利用来处理多个 task 的
        this.firstTask = firstTask;
        //worker 会用自身作为 Runnable 对象去创建一个线程
        //这里调用线程工厂进行线程创建
        this.thread = getThreadFactory().newThread(this);
    }

    //对于线程变量来说,其启动的就是 worker 的 run() 方法
    public void run() {
        //runWorker(...) 方法在 ThreadPoolExecutor 里
        runWorker(this);
    }

    //获取锁的状态
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    //重写了 AbstractQueuedSynchronizer 中的 tryAcquire(...) 方法
    //尝试加锁
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    //重写了 AbstractQueuedSynchronizer 中的 tryRelease(...) 方法
    //尝试释放锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    //真正的加锁方法
    public void lock() { 
        acquire(1); 
    }
    //尝试加锁
    public boolean tryLock() { 
        return tryAcquire(1); 
    }
    //真正的释放锁方法
    public void unlock() { 
        release(1); 
    }
    //判断是否在锁中
    public boolean isLocked() { 
        return isHeldExclusively(); 
    }
    //中断线程
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

追踪一下 runWorker(...) 方法:

//ThreadPoolExecutor.class
final void runWorker(Worker w) {
    //获取当前所在的线程的实例对象
    Thread wt = Thread.currentThread();
    //获取 task
    Runnable task = w.firstTask;
    //取出来之后把 task 置空
    w.firstTask = null;
    //此处释放锁
    w.unlock();
    //指示器,此变量为 true 的时候确认该方法已经执行完毕
    boolean completedAbruptly = true;
    try {
        //此处为一个 while 循环,用于不断的执行 task
        //getTask() 方法会从队列里不断抓取 task 并进行执行
        //当 task 为 null,且队列里已经没有更多 task 的时候,就会终止循环
        while (task != null || (task = getTask()) != null) {
            //加锁,独占线程
            w.lock();
            //在这里会判断线程的状态,如果存在符合中断的情况,就会直接中断掉
            if ((runStateAtLeast(ctl.get(), STOP) 
                    || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();

            try {
                //beforeExecute(...) 和 afterExecute(...) 方法在 ThreadPoolExecutor 中并没有实现
                //是预留出来给使用者重写,以达到业务需求的方法
                beforeExecute(wt, task);
                try {
                    //此处执行 task
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                //将执行的 task 置空
                task = null;
                //每完成一个 task 就会加 1
                w.completedTasks++;
                //释放锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //这个方法会销毁掉 worker
        //同时如果检测到有新的 task 又会重新创建 Worker
        processWorkerExit(w, completedAbruptly);
    }
}

Worker 是线程池中真正起完成业务逻辑的组件,是任务和线程的封装。

三 线程池的状态控制

线程池的状态主要由 ctl 变量来进行控制:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 是一个 AtomicInteger 类型的变量,其实可以简单理解为一个 int 值,AtomicInteger 只是能够适应高并发的原子化操作的需要。

ctl 的前 29 位数用来表示线程(Worker)的数量,后面三位用来表示线程池的状态。

线程池的状态有五种,分别是 Running、Shutdown、Stop、Tidying、Terminate,根据单词就能猜出大概。

注意的是,这五种状态在线程池中都以 int 变量的形式存在,从前到后依次变大,对状态的比较有一系列方法:

//ThreadPoolExecutor.class
private static boolean runStateLessThan(int c, int s) {
    //c 的状态值要小于 s
    return c < s;
}
//ThreadPoolExecutor.class
private static boolean runStateAtLeast(int c, int s) {
    //c 的状态值要大于或等于 s
    return c >= s;
}
//ThreadPoolExecutor.class
private static boolean isRunning(int c) {
    //状态里只有 RUNNING 是小于 SHUTDOWN 的
    return c < SHUTDOWN;
}

在这些方法里,传入的参数 c 一般指的是当前线程池状态,s 是用来对比的参照状态。

四 线程池的执行

该 part 的起点:

executorService.execute(r);

来追踪 execute(...) 方法:

public void execute(Runnable command) {
    //有效性验证
    if (command == null)
        throw new NullPointerException();
    
    //ctl 是一个 AtomicInteger 类型的变量,用来记录线程池的状态
    int c = ctl.get();
    
    //workerCountOf(...) 方法会返回当前运行的 Worker 的数量
    if (workerCountOf(c) < corePoolSize) {
        //Worker 的数量小于线程池容量的情况下
        //直接增加 Worker 并取出 task 去运行
        if (addWorker(command, true))
            return;
        //如果 Worker 已经顺利执行了 task,应该会直接返回掉
        //如果执行中出现了其它情况,则会继续往下走
        //此处刷新状态
        c = ctl.get();
    }
    //当 Worker 数量已经达到线程池的指定数量,或者添加 Worker 的时候出问题的时候,会进入此判断语句
    //先判断线程池是否处于活跃状态,且 task 是否已经被成功添加到队列中
    //如果不满足,会进入 else 语句中,先最后尝试一次 addWorker(...) 方法,如果不成功就拒绝 task
    //reject(...) 方法会调用 handler 的拒绝策略
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }else if (!addWorker(command, false))
        reject(command);
}

1 reject

这里先提及一下 reject(...) 方法:

//ThreadPoolExecutor.class
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

本质是调用了 handler 对象的相关方法。在本例中,handler 对象指向了 defaultHandler:

//ThreadPoolExecutor.class
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

defaultHandler 是一个 AbortPolicy 类型的对象,而 AbortPolicy 是 ThreadPoolExecutor 的静态内部类。

AbortPolicy 起作用的方法为 rejectedExecution(...) 方法:

//AbortPolicy.class
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                        " rejected from " + e.toString());
}

也就是说,在 task 过多的情况下,AbortPolicy 的应对策略是抛出异常。

2 addWorker

来看一下核心方法 addWorker(...):

//ThreadPoolExecutor.class
private boolean addWorker(Runnable firstTask, boolean core) {
    //先标记这个 for 循环,方便退出循环
    retry:
    //在每一次循环开始之前会刷新一次状态标识
    for (int c = ctl.get();;) {
        //这里先进行判断,如果线程池已经关闭了,或者没有 task 了,就会返回 false
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            //如果 Worker 数量已经超出了最大值就会直接返回 false
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            //将 ctl 变量的值加 1,如果成功了就会跳出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            //在状态值比 SHUTDOWN 大的时候会直接跳到最外头的循环里
            //需要注意的是最外面的 for 循环会判断状态值是否大于 SHUTDOWN
            //如果大于 SHUTDOWN 的话就返回 false 了
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建一个 Worker
        w = new Worker(firstTask);
        //获取线程对象
        final Thread t = w.thread;
        if (t != null) {
            //加锁,此处加的是一把全局的锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
                //如果状态值 c 是 RUNNING,或者 [c 是 RUNNING 或者 SHUTDOWN 且 firstTask 是 null] 就会进入这个判断语句
                //
                if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                    //如果这个线程已经处于运作状态,会抛出异常
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    //workers 是一个列表,用于存储 Worker 对象
                    workers.add(w);
                    //获取 Worker 的数量
                    int s = workers.size();
                    //largestPoolSize 用来记录线程池达到过的最大线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //标记 Worker 已经被添加
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            //先判断 Worker 是否已经被添加到 workers 内了
            if (workerAdded) {
                //这是该方法核心的启动线程方法
                t.start();
                //标记 Worker 已经开始运行了
                workerStarted = true;
            }
        }
    } finally {
        //如果没有标记 Worker 已经开始工作,会在这里销毁掉 Worker
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

五 一点唠叨

先总结一下线程池的业务逻辑:

1 接收到 task (即实现了 Runnable 接口的实例对象) [execute(...) 方法]

2 用 task 去尝试创建一个 Worker 实例 [execute(...) 方法]
    2.1 如果 Worker 数量没有达到线程池的指定最大值 -> 新建
    2.2 如果 Worker 数量达到了线程池的指定最大值 -> 不会再创建,而是把 task 储存起来等待空闲的 Worker 去提取
    2.3 如果 task 队列也已经满了,无法再添加 -> 触发拒绝机制(handler)

3 Worker 在执行的时候调用其内部的 Thread 实例对象的 start() 方法 [addWorker(...) 方法]

4 该 start() 方法会调用到 Worker 的 run() 方法 [Worker.class 内的 run() 方法]

5 Worker 的 run() 方法本质上是封装了 task 的 run() 方法 [runWorker(...) 方法]

主线业务逻辑不算复杂,比较艰难的是为了保证数据的一致性,线程池代码中充斥着大量的状态判断和锁机制。

并且为了考虑性能问题,线程池的设计没有使用悲观锁(synchronized 关键字),而是大量使用了 ASQ 和 ReetrentLock 机制。

本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Inside Larry's and Sergey's Brain

Inside Larry's and Sergey's Brain

Richard Brandt / Portfolio / 17 Sep 2009 / USD 24.95

You’ve used their products. You’ve heard about their skyrocketing wealth and “don’t be evil” business motto. But how much do you really know about Google’s founders, Larry Page and Sergey Brin? Inside......一起来看看 《Inside Larry's and Sergey's Brain》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具