Java线程池ThreadPoolExecutor源码详解

本文从源码层面去解读下java线程池的实现思想和代码。

总览

先看一张java线程池的继承关系图:

简单介绍下:

  • Executor 位于最顶层,也是最简单的,只有一个 execute(Runnable runnable) 接口方法定义

  • ExecutorService 也是接口,在 Executor 接口的基础上添加了很多的接口方法,很多时候我们使用这个接口就够了

  • AbstractExecutorService,这是抽象类,这里实现了非常有用的一些方法供子类直接使用,例如: invokeAll()、 invokeAny()

  • ThreadPoolExecutor 类,这个类才是真正的线程池实现,提供了非常丰富的功能。

从图中的方法可以看到,还涉及到一些其他类:

其中:

  • Executors类
    这个是工具类,里面的方法都是静态方法,如以下我们最常用的用于生成 ThreadPoolExecutor 的实例的一些方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }


    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
  • Future接口
    由于线程池支持获取线程执行的结果,所以,引入了 Future 接口,RunnableFuture 继承自此接口,然后我们最需要关心的就是它的实现类 FutureTask。

  • FutureTask类
    在线程池的使用过程中,我们是往线程池提交任务(task),我们提交的每个任务是实现了 Runnable 接口的,其实就是先将 Runnable 的任务包装成 FutureTask,然后再提交到线程池。它首先是一个任务(Task),然后具有 Future 接口的语义,即可以在将来(Future)得到执行的结果。

  • BlockingQueue
    如果线程数达到 corePoolSize,我们的每个任务会提交到等待队列中,等待线程池中的线程来取任务并执行。这里的 BlockingQueue 通常我们使用其实现类 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每个实现类都有不同的特征.

Executor接口

1
2
3
4
5
6
7
/* 
* @since 1.5
* @author Doug Lea
*/
public interface Executor {
void execute(Runnable command);
}

可以看到 Executor 接口非常简单,就一个 void execute(Runnable command) 方法,代表提交一个任务。为了理解 java 线程池的整个设计方案,我会按照 Doug Lea 的设计思路来多说一些相关的东西。

我们经常这样启动一个线程:

1
2
3
new Thread(new Runnable(){
// do something
}).start();

用了线程池 Executor 后就可以像下面这么使用:

1
2
3
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

如果我们希望线程池同步执行每一个任务,我们可以这么实现这个接口:

1
2
3
4
5
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run(); // 这里不是用的new Thread(r).start(),也就是说没有启动任何一个新的线程。
}
}

如果我们希望每个任务提交进来后,直接启动一个新的线程来执行这个任务,我们可以这么实现:

1
2
3
4
5
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start(); // 每个任务都用一个新的线程来执行
}
}

我们再来看下怎么组合两个 Executor 来使用,下面这个实现是将所有的任务都加到一个 queue 中,然后从 queue 中取任务,交给真正的执行器执行,这里采用 synchronized 进行并发控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class SerialExecutor implements Executor {
// 任务队列
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
// 这个才是真正的执行器
final Executor executor;
// 当前正在执行的任务
Runnable active;

// 初始化的时候,指定执行器
SerialExecutor(Executor executor) {
this.executor = executor;
}

// 添加任务到线程池: 将任务添加到任务队列,scheduleNext 触发执行器去任务队列取任务
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});

if (active == null) {
scheduleNext();
}
}

protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
// 具体的执行转给真正的执行器 executor
executor.execute(active);
}
}
}

Executor 这个接口只有提交任务的功能,太简单了,我们想要更丰富的功能,比如我们想知道执行结果、我们想知道当前线程池有多少个线程活着、已经完成了多少任务等等,这些都是这个接口的不足的地方。接下来我们要介绍的是继承自 Executor 接口的 ExecutorService 接口,这个接口提供了比较丰富的功能,也是我们最常使用到的接口。

ExecutorService

简单初略地来看一下这个接口中都有哪些方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
简单初略地来看一下这个接口中都有哪些方法:

