ThreadPoolExecutor 线程池源码分析-基于jdk8

栏目: Java · 发布时间: 5年前

内容简介:测试demo, ThreadPoolExecutorTest:ThreadPoolExecutor 抽出来的一些核心方法:

测试demo, ThreadPoolExecutorTest:

public class ThreadPoolExecutorTest {
	public static void main(String[] args) throws InterruptedException {
		final boolean isFair = false;
		ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(10, isFair);
//		arrayBlockingQueue.add(new MyThreadTask(10086));
		
		final int corePoolSize = 3;
		final int maximumPoolSize = 6;
		ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
				corePoolSize, maximumPoolSize,
		        1, TimeUnit.SECONDS,
		        arrayBlockingQueue,
		        new ThreadPoolExecutor.CallerRunsPolicy());
		
//		threadPool.allowCoreThreadTimeOut(true);
//		Integer result = 9;
		for (int index = 1; index <= 10; index++) {
			Thread tempNewThread = new MyThreadTask(index);
		    threadPool.execute(tempNewThread);
//			result = threadPool.submit(new MyThreadTask(i), result);
		}
	    
//	    threadPool.shutdown();
	}
}

ThreadPoolExecutor 抽出来的一些核心方法:

public class ThreadPoolExecutor extends AbstractExecutorService {

	private final BlockingQueue<Runnable> workQueue;
	private final ReentrantLock mainLock = new ReentrantLock();
	private final HashSet<Worker> workers = new HashSet<Worker>();
	
	/***
	 * 线程中真正执行的Worker线程
	 */
	private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
	/** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        
		Worker(Runnable firstTask) {
			setState(-1); // inhibit interrupts until runWorker
			this.firstTask = firstTask;
			this.thread = getThreadFactory().newThread(this);
		}

		/***
		 * 代理, 执行上层的runWorker方法;
		 */
		public void run() {
			runWorker(this);
		}
	}

	/***
	 * 把firstTask 添加到核心线程, 并启动;
	 * @param firstTask
	 * @param core 是否是核心线程
	 * @return
	 */
	private boolean addWorker(Runnable firstTask, boolean core) {
		boolean workerStarted = false;
		boolean workerAdded = false;
		Worker w = null;
		try {
			w = new Worker(firstTask);
			final Thread t = w.thread;
			if (t != null) {
				int rs = runStateOf(ctl.get());
				if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
					workers.add(w);
					int s = workers.size();
					if (s > largestPoolSize)
						largestPoolSize = s;
					workerAdded = true;
				}
				if (workerAdded) {
					t.start();
					workerStarted = true;
				}
			}
		} finally {
			if (!workerStarted)
				addWorkerFailed(w);
		}
		return workerStarted;
	}

	/***
	 * 从 workQueue 的线程等待队列中获取线程(后面准备执行);
	 * @return
	 */
	private Runnable getTask() {
		boolean timedOut = false; // Did the last poll() time out?
		for (;;) {
			try {
				Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
				if (r != null)
					return r;
				timedOut = true;
			} catch (InterruptedException retry) {
				timedOut = false;
			}
		}
	}

	/***
	 * 运行Worker线程; 
	 * 
	 * while (task != null || (task = getTask()) != null) 
	 * 第一次判断是当前的核心线程;
	 * 第二个判断是核心线程第一次执行完毕, 则从workQueue中获取线程继续执行;
	 * 
	 * task.run();
	 * 直接调用的run方法(外层已经有worker的线程包装起的)
	 */
	final void runWorker(Worker w) {
		Thread wt = Thread.currentThread();
		Runnable task = w.firstTask;
		w.firstTask = null;
		w.unlock(); // allow interrupts
		boolean completedAbruptly = true;
		try {
			while (task != null || (task = getTask()) != null) {
				w.lock();
				// If pool is stopping, ensure thread is interrupted;
				// if not, ensure thread is not interrupted. This
				// requires a recheck in second case to deal with
				// shutdownNow race while clearing interrupt
				if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
						&& !wt.isInterrupted())
					wt.interrupt();
				try {
					beforeExecute(wt, task);
					Throwable thrown = null;
					try {
						task.run();
					} catch (RuntimeException x) {
						thrown = x;
						throw x;
					} catch (Error x) {
						thrown = x;
						throw x;
					} catch (Throwable x) {
						thrown = x;
						throw new Error(x);
					} finally {
						afterExecute(task, thrown);
					}
				} finally {
					task = null;
					w.completedTasks++;
					w.unlock();
				}
			}
			completedAbruptly = false;
		} finally {
			processWorkerExit(w, completedAbruptly);
		}
	}

	/***
	 * 执行, 线程池执行线程的总入口
	 * @param command
	 */
	public void execute(Runnable command) {
		int c = ctl.get();
		// 核心线程执行
		if (workerCountOf(c) < corePoolSize) {
			if (addWorker(command, true))
				return;
			c = ctl.get();
		}
		// 核心已经多了, 添加到 workQueue
		if (isRunning(c) && workQueue.offer(command)) {
			int recheck = ctl.get();
			if (!isRunning(recheck) && remove(command))
				reject(command);
			else if (workerCountOf(recheck) == 0)
				addWorker(null, false);
		} else if (!addWorker(command, false))
			reject(command);
	}

	/**
	 * 其中一个拒绝策略
	 */
	public static class CallerRunsPolicy implements RejectedExecutionHandler {
		public CallerRunsPolicy() {
		}
		public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
			if (!e.isShutdown()) {
				r.run();
			}
		}
	}
}

以上所述就是小编给大家介绍的《ThreadPoolExecutor 线程池源码分析-基于jdk8》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

精通正则表达式

精通正则表达式

Jeffrey E. F. Friedl / 余晟 / 电子工业出版社 / 2007 / 75

随着互联网的迅速发展,几乎所有工具软件和程序语言都支持的正则表达式也变得越来越强大和易于使用。本书是讲解正则表达式的经典之作。本书主要讲解了正则表达式的特性和流派、匹配原理、优化原则、实用诀窍以及调校措施,并详细介绍了正则表达式在Perl、Java、.NET、PHP中的用法。 本书自第1 版开始着力于教会读者“以正则表达式来思考”,来让读者真正“精通”正则表达式。该版对PHP的相关内容、Ja......一起来看看 《精通正则表达式》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试