Java 线程池源码

线程池的执行流程

  1. 主线程调用execute、或者submit等方法提交任务给线程池执行
  2. 如果线程池中正在运行的工作线程数量小于corePoolSize,线程池会创建线程去运行这个任务
  3. 如果线程池中正在运行的工作线程数量大于或等于 corePoolSize(核心线程数量),那么将这个任务放入队列,等待线程从队列中获取任务
  4. 如果这时队列满了且正在运行的工作线程数量还小于 maximumPoolSize,那么会创建非核心线程立刻运行这个任务,这部分非核心工作线程空闲超过一定的时间(keepAliveTime)时,就会被销毁回收(线程退出了)
  5. 如果最终提交的任务超过了maximumPoolSize(最大线程数量),线程池就会执行拒绝策略

线程池创建参数

线程池的构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  1. corePoolSize: 表示线程池核心线程的数量, 线程池中的核心线程池的数量从创建以后是不会改变的(动态线程池除外)
  2. maximumPoolSize: 线程池中创建的最大线程数量, 当阻塞队列无法继续存放任务的时候就要通过创建非核心线程进行任务的运行, 非核心线程在执行完任务之后会自动结束
  3. keepAliveTime: 非核心线程的存活时间, 如果超过了这个时间线程就会自动结束
  4. unit: 存活时间的单位, 一般是秒
  5. workQueue: 阻塞队列, 当核心线程数量小于任务数量的时候, 任务会放入到队列中等待线程空闲后去获取, 如果队列为空, 线程就会被阻塞. 直到有任务被放入队列中
  6. threadFactory: 创建线程的工厂, 如果我们想自定线程(例如指定线程的prefix)可以传入这个参数
  7. handler: 拒绝策略: 拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何拒绝

线程池状态

  1. RUNNING:线程池正在运行中

  2. SHUTDOWN:线程池已关闭,但仍有任务在等待执行

  3. STOP:线程池已关闭,且没有任务在等待执行

  4. TIDYING: 任务完成, 工作线程为0, 在调用terminated()之前的状态

  5. TERMINATED:线程池已终止

线程池类中的成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 线程池的状态被表示为一个整数,该整数包含两部分信息:高 3 位表示线程池的运行状态,低 29 位表示当前线程池中的工作线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 任务队列
private final BlockingQueue<Runnable> workQueue;
// 全局锁
private final ReentrantLock mainLock = new ReentrantLock();
// worker线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 非核心线程的存活时长
private volatile long keepAliveTime;
// 是否允许超时(获取任务的时候被阻塞, 可以带上超时时间)
private volatile boolean allowCoreThreadTimeOut;
// 核心线程池数量
private volatile int corePoolSize;
// 最大线程数量
private volatile int maximumPoolSize;

Worker

如果提交的任务是创建线程运行, 线程就会被封装成一个Worker, 就是我们常说的Worker线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {

private static final long serialVersionUID = 6138294804551838833L;

final Thread thread;
// 这和就是我们提交的任务(可能是被封装成Future)
Runnable firstTask;
// 线程执行任务的计数
volatile long completedTasks;

Worker(Runnable firstTask) {
// 启动前禁止中断
setState(-1);
this.firstTask = firstTask;
// 注意, 这个线程传入的Runnable就是this, 调用的是worker里面的run方法
this.thread = getThreadFactory().newThread(this);
}

public void run() {
// 这个后续源码中会分析到
runWorker(this);
}

}

提交任务到线程池运行的流程

创建线程池

我们先用自带的线程池工具类创建一个线程

1
2
3
4
5
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
// 提交一个任务试试
executorService.submit(() -> {
System.out.println("hello this is the first job into thread pool");
});

下面就是正式的源码分析

submit

这个方法就是对task的一个封装, 我们要异步获取结果, 就可以使用Future(Future在这里不坐详尽的描述)

1
2
3
4
5
6
7
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 对应execute方法
execute(ftask);
return ftask;
}

newTaskFor

1
2
3
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
// 如果线程数量少于核心线程数量(第一次肯定是小于的, 因为才开始创建核心线程)
if (workerCountOf(c) < corePoolSize) {
// 尝试去创建worker线程, 创建成功就直接return
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池是运行状态, 并且任务成功添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查, 可能在这之间线程池状态改变了
// 如果发现线程池不是运行状态, 就要移除任务, 并且拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果发现核心线程数量为0, 就要去尝试创建worker,因为队列中还有任务(刚刚才往队列中放入了一个任务)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 核心线程数已满, 阻塞队列已满
// 如果直接创建worker失败了就要去拒绝任务
else if (!addWorker(command, false))
reject(command);
}

addWoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 死循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 某些情况下要检查队列是否为空
// 线程池状态为STOP, TIDYING, TERMINATED
// 线程池状态为SHUTDOWN, 且要执行的任务不为空
// 线程池状态为SHUTDOWN, 且任务队列为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
// worker线程的数量超过了最大容量
// 或者
// 1. 作为核心线程: 如果超过了核心线程数量
// 2. 作为非核心线程: 如果超过了最大线程数量
// 都会创建worker线程失败, 返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 先计数
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果发现rs不一样了, 说明有其他线程进行线程池的修改, 需要重试
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建worker线程, firstTask就是我们的任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 创建线程过程中需要加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 如果线程池不是shutdown, 或者线程shutdown了,
// 并且firstTask为空, 我们要创建新的线程执行这个任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程是alive的, 这个时候线程还没有启动, 如果是alive说明线程的状态是异常的
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加到set里面
workers.add(w);
int s = workers.size();
// 更新一下最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
// 标记线程已经被添加了
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果线程成功被添加, 就启动线程
// 启动线程之后就要执行run了
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程没有被启动
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

addWorkerFailed

如果创建线程成功, 但是启动失败了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
// 线程启动失败时,需将前面添加的线程删除
workers.remove(w);
// ctl变量中的工作线程数-1
decrementWorkerCount();
// 尝试将线程池转变成TERMINATE状态
tryTerminate();
} finally {
mainLock.unlock();
}
}

runWorker

前面我们知道, 线程的run方法实际是一个runWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 拿取到这个task, 并且要设置为空
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 获取线程本身的task或者去队列(getTask)里面拿取任务
// 如果任务是null的话就会结束循环
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 模板方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正的任务
task.run();

// 如果抛出异常就会执行afterExecute方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 模板方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程退出前会执行这个方法
// 这个方法很关键
processWorkerExit(w, completedAbruptly);
}
}

getTask

线程从队列中获取任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private Runnable getTask() {
// 是否允许超时
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 超过核心线程或者超时
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 减一
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 超时或者不超时获取
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

processWorkerExit

从名字中可以看出, 是处理线程退出事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计一下运行过的任务
completedTaskCount += w.completedTasks;
// 从worker中移除该线程
workers.remove(w);
} finally {
mainLock.unlock();
}

// 每当线程退出的时候都会执行该方法
// 会去尝试关闭线程池
tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 如果允许核心线程超时则最小线程数是0,否则最小线程数等于核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 会重新创建线程
addWorker(null, false);
}
}

tryTerminate

尝试将线程池的状态设置为terminate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果线程池还在运行
// 或者至少是TIDYING
// 或者已经是shutdown了, 并且队列不为空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果线程池中的数量不为0, 不能结束
if (workerCountOf(c) != 0) { // Eligible to terminate
// 中断一个线程
interruptIdleWorkers(ONLY_ONE);
return;
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 只尝试关闭一次, 失败就不管了
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒所有等待任务的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

声明

如果有错误请联系作者更正