public interface ExecutorService extends Executor {
// 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
void shutdown();

// 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
// 它和前面的方法相比,加了一个单词“now”,区别在于它会去停止当前正在进行的任务
List<Runnable> shutdownNow();

// 线程池是否已关闭
boolean isShutdown();

// 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true
// 这个方法必须在调用shutdown或shutdownNow方法之后调用才会返回true
boolean isTerminated();

// 等待所有任务完成,并设置超时时间
// 我们这么理解,实际应用中是,先调用 shutdown 或 shutdownNow,
// 然后再调这个方法等待所有的线程真正地完成,返回值意味着有没有超时
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

// 提交一个 Callable 任务
<T> Future<T> submit(Callable<T> task);

// 提交一个 Runnable 任务,第二个参数将会放到 Future 中,作为返回值,
// 因为 Runnable 的 run 方法本身并不返回任何东西
<T> Future<T> submit(Runnable task, T result);

// 提交一个 Runnable 任务
Future<?> submit(Runnable task);

// 执行所有任务,等全部完成后返回 Future 类型的一个 list
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 也是执行所有任务,但是这里设置了超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 只要其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 同上一个方法,只要其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果,
// 不过这个带超时,超过指定的时间,抛出 TimeoutException 异常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

这些方法都很好理解,一个简单的线程池主要就是这些功能,能提交任务,能获取结果,能关闭线程池,这也是为什么我们经常用这个接口的原因。

FutureTask

在继续往下层介绍 ExecutorService 的实现类之前,我们先来说说相关的类 FutureTask。

FutureTask 通过 RunnableFuture 间接实现了 Runnable 接口,所以每个 Runnable 通常都先包装成 FutureTask,然后调用 executor.execute(Runnable command) 将其提交给线程池.

Runnable 的 void run() 方法是没有返回值的,所以,通常,如果我们需要的话,会在 submit 中指定第二个参数作为返回值:

1
<T> Future<T> submit(Runnable task, T result);

其实到时候会通过这两个参数,将其包装成 Callable。

Callable 也是因为线程池的需要,所以才有了这个接口。它和 Runnable 的区别在于 run() 没有返回值,而 Callable 的 call() 方法有返回值,同时,如果运行出现异常,call() 方法会抛出异常

1
2
3
public interface Callable<V> {
V call() throws Exception;
}
1
2
3
public interface Runnable {
public abstract void run();
}

下面,我们来看看 ExecutorService 的抽象实现 AbstractExecutorService 。

AbstractExecutorService

AbstractExecutorService 抽象类派生自 ExecutorService 接口,然后在其基础上实现了几个实用的方法,这些方法提供给子类进行调用。

  • invokeAny方法:
  • invokeAll方法:
  • newTaskFor方法: 用于将任务包装成 FutureTask

定义于最上层接口 Executor中的 void execute(Runnable command) 由于不需要获取结果,不会进行 FutureTask 的包装。

需要获取结果(FutureTask),用 submit 方法,不需要获取结果,可以用 execute 方法。

下面重点讲解下newTaskFor和invokeAny、invokeAll方法源码。

newTaskFor && submit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public abstract class AbstractExecutorService implements ExecutorService {

/**
* RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
* 下面两个 newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

/**
* 提交任务
*/
public Future<?> submit(Runnable task) {
if (task == null) {
throw new NullPointerException();
}
// 1. 将任务包装成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 2. 交给子类执行器执行
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) {
throw new NullPointerException();
}
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
// 2. 交给子类执行器执行
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) {
throw new NullPointerException();
}
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 2. 交给子类执行器执行
execute(ftask);
return ftask;
}
}

