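Using and Analyzing the Executor Framework (Part 1)
An introduction to the Executor framework.
Framework overview
The top-level interface is Executor. ExecutorService extends Executor, AbstractExecutorService implements ExecutorService, and ThreadPoolExecutor extends the abstract class AbstractExecutorService. There is also ScheduledThreadPoolExecutor.
The Java API also provides the utility class Executors, which creates different Executor implementations for different use cases.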
- FixedThreadPool
- SingleThreadExecutor
- newCachedThreadPool
- newScheduledThreadPool
- newWorkStealingPool (added in Java 1.8)
We will go through these cases together with the relevant source code.
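As a quick orientation, the sketch below creates one pool from each of the factory methods listed above and runs a trivial task on it. The class name `FactoryTour`, the helper `runOn`, and the pool sizes are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FactoryTour {

    // Submits one trivial task, waits for its result, then shuts the pool down.
    static int runOn(ExecutorService pool) throws Exception {
        try {
            return pool.submit(() -> 21 * 2).get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    // One pool per factory method; the sizes (2 and 1) are arbitrary illustration values.
    static Map<String, ExecutorService> pools() {
        Map<String, ExecutorService> pools = new LinkedHashMap<>();
        pools.put("newFixedThreadPool", Executors.newFixedThreadPool(2));
        pools.put("newSingleThreadExecutor", Executors.newSingleThreadExecutor());
        pools.put("newCachedThreadPool", Executors.newCachedThreadPool());
        pools.put("newScheduledThreadPool", Executors.newScheduledThreadPool(1));
        pools.put("newWorkStealingPool", Executors.newWorkStealingPool());
        return pools;
    }

    public static void main(String[] args) throws Exception {
        for (Map.Entry<String, ExecutorService> entry : pools().entrySet()) {
            System.out.println(entry.getKey() + " -> " + runOn(entry.getValue()));
        }
    }
}
```

All five factories return something usable as an ExecutorService, which is why a single helper can drive them.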
Executor
public interface Executor {
    void execute(Runnable command);
}
public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
public abstract class AbstractExecutorService implements ExecutorService{
    ...
}
public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
}
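To make the ExecutorService methods above more concrete, here is a minimal sketch that exercises `invokeAll`, `submit(Runnable, T)`, `shutdown`, and `awaitTermination`. The class name `ServiceApiDemo`, the pool size, and the task bodies are assumptions chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ServiceApiDemo {

    // Runs three Callables with invokeAll and sums their results.
    static int sumWithInvokeAll() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            tasks.add(() -> 1);
            tasks.add(() -> 2);
            tasks.add(() -> 3);
            int sum = 0;
            // invokeAll returns only after every task has completed.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } finally {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    // submit(Runnable, result) yields the given result once the Runnable has run.
    static String submitWithResult() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable work = () -> System.out.println("working");
            return pool.submit(work, "done").get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("sum = " + sumWithInvokeAll());
        System.out.println("result = " + submitWithResult());
    }
}
```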
Example
Task
public class Task implements Runnable {
Executor
public class Server {
Test class
public class Main {
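Only the first line of each listing above survives, so the following is a hypothetical reconstruction rather than the original code. It assumes that Task is a simple Runnable, that Server wraps a CachedThreadPool, and that Main submits a few tasks and then shuts the server down. The method names `executeTask`, `shutDown`, and `runAll`, the counter, and the task names are all assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

    // Hypothetical task: prints the executing thread and counts its completion.
    static class Task implements Runnable {
        private final String name;
        private final AtomicInteger completed;

        Task(String name, AtomicInteger completed) {
            this.name = name;
            this.completed = completed;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " runs " + name);
            completed.incrementAndGet();
        }
    }

    // Hypothetical server that owns a cached thread pool.
    static class Server {
        private final ExecutorService executor = Executors.newCachedThreadPool();

        void executeTask(Runnable task) {
            executor.execute(task);
        }

        // Stops accepting tasks and waits for the submitted ones to finish.
        boolean shutDown() throws InterruptedException {
            executor.shutdown();
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    // Submits taskCount tasks and returns how many completed before shutdown finished.
    static int runAll(int taskCount) throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        Server server = new Server();
        for (int i = 0; i < taskCount; i++) {
            server.executeTask(new Task("task-" + i, completed));
        }
        server.shutDown();
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed tasks: " + runAll(5));
    }
}
```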
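Source code analysis
Following the source, the code falls into three steps:
- initial
- executeTask
- shutDown
Creating the ThreadPool
This example uses a CachedThreadPool: corePoolSize is 0 and the maximum pool size is Integer.MAX_VALUE. Idle threads are kept alive for 60 seconds, and the work queue is a SynchronousQueue.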
Executors.java
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
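The parameters in this factory method can be read back at runtime. The sketch below compares the pool returned by `Executors.newCachedThreadPool()` with one built by hand from the same arguments. The class `CachedPoolConfig` and its helpers are illustrative, and the cast to ThreadPoolExecutor relies on the factory body shown above, not on anything the ExecutorService return type promises.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolConfig {

    // Summarises the configuration of a ThreadPoolExecutor as a short string.
    static String describe(ThreadPoolExecutor pool) {
        return "core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAliveSeconds=" + pool.getKeepAliveTime(TimeUnit.SECONDS)
                + " queue=" + pool.getQueue().getClass().getSimpleName();
    }

    // Assumption: the factory returns a plain ThreadPoolExecutor, as in the listing above.
    static String describeFactoryPool() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return describe((ThreadPoolExecutor) pool);
        } finally {
            pool.shutdown();
        }
    }

    // Same arguments as the factory method, passed explicitly.
    static String describeHandBuiltPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        try {
            return describe(pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFactoryPool());
        System.out.println(describeHandBuiltPool());
    }
}
```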
ThreadPoolExecutor.java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    // use the default thread factory and the default handler
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
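The argument checks at the top of this constructor are easy to trigger from outside. The following sketch passes a few invalid combinations and reports which exception comes back. `ConstructorChecks` and `tryCreate` are illustrative names, and the sample arguments were picked to hit each check.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConstructorChecks {

    // Returns "ok" if the pool could be created, otherwise the exception's simple name.
    static String tryCreate(int core, int max, long keepAliveSeconds,
                            BlockingQueue<Runnable> queue) {
        try {
            new ThreadPoolExecutor(core, max, keepAliveSeconds, TimeUnit.SECONDS, queue)
                    .shutdown();
            return "ok";
        } catch (IllegalArgumentException | NullPointerException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Valid: the CachedThreadPool arguments.
        System.out.println(tryCreate(0, Integer.MAX_VALUE, 60, new SynchronousQueue<>()));
        // maximumPoolSize <= 0
        System.out.println(tryCreate(0, 0, 60, new SynchronousQueue<>()));
        // maximumPoolSize < corePoolSize
        System.out.println(tryCreate(2, 1, 60, new SynchronousQueue<>()));
        // keepAliveTime < 0
        System.out.println(tryCreate(0, 1, -1, new SynchronousQueue<>()));
        // workQueue == null
        System.out.println(tryCreate(0, 1, 60, null));
    }
}
```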
Executing the task
ThreadPoolExecutor.java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Compute the worker count. It is 0 initially, but corePoolSize is also 0, so this branch is skipped.
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // isRunning returns true in the initial state, but offer returns false
    // (no idle worker is waiting on the SynchronousQueue yet), so in this example this branch is not entered.
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Create a Worker and start it.
    else if (!addWorker(command, false))
        reject(command);
}
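The reason `workQueue.offer(command)` fails for the first task is that a SynchronousQueue has no capacity: an element can only be handed to a thread that is already waiting to receive it. The sketch below shows both cases in isolation. `HandoffDemo` and its method names are illustrative. The second case uses the timed `offer`, which waits for a consumer, while `execute` uses the non-blocking form.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class HandoffDemo {

    // With no thread waiting to receive, the non-blocking offer fails immediately.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("task");
    }

    // With a consumer blocked in take(), the element can be handed over.
    static String offerWithConsumer() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        // The timed offer waits up to 5 seconds for the consumer to arrive.
        boolean handedOff = queue.offer("task", 5, TimeUnit.SECONDS);
        consumer.join(5000);
        return handedOff ? received[0] : null;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("offer without consumer: " + offerWithoutConsumer());
        System.out.println("received with consumer: " + offerWithConsumer());
    }
}
```

In the pool, the waiting consumer is an idle Worker polling the queue, so the offer can only succeed once such a worker exists.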
private boolean addWorker(Runnable firstTask, boolean core) {
Now we enter the Worker's execution flow.
ThreadPoolExecutor.Worker
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
// this calls back into the runWorker method of ThreadPoolExecutor
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
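The runWorker listing is cut off after its first line. As a rough mental model only, and not the JDK implementation, a worker can be pictured as a loop that runs its first task and then keeps polling the queue until nothing arrives within the keep-alive time. `WorkerLoopSketch` and `SimpleWorker` are hypothetical. Locking, interrupt handling, and the beforeExecute/afterExecute hooks are left out, and a LinkedBlockingQueue stands in for the pool's queue so the sketch can preload tasks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class WorkerLoopSketch {

    // Simplified stand-in for a pool worker (hypothetical, not the JDK class).
    static class SimpleWorker implements Runnable {
        private Runnable firstTask;
        private final BlockingQueue<Runnable> queue;
        private final long keepAliveMillis;
        int completedTasks;

        SimpleWorker(Runnable firstTask, BlockingQueue<Runnable> queue, long keepAliveMillis) {
            this.firstTask = firstTask;
            this.queue = queue;
            this.keepAliveMillis = keepAliveMillis;
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            firstTask = null;
            try {
                // Run the first task, then whatever the queue delivers before the timeout.
                while (task != null
                        || (task = queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS)) != null) {
                    task.run();
                    completedTasks++;
                    task = null;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Falling out of the loop corresponds to the worker exiting.
        }
    }

    // One first task plus two queued tasks, then the worker times out and exits.
    static int runThreeTasks() throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(() -> System.out.println("queued task 1"));
        queue.add(() -> System.out.println("queued task 2"));
        SimpleWorker worker = new SimpleWorker(() -> System.out.println("first task"), queue, 100);
        Thread thread = new Thread(worker);
        thread.start();
        thread.join();
        return worker.completedTasks;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed: " + runThreeTasks());
    }
}
```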
Now let's look at what happens after a worker finishes.
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // false in the normal case, so this is skipped
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        // decrement ctl by 1, retrying until it succeeds
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // record the number of completed tasks
        completedTaskCount += w.completedTasks;
        // remove the worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // min is 0 here
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // for a SynchronousQueue, workQueue.isEmpty() always returns true
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // this branch is taken
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
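The effect of this exit path can be observed from outside: once a worker has been idle longer than the keep-alive time, it leaves the pool and the pool size drops back to 0. The sketch below assumes a hand-built pool with the CachedThreadPool arguments but a 100 ms keep-alive so the effect shows up quickly. `KeepAliveDemo`, the polling interval, and the 5 second deadline are illustrative choices.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {

    // Runs one task, then waits (up to 5 seconds) for the idle worker to leave the pool.
    static boolean workerExitsAfterIdle() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        try {
            pool.submit(() -> System.out.println("task ran")).get(5, TimeUnit.SECONDS);
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return pool.getPoolSize() == 0;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("idle worker removed: " + workerExitsAfterIdle());
    }
}
```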
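That completes a first pass through how a CachedThreadPool executes tasks: when a task arrives and no idle Worker is waiting on the queue to take it, a new Worker is started to run it.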
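To check this behaviour, the sketch below submits several tasks that all block until released, so no worker is ever idle, and then reads the largest pool size reached. The pool is built by hand with the same arguments as newCachedThreadPool. `WorkerPerTaskDemo` and `largestPoolSize` are illustrative names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkerPerTaskDemo {

    // Submits `tasks` blocking tasks and returns the largest number of threads the pool held.
    static int largestPoolSize(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep this worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("threads used for 3 concurrent tasks: " + largestPoolSize(3));
    }
}
```

When tasks are submitted one after another with pauses in between, an idle worker may pick up the next task instead, so the thread count can be lower than the task count.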
Shutdown
The shutdown method does not stop tasks that are already running, but the pool no longer accepts new tasks.
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // switch to the SHUTDOWN state
        advanceRunState(SHUTDOWN);
        // interrupt idle worker threads
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
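Both halves of that statement can be seen in a small experiment: a task that is already running finishes normally after shutdown, while a task submitted afterwards is rejected. In the sketch below, `ShutdownDemo`, the latches, and the summary string are illustrative. The rejection relies on the default handler, which throws RejectedExecutionException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownDemo {

    static String demo() throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean();

        pool.execute(() -> {
            started.countDown();
            try {
                release.await();
                finished.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        started.await();      // the task is now running
        pool.shutdown();      // stop accepting new work

        boolean rejected = false;
        try {
            pool.execute(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        boolean terminatedEarly = pool.isTerminated(); // the first task is still running

        release.countDown();  // let the running task finish
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return "rejected=" + rejected + " terminatedEarly=" + terminatedEarly
                + " finished=" + finished.get() + " terminated=" + terminated;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```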
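Summary
This is the first post analyzing the Executor framework, and so far it only briefly covers how CachedThreadPool is used. Next, let's take a closer look at the SynchronousQueue.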