Java 线程池源码 线程池的执行流程
主线程调用execute
、或者submit
等方法提交任务给线程池执行
如果线程池中正在运行的工作线程数量小于corePoolSize
,线程池会创建线程去运行这个任务
如果线程池中正在运行的工作线程数量大于或等于 corePoolSize
(核心线程数量),那么将这个任务放入队列,等待线程从队列中获取任务
如果这时队列满了且正在运行的工作线程数量还小于 maximumPoolSize
,那么会创建非核心线程立刻运行这个任务,这部分非核心工作线程空闲超过一定的时间(keepAliveTime
)时,就会被销毁回收(线程退出了)
如果最终提交的任务超过了maximumPoolSize
(最大线程数量),线程池就会执行拒绝策略
线程池创建参数 线程池的构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
corePoolSize
: 表示线程池核心线程的数量, 线程池中的核心线程池的数量从创建以后是不会改变的(动态线程池除外)
maximumPoolSize
: 线程池中创建的最大线程数量, 当阻塞队列无法继续存放任务的时候就要通过创建非核心 线程进行任务的运行, 非核心线程在执行完任务之后会自动结束
keepAliveTime
: 非核心线程的存活时间, 如果超过了这个时间线程就会自动结束
unit
: 存活时间的单位, 一般是秒
workQueue
: 阻塞队列, 当核心线程数量小于任务数量的时候, 任务会放入到队列中等待线程空闲后去获取, 如果队列为空, 线程就会被阻塞. 直到有任务被放入队列中
threadFactory
: 创建线程的工厂, 如果我们想自定线程(例如指定线程的prefix
)可以传入这个参数
handler
: 拒绝策略: 拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数(maximumPoolSize
)时如何拒绝
线程池状态
RUNNING:线程池正在运行中
SHUTDOWN:线程池已关闭,但仍有任务在等待执行
STOP:线程池已关闭,且没有任务在等待执行
TIDYING: 任务完成, 工作线程为0, 在调用terminated()之前的状态
TERMINATED:线程池已终止
线程池类中的成员变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock ();private final HashSet<Worker> workers = new HashSet <Worker>();private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private volatile long keepAliveTime;private volatile boolean allowCoreThreadTimeOut;private volatile int corePoolSize;private volatile int maximumPoolSize;
Worker 如果提交的任务是创建线程运行, 线程就会被封装成一个Worker, 就是我们常说的Worker线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } }
提交任务到线程池运行的流程 创建线程池 我们先用自带的线程池工具类创建一个线程
1 2 3 4 5 ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();executorService.submit(() -> { System.out.println("hello this is the first job into thread pool" ); });
下面就是正式的源码分析
submit 这个方法就是对task
的一个封装, 我们要异步获取结果, 就可以使用Future
(Future
在这里不坐详尽的描述)
1 2 3 4 5 6 7 public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException (); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; }
newTaskFor 1 2 3 protected <T> RunnableFuture<T> newTaskFor (Runnable runnable, T value) { return new FutureTask <T>(runnable, value); }
execute 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
addWoker 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorkerFailed 如果创建线程成功, 但是启动失败了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void addWorkerFailed (Worker w) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (w != null ) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
runWorker 前面我们知道, 线程的run
方法实际是一个runWorker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
getTask 线程从队列中获取任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
processWorkerExit 从名字中可以看出, 是处理线程退出事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
tryTerminate 尝试将线程池的状态设置为terminate
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 final void tryTerminate () { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } }
声明 如果有错误请联系作者更正