invokeAny

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/**
* 此方法目的:将 tasks 集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了
* 第二个参数 timed 代表是否设置超时机制,超时时间为第三个参数,
* 如果 timed 为 true,同时超时了还没有一个线程返回结果,那么抛出 TimeoutException 异常
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null) {
throw new NullPointerException();
}
int ntasks = tasks.size();
if (ntasks == 0) {
throw new IllegalArgumentException();
}
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);

// ExecutorCompletionService 不是一个真正的执行器,参数 this 才是真正的执行器
// 它对执行器进行了包装,每个任务结束后,将结果保存到内部的一个 completionQueue 队列中
// 这也是为什么这个类的名字里面有个 Completion 的原因。
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);

// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.

try {
// 用于保存异常信息,此方法如果没有得到任何有效的结果,那么我们可以抛出最后得到的一个异常
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();

// 首先先提交一个任务,后面的任务到下面的 for 循环一个个提交
futures.add(ecs.submit(it.next()));
--ntasks; // 提交了一个任务,所以任务数量减 1
int active = 1; // 正在执行的任务数(提交的时候 +1,任务结束的时候 -1)

for (; ; ) {
// ecs 上面说了,其内部有一个 completionQueue 用于保存执行完成的结果
// BlockingQueue的poll方法不阻塞,返回 null 代表队列为空
Future<T> f = ecs.poll(); // 非阻塞

// 为 null,说明刚刚提交的第一个线程还没有执行完成
// 在前面先提交一个任务,加上这里做一次检查,也是为了提高性能
if (f == null) {
if (ntasks > 0) { // 再提交一个任务
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
} else if (active == 0) { // 没有任务了,同时active为0,说明 任务都执行完成了
break;
} else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS); // 带等待时间的poll方法
if (f == null) {
throw new TimeoutException(); // 如果已经超时,抛出 TimeoutException 异常,这整个方法就结束了
}
nanos = deadline - System.nanoTime();
} else {
f = ecs.take(); // 没有任务了,有一个在运行中,再获取一次结果,阻塞方法,直到任务结束
}
}

/*
* 我感觉上面这一段并不是很好理解,这里简单说下:
* 1. 首先,这在一个 for 循环中,我们设想每一个任务都没那么快结束,
* 那么,每一次都会进到第一个分支,进行提交任务,直到将所有的任务都提交了
* 2. 任务都提交完成后,如果设置了超时,那么 for 循环其实进入了“一直检测是否超时”
这件事情上
* 3. 如果没有设置超时机制,那么不必要检测超时,那就会阻塞在 ecs.take() 方法上,
等待获取第一个执行结果
* ?. 这里我还没理解 active == 0 这个分支的到底是干嘛的?
*/

if (f != null) { // 有任务结束了
--active;
try {
return f.get(); // 阻塞获取执行结果,如果有异常,都包装成 ExecutionException
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}

if (ee == null) {
ee = new ExecutionException();
}
throw ee;

} finally {
// 方法退出之前,取消其他的任务
for (int i = 0, size = futures.size(); i < size; i++) {
futures.get(i).cancel(true);
}
}
}

/**
* 将tasks集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了,不设置超时时间
*/
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}

/**
* 将tasks集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了,需要指定超时时间
*/
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

invokeAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
 /**
* 将tasks集合中的任务提交到线程池执行,全部线程执行完后才可以结束了
* 其实我们自己提交任务到线程池,也是想要线程池执行所有的任务
* 只不过,我们是每次 submit 一个任务,这里以一个集合作为参数提交
*/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null) {
throw new NullPointerException();
}
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
// 包装成 FutureTask
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
// 提交任务
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
// 这是一个阻塞方法,直到获取到值,或抛出了异常
// 这里有个小细节,其实 get 方法签名上是会抛出 InterruptedException 的
// 可是这里没有进行处理,而是抛给外层去了。此异常发生于还没执行完的任务被取消了
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
// 这个方法返回返回 List<Future>,而且是任务都结束了
return futures;
} finally {
if (!done) { // 异常情况下才会进入
// 方法退出之前,取消其他的任务
for (int i = 0, size = futures.size(); i < size; i++) {
futures.get(i).cancel(true);
}
}
}
}

/**
* 带超时的 invokeAll
*/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null) {
throw new NullPointerException();
}
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
futures.add(newTaskFor(t));
}

final long deadline = System.nanoTime() + nanos; // 直接计算出超时时刻
final int size = futures.size();

// 提交一个任务,检测一次是否超时
for (int i = 0; i < size; i++) {
execute((Runnable) futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
return futures;
}
}

for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L) {
return futures;
}
try {
// 调用带超时的 get 方法,这里的参数 nanos 是剩余的时间,
// 因为上面其实已经用掉了一些时间了
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime(); // 更新剩余时间
}
}
done = true;
return futures;
} finally {
if (!done) {
for (int i = 0, size = futures.size(); i < size; i++) {
futures.get(i).cancel(true);
}
}
}
}

到这里,我们发现,这个抽象类包装了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它们都没有真正开启线程来执行任务,它们都只是在方法内部调用了 execute 方法,所以最重要的 execute(Runnable runnable) 方法还没出现,需要等具体执行器来实现这个最重要的部分,这里我们要说的就是 ThreadPoolExecutor 类了。

ThreadPoolExecutor

ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等等方法。

构造函数

Executors 这个工具类来快速构造一个线程池,对于初学者而言,这种工具类是很有用的,开发者不需要关注太多的细节,只要知道自己需要一个线程池,仅仅提供必需的参数就可以了,其他参数都采用作者提供的默认值。其调用的就是构造函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 构造方法
*
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数,线程池允许创建的最大线程数
* @param keepAliveTime 空闲线程的保活时间,如果某线程的空闲时间超过这个值都没有任务给它做,那么可以被关闭了。
* 注意这个值并不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数 corePoolSize,
* 那么这些线程不会因为空闲太长时间而被关闭,当然,也可以通过调用 allowCoreThreadTimeOut(true)
* 使核心线程数内的线程也可以被回收
* @param unit 时间单位
* @param workQueue 任务队列,BlockingQueue 接口的某个实现(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)
* @param threadFactory 用于生成线程,一般我们可以用默认的就可以了。
* 通常,我们可以通过它将我们的线程的名字设置得比较可读一些,如 Message-Thread-1, Message-Thread-2 类似这样。
* @param handler 当线程池已经满了,但是又有新的任务提交的时候,该采取什么策略由这个来指定。有
* 几种方式可供选择,像抛出异常、直接拒绝然后返回等,也可以自己实现相应的接口实现自己的逻辑。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) {
throw new IllegalArgumentException();
}

if (workQueue == null || threadFactory == null || handler == null) {
throw new NullPointerException();
}

this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

关键变量

除了构造函数以外,还需要重点关注下几个重要的属性和函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3; // 29

// 线程容量(2^29-1=536 870 911)
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 000 1111111111111111111111111111

/*
* RUNNING 定义为 -1,
* SHUTDOWN 定义为 0,
* 其他的都比 0 大,
* 所以等于 0 的时候不能提交任务,大于 0 的话,连正在执行的任务也需要中断
*/

// runState存储在高3位
// 接收新任务,处理队列任务
private static final int RUNNING = -1 << COUNT_BITS; // 111 00000000000000000000000000000 (-536870912)
// 不接受新的任务提交,但是会继续处理等待队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 00000000000000000000000000000 (0)
// 不接收新任务,也不处理队列任务,并且中断所有处理中的任务
private static final int STOP = 1 << COUNT_BITS; // 001 00000000000000000000000000000 ( 268435456)
// 所有任务都被终结,有效线程为0。会触发terminated()方法
private static final int TIDYING = 2 << COUNT_BITS; // 010 00000000000000000000000000000 (1073741824)
// 当terminated()方法执行结束时状态
private static final int TERMINATED = 3 << COUNT_BITS; // 011 00000000000000000000000000000 (1610612736)

// Packing and unpacking ctl
private static int runStateOf(int c) {
return c & ~CAPACITY; // 取高三位,状态
}

private static int workerCountOf(int c) {
return c & CAPACITY; // 取低29位,线程数
}

private static int ctlOf(int rs, int wc) {
return rs | wc;
}

/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/

private static boolean runStateLessThan(int c, int s) {
return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

/**
* 线程数自增 1
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}

/**
* 线程数自减 1
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}

/**
* 只有当某线程被突然终止时才会调用该方法,其他线程数自减是在执行新的task时
* <p>
*/
private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(ctl.get()));
}

看了这几种状态的介绍,读者大体也可以猜到十之八九的状态转换了,各个状态的转换过程有以下几种:

1
2
3
4
5
6
7
8
9
RUNNING -> SHUTDOWN:当调用了 shutdown() 后,会发生这个状态转换,这也是最重要的

(RUNNING or SHUTDOWN) -> STOP:当调用 shutdownNow() 后,会发生这个状态转换,这下要清楚 shutDown() 和 shutDownNow() 的区别了

SHUTDOWN -> TIDYING:当任务队列和线程池都清空后,会由 SHUTDOWN 转换为 TIDYING

STOP -> TIDYING:当任务队列清空后,发生这个转换

TIDYING -> TERMINATED:这个前面说了,当 terminated() 方法结束后

上面的几个记住核心的就可以了,尤其第一个和第二个。

另外,我们还要看看一个内部类 Worker,因为 Doug Lea 把线程池中的线程包装成了一个个 Worker,翻译成工人,就是线程池中做任务的线程。所以到这里,我们知道任务是 Runnable(内部叫 task 或 command),线程是 Worker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 继承了AbstractQueuedSynchronizer以简化获取和释放围绕每个任务执行的锁。
* 这可以防止中断旨在唤醒等待任务的工作线程,而不是中断正在运行的任务。
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {

private static final long serialVersionUID = 6138294804551838833L;

/**
* 由ThreadFactory创建的真实执行任务的线程
*/
final Thread thread;

/**
* 前面说了,这里的 Runnable 是任务。
* 为什么叫 firstTask?因为在创建线程的时候,如果同时指定了这个线程起来以后需要执行的第一个任务,
* 那么第一个任务就是存放在这里的(线程可不止执行这一个任务)
* 当然了,也可以为 null,这样线程起来了,自己到任务队列(BlockingQueue)中取任务(getTask 方法)
*/
Runnable firstTask;

/**
* 用于存放此线程完全的任务数,注意了,这里用了 volatile,保证可见性
*/
volatile long completedTasks;

/**
* Worker 只有这一个构造方法,传入 firstTask,也可以传 null
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 调用 ThreadFactory 来创建一个新的线程
this.thread = getThreadFactory().newThread(this);
}

/**
* 这里调用了外部类的 runWorker 方法
*/
public void run() {
runWorker(this);
}

...// 其他几个方法没什么好看的,就是用 AQS 操作,来获取这个线程的执行权,用了独占锁
}

excute()

有了上面的这些基础后,我们终于可以看看 ThreadPoolExecutor 的 execute 方法了,前面源码分析的时候也说了,各种方法都最终依赖于 execute 方法. 首先分析下execute主要工作流程:

  • (1)任务submit后先通过newTaskFor()封装成可返回结果的FutureTask;
  • (2)调用execute方法执行;
  • (3)execute方法在当前线程数(WC)小于coreSize时,直接创建新线程处理;
  • (4)如果创建新线程失败,尝试加入任务队列,若此时线程池已经处于非Running状态,则不做处理;
  • (5)成功加入任务队列后需要再次确认线程池状态(有可能在加入队列操作的过程中,线程池被shutdown了),如果此时线程池非Running,则移除该任务,执行拒绝策略;如果状态正常,则判断WC==0,如果等于0说明线程池中没有线程了,则创建一个新线程添加到pool中;
  • (6)如果加入队列失败或者当前状态非Running, 则尝试创建新线程来处理该任务,如果失败,则执行拒绝策略;

具体流程如下图:

本来想用一张图表示整个流程,结果发现图还没有看源代码清晰,干脆放弃了,直接看代码吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
/*
* 三步走:
*
* 1. 当前线程数小于corePoolSize,添加一个新的worker,并把commond作为其第一个任务。
* 调用addWorker()方法会自动检查runState和workerCount,避免因为状态问题报错
*
* 2. 任务成功加入队列后,仍然需要再次确认是否增加新的工作线程(有可能在上次检测运行线程数之后某些线程挂了),
* 或者在进入这个方法时,线程池shut down了。
* So we recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. 如果无法添加到队列,则尝试创建新线程。如果失败,则表示线程池shutdown了或者需要执行拒绝策略了。
*
* 由此可见:在线程数超过corePoolSize后,只有队列满了才会再次创建新线程
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了
// 至于执行的结果,到时候会包装到 FutureTask 中。
// 返回 false 代表线程池不允许提交任务
if (addWorker(command, true)) {
return;
}
c = ctl.get();
}
// 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了
// 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
/* 如果任务进入了 workQueue,我们是否需要开启新的线程
* 因为线程数在 [0, corePoolSize) 是无条件开启新的线程
* 如果线程数已经大于等于 corePoolSize,那么将任务添加到队列中,然后进到这里
*/
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)) { // 如果线程池已不处于RUNNING状态,那么移除已经入队的这个任务
reject(command); // 执行拒绝策略
} else if (workerCountOf(recheck) == 0) {
// 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
// 这块代码的真正意图是:担心任务提交到队列中了,但是线程都关闭了
addWorker(null, false);
}
} else if (!addWorker(command, false)) { // 线程池非Running或者队列满了,尝试创建新线程
// 创建新线程失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
reject(command);
}
}

addWorker()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* 简单分析:
* 还是状态控制的问题,当线程池处于 SHUTDOWN 的时候,不允许提交任务,但是已有的任务继续执行
* 当状态大于 SHUTDOWN 时,不允许提交任务,且中断正在执行的任务
* 多说一句:
* 如果线程池处于 SHUTDOWN,但是firstTask为null,且 workQueue 非空,那么是允许创建 worker 的
*
* @param firstTask 准备提交给这个线程执行的第一个任务,可以为null.
* @param core true 代表使用核心线程数 corePoolSize 作为创建线程的界线,也就说创建这个线程的时候,
* 如果线程池中的线程总数已经达到 corePoolSize,那么不能响应这次创建线程的请求
* 如果是 false,代表使用最大线程数 maximumPoolSize 作为界线
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c); // 获取当前状态
// 如果线程池已关闭,并满足以下条件之一,那么不创建新的 worker:
// 1. 线程池状态大于SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED
// 2. firstTask不为空
// 3. 任务队列为空
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty())) {
return false;
}
for (; ; ) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) { // 超容量了或者超过当前限制了,不允许创建
return false;
}
// 如果成功,那么就是所有创建线程前的条件校验都满足了,准备创建线程执行任务了
// 这里失败的话,说明有其他线程也在尝试往线程池中创建线程
if (compareAndIncrementWorkerCount(c)) {
break retry; // 退出循环,准备创建线程执行任务
}
c = ctl.get(); // 由于有并发,重新再读取一下 ctl
// 正常如果是 CAS 失败的话,进到下一个里层的for循环就可以了
// 可是如果是因为其他线程的操作,导致线程池的状态发生了变更,如有其他线程关闭了这个线程池
// 那么需要回到外层的for循环
if (runStateOf(c) != rs) {
continue retry;
}
// else CAS failed due to workerCount change; retry inner loop
}
}
/*
* 到这里,我们认为在当前这个时刻,可以开始创建线程来执行任务了,
* 因为该校验的都校验了,至于以后会发生什么,那是以后的事,至少当前是满足条件的
*/
boolean workerStarted = false; // worker 是否已经启动
boolean workerAdded = false; // 是否已将这个 worker 添加到 workers 这个 HashSet 中
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 这个是整个类的全局锁,持有这个锁才能让下面的操作“顺理成章”,
// 因为关闭一个线程池需要这个锁,至少我持有锁的期间,线程池不会被关闭
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 小于 SHUTTDOWN 那就是 RUNNING,这个自不必说,是最正常的情况
// 如果等于 SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 检测线程是否已经是start状态
{
// 新添加的worker里面的 thread 可不能是已经启动的
throw new IllegalThreadStateException();
}
workers.add(w); // 加到 workers 这个 HashSet 中
int s = workers.size();
// largestPoolSize 用于记录 workers 中的个数的历史最大值
// 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
if (s > largestPoolSize) {
largestPoolSize = s;
}
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 添加成功的话,启动这个线程
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted) { // 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉
addWorkerFailed(w);
}
}
// 返回线程是否启动成功
return workerStarted;
}

addWorkFailed()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 线程创建失败回滚
* workers 中删除掉相应的 worker
* workCount 减 1
* 终止检查,防止这个线程的存在阻碍了线程池的terminate
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null) {
workers.remove(w);
}
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

runWorker()

回过头来,继续往下走。我们知道,worker 中的线程 start 后,其 run 方法会调用 runWorker 方法:

1
2
3
4
// Worker 类的 run() 方法
public void run() {
runWorker(this);
}

继续往下看 runWorker 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 实际执行task, 循环从队列中取任务执行
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环调用 getTask 获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池状态大于等于 STOP,那么意味着该线程也要中断
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted()) {
wt.interrupt();
}
try {
// 这是一个钩子方法,留给需要的子类实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 到这里终于可以执行任务了
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
// 这里不允许抛出 Throwable,所以转换为 Erro
thrown = x;
throw new Error(x);
} finally {
// 也是一个钩子方法,将 task 和异常作为参数,留给需要的子类实现
afterExecute(task, thrown);
}
} finally {
// 置空 task,准备 getTask 获取下一个任务
task = null;
// 累加该worker完成的任务数
w.completedTasks++;
// 释放掉 worker 的独占锁
w.unlock();
}
}
completedAbruptly = false; // 执行到这儿说明是getTask()为空,而不是报异常了
} finally {
// 如果到这里,需要执行线程关闭:
// 1. 说明 getTask 返回 null,也就是说,这个 worker 的使命结束了,执行关闭
// 2. 任务执行过程中发生了异常
// 第一种情况,已经在代码处理了将 workCount 减 1,这个在 getTask 方法分析中会说
// 第二种情况,workCount 没有进行处理,所以需要在 processWorkerExit 中处理
processWorkerExit(w, completedAbruptly);
}
}

processWorkerExit()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 线程终止后:
* 1. 如果是异常退出, 则需要减掉当前workercCount
* 2. 更新线程池完成任务数
* 3. 从workers中移除终止的线程;
* 4. 终止检测
* 5. 如果线程池当前处于RUNNING/SHUTDOWN状态:
* a) 允许回收核心线程时,至少要保证有一个worker线程;
* b) 不允许回收核心线程时,当前线程小于corePoolSize,则创建新的线程;
* c)如果worker线程是由于异常退出,则直接创建一个新的worker线程
*
* @param w the worker
* @param completedAbruptly true woker执行异常
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
{
decrementWorkerCount();
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 终止检查,防止这个线程的存在阻碍了线程池的terminate
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // RUNNING/SHUTDOWN
if (!completedAbruptly) { // 说明当前任务队列中没有任务
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty()) { // 允许回收核心线程
min = 1;
}
if (workerCountOf(c) >= min) { // 当前线程数大于1 或者 corePoolSize, 暂时不创建新的线程
return; // replacement not needed
}
}
addWorker(null, false); // 添加新的备用线程
}
}

getTask()

getTask() 是怎么获取任务的,这个方法写得真的很好,每一行都很简单,组合起来却所有的情况都想好了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 此方法有三种可能:
* 1. 阻塞直到获取到任务返回。我们知道,默认 corePoolSize 之内的线程是不会被回收的,它们会一直等待任务
* 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭
* 3. 如果发生了以下条件,此方法必须返回 null:
* - 池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置)
* - 线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务
* - 线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 两种可能
// 1. rs == SHUTDOWN && workQueue.isEmpty()
// 2. rs >= STOP
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 减少工作线程数, processWorkerExit()方法中会将该线程移除
return null;
}
int wc = workerCountOf(c);
// 允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当前线程数超过maximumPoolSize
// 允许回收核心线程或者当前线程超过corePoolSize && 超时
// wc > 1 或者 队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) { // 减掉线程数
return null; // 获取任务的worker取不到任务就会退出
}
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 等待一定时间,如果仍然获取不到说明线程数过多,任务不够
workQueue.take(); // 阻塞获取
if (r != null) {
return r;
}
timedOut = true;
} catch (InterruptedException retry) {
// 如果此 worker 发生了中断,采取的方案是重试
// 解释下为什么会发生中断,这个读者要去看 setMaximumPoolSize 方法,
// 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量,
// 那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null
timedOut = false;
}
}
}

tryTerminate()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 把线程池状态设置为TERMINATED,在以下条件之一:
* 1. SHUTDOWN 并且 pool线程和队列都为空;
* 2. STOP 并且 pool线程为空;
* <p>
* 如果线程不为0时想要优雅终止,则中断空闲的worker线程以保证shutdown信号得到传播。
*
* This method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (; ; ) {
int c = ctl.get();
// 状态为RUNNING、SHUTDOWN、STOP 或者 (SHUTDOWN && 队列不为空) 不允许
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) {
return;
}
if (workerCountOf(c) != 0) { // 终止一个空闲线程
interruptIdleWorkers(ONLY_ONE);
return;
}
// 当前线程数为0
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 设置状态为TIDYING
try {
terminated(); // 触发终止后方法
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 最终设置为TERMINATED状态
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

拒绝策略

ThreadPoolExecutor 中的拒绝策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 此处的 handler 我们需要在构造线程池的时候就传入这个参数,它是 RejectedExecutionHandler 的实例。
* RejectedExecutionHandler 在 ThreadPoolExecutor 中有四个已经定义好的实现类可供我们直接使用,
* 当然,我们也可以实现自己的策略,不过一般也没有必要。简要介绍下四中默认的拒绝策略:
* <p>
* 1. CallerRunsPolicy: 只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务
* <p>
* 2. AbortPolicy:不管怎样,直接抛出 RejectedExecutionException 异常, 这个是默认的策略,
* 如果我们构造线程池的时候不传相应的 handler 的话,那就会指定使用这个
* <p>
* 3. DiscardPolicy:不做任何处理,直接忽略掉这个任务
* <p>
* 4. DiscardOldestPolicy: 这个相对霸道一点,如果线程池没有被关闭的话, 把队列队头的任务(也就是等待了最长时间的)直接扔掉,
* 然后提交这个任务到等待队列中
*/
final void reject(Runnable command) {
// 执行拒绝策略
handler.rejectedExecution(command, this);
}

Executors

Executors它仅仅是工具类,它的所有方法都是 static 的。

FixedThreadPoole

生成一个固定大小的线程池:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0 也不会执行 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列。

过程分析:刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。

SingleThreadExecutor

生成只有一个线程的固定线程池,这个更简单,和上面的一样,只要设置线程数为 1 就可以了:

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool

生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程(如果这个线程当前没有任务)的线程池:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

过程分析:鉴于 corePoolSize 是 0,那么提交任务的时候,直接将任务提交到队列中,由于采用了 SynchronousQueue,所以如果是第一个任务提交的时候,offer 方法肯定会返回 false,因为此时没有任何 worker 对这个任务进行接收,那么将进入到最后一个分支来创建第一个 worker。之后再提交任务的话,取决于是否有空闲下来的线程对任务进行接收,如果有,会进入到第二个 if 语句块中,否则就是和第一个任务一样,进到最后的 else if 分支。

这种线程池对于任务可以比较快速地完成的情况有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源。

SynchronousQueue 是一个比较特殊的 BlockingQueue,其本身不储存任何元素,它有一个虚拟队列(或虚拟栈),不管读操作还是写操作,如果当前队列中存储的是与当前操作相同模式的线程,那么当前操作也进入队列中等待;如果是相反模式,则配对成功,从当前队列中取队头节点。具体的信息,可以看我的另一篇关于 BlockingQueue 的文章。

代码

带注释完整代码: https://github.com/austin-brant/thread-pool-source-code