Overview
A thread pool is a resource pool that manages a group of homogeneous worker threads. Used well, it brings three benefits. First, lower resource consumption: reusing already-created threads avoids the cost of repeatedly creating and destroying them. Second, faster response: a task that arrives can start executing immediately instead of waiting for a thread to be created. Third, better manageability: threads are a scarce resource, and creating them without limit not only consumes system resources but also hurts stability, whereas a pool lets threads be allocated, tuned, and monitored in one place. To use thread pools well, however, you need a solid grasp of how they work.
The basic architecture of thread pools in Java
How thread pools are used and how they work
Since many of the thread pools provided by the JDK are built on ThreadPoolExecutor, we start with how ThreadPoolExecutor itself works.
ThreadPoolExecutor
Using ThreadPoolExecutor
In Java, a thread pool can be created through the ThreadPoolExecutor constructor:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler);
Parameters:
- corePoolSize: the number of core threads.
- maximumPoolSize: the maximum number of threads the pool is allowed to create.
- keepAliveTime: how long a thread created beyond corePoolSize may stay idle before it is terminated.
- unit: the time unit of keepAliveTime.
- workQueue: the task queue holding tasks that are waiting to be executed.
- threadFactory: the factory the pool uses to create its threads.
- handler: the handler invoked when a task cannot be executed.
A simple thread pool built with ThreadPoolExecutor plus the singleton pattern:
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class MyThreadPool {
private final String THREAD_TAG = this.getClass().getSimpleName();
/**
* Parameters required by ThreadPoolExecutor
*/
private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final int MAX_POOL_SIZE = 64;
private static final int KEEP_ALIVE_TIME = 1;
/**
* Work queue; an unbounded LinkedBlockingQueue is used here
*/
private final BlockingQueue<Runnable> mWorkQueue = new LinkedBlockingQueue<>();
/**
* Factory used to create the pool's threads
*/
private final ThreadFactory mThreadFactory = new ThreadFactory() {
private final AtomicInteger threadCount = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, THREAD_TAG + "#" + threadCount.getAndIncrement());
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
};
/**
* Handler invoked for rejected tasks
*/
private final RejectedExecutionHandler mRejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());
}
};
/**
* The custom thread pool itself
*/
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, mWorkQueue, mThreadFactory, mRejectedExecutionHandler);
private MyThreadPool() {
}
private static volatile MyThreadPool instance;
/**
* Double-checked locking (DCL) singleton
* @return the MyThreadPool instance
*/
public static MyThreadPool getInstance() {
if (instance == null) {
synchronized (MyThreadPool.class) {
if (instance == null) {
instance = new MyThreadPool();
}
}
}
return instance;
}
/**
* Submit a task
* @param command the task to run
*/
public void submit(Runnable command) {
executor.execute(command);
}
/**
* Shut down the pool immediately
*/
public List<Runnable> shutDownNow() {
return executor.shutdownNow();
}
}
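As a quick sanity check, a minimal caller of the class above might look like the sketch below (the demo class name and task bodies are made up for illustration); the printed thread names come from the custom ThreadFactory:
public class MyThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // Submit a handful of tasks to the shared pool
        for (int i = 0; i < 8; i++) {
            final int id = i;
            MyThreadPool.getInstance().submit(() ->
                    System.out.println(Thread.currentThread().getName() + " runs task " + id));
        }
        Thread.sleep(1000); // give the tasks a moment to finish
        // Stop the pool; tasks still waiting in the queue are returned by shutDownNow()
        MyThreadPool.getInstance().shutDownNow();
    }
}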
Analyzing the ThreadPoolExecutor source
The thread pool lifecycle
ThreadPoolExecutor keeps its bookkeeping in a single ctl field. The high 3 bits hold the pool's run state and the low 29 bits hold the worker count, a classic space-for-speed packing trick: although the maximum number of threads (note: distinct from the actual worker count) can nominally be set to Integer.MAX_VALUE, resource limits make such a value purely theoretical, so the spare bits can safely be given another meaning.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS: number of bits used to store the worker count
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// Pool run states, stored in the high bits of ctl
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
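To make the bit packing concrete, here is a small self-contained sketch (re-implementing the same helpers outside ThreadPoolExecutor, purely for illustration) that packs a run state and a worker count into one int and unpacks them again:
public class CtlDemo {
    // Same constants as in ThreadPoolExecutor, reproduced here for illustration
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // largest representable worker count
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;

    static int ctlOf(int rs, int wc)  { return rs | wc; }          // pack
    static int runStateOf(int c)      { return c & ~CAPACITY; }    // high 3 bits
    static int workerCountOf(int c)   { return c & CAPACITY; }     // low 29 bits

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                                      // RUNNING pool with 5 workers
        System.out.println(runStateOf(c) == RUNNING);                   // true
        System.out.println(workerCountOf(c));                           // 5
        System.out.println(runStateOf(ctlOf(SHUTDOWN, 5)) == SHUTDOWN); // true
        System.out.println(CAPACITY);                                   // 536870911
    }
}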
When discussing the lifecycle of a thread pool, be careful to distinguish it from the lifecycles of processes and threads.
The process lifecycle is covered in any operating systems textbook, usually from the three-state, five-state, or seven-state point of view.
The five states of the classic thread model: New, Runnable, Running, Blocked, Dead. (Java's own Thread.State enum actually defines six: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED.)
The five states of a thread pool: RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED.
Source walkthrough
1. The execute method
Let's walk through the execution flow of execute, the ThreadPoolExecutor method used most often:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// Check the worker count; if it is below corePoolSize, add a new Worker
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// isRunning() checks whether the pool has been shut down
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// Defensive re-check
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// Try addWorker; if it fails, the pool must be shut down or saturated
else if (!addWorker(command, false))
reject(command);
}
The code above proceeds in three steps (a small runnable sketch driving each branch follows the flowchart):
- While the worker count is below corePoolSize, a new Worker is created for the task. The addWorker() method starts a thread that runs the task; it is analyzed in detail below.
- Once the worker count has reached corePoolSize, the pool state is checked: if the pool is still in the RUNNING state, the task is added to the blocking queue; otherwise we fall through to step three.
- Try to add another Worker; if that fails, the task is rejected through the RejectedExecutionHandler passed to the constructor (AbortPolicy by default).
Flowchart of the execute(Runnable) flow:
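The three branches can be driven one by one with a deliberately tiny pool. In the sketch below (pool sizes, queue capacity and the sleeping task are all chosen just for the demonstration) one core thread, a one-slot queue and a maximum of two threads mean that the fourth task is rejected by the default AbortPolicy:
import java.util.concurrent.*;

public class ExecuteDispatchDemo {
    public static void main(String[] args) {
        // 1 core thread, at most 2 threads, a queue holding only 1 task
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        Runnable sleepy = () -> {
            try { Thread.sleep(2000); } catch (InterruptedException ignored) { }
        };

        pool.execute(sleepy); // task 1: below corePoolSize, a core worker is created
        pool.execute(sleepy); // task 2: core thread busy, the task is queued
        pool.execute(sleepy); // task 3: queue full, a non-core worker is created
        try {
            pool.execute(sleepy); // task 4: pool saturated, default AbortPolicy rejects it
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        pool.shutdown();
    }
}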
2. The addWorker method
The walkthrough of execute() shows that addWorker is one of its core building blocks, so let's look at the addWorker() source next:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
From the source we can see roughly what addWorker does: it checks the pool state and the worker count, creates a Worker object with the submitted command as its first task, adds that worker to workers (the HashSet holding all worker threads), and finally starts the worker's thread.
Flowchart:
For the finer details, read the source of the related ThreadPoolExecutor methods.
3. The Worker class
The analysis so far gives a first picture of ThreadPoolExecutor's execute method. The next step is the Worker source; going through it takes us deeper into how ThreadPoolExecutor is implemented.
Worker is an inner class of ThreadPoolExecutor. It extends AQS (AbstractQueuedSynchronizer, which will be analyzed in a separate article) and implements the Runnable interface. Here is its source:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Start with Worker's constructor. It takes a Runnable, which is the task we submitted through execute, stores it in the firstTask field, and then creates a thread with **getThreadFactory().newThread(this)**, storing the reference in the thread field. getThreadFactory is a method of ThreadPoolExecutor that returns the ThreadFactory passed to the constructor; ThreadPoolExecutor also provides a default factory, and its constructors show where that default comes from.
Note that when the Worker's thread is created, it is not our submitted Runnable that is handed to the factory but this (Worker itself implements Runnable). Looking back at addWorker, starting a worker means calling start() on the worker's internal thread, so the worker thread does not run our task directly: it runs Worker's run method, shown below:
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
4. The runWorker(Worker w) method
Worker's run method calls runWorker, passing this. runWorker is a final method of ThreadPoolExecutor; its source:
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
In runWorker, **Runnable task = w.firstTask;** grabs a reference to the task we submitted and then sets the worker's firstTask to null; keep this step in mind, the reason becomes clear shortly.
Then comes a while loop, which keeps running as long as task is non-null or getTask() returns a non-null task. Reading the getTask() source shows that it takes a task from the blocking queue, which explains the addWorker(null, false) call seen in execute: a worker created with a null task simply pulls its tasks from the blocking queue inside runWorker.
Inside the loop, ignoring the surrounding details, task.run() is called, and this is where the task we submitted actually executes.
After the task finishes, the finally block inside the loop sets task back to null, so the next iteration of the while loop again takes a task from the blocking queue. Once the loop condition no longer holds, processWorkerExit is called and the method returns.
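Since runWorker brackets every task with calls to beforeExecute and afterExecute, these two protected hooks are the natural place to add per-task logging or monitoring. A minimal sketch of such a subclass (the class name and pool parameters are made up for illustration):
import java.util.concurrent.*;

public class LoggingThreadPool extends ThreadPoolExecutor {
    public LoggingThreadPool(int coreSize, int maxSize) {
        super(coreSize, maxSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        System.out.println(t.getName() + " is about to run " + r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // runWorker passes any RuntimeException/Error thrown by the task as t
        if (t != null) {
            System.out.println("task " + r + " failed: " + t);
        }
    }
}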
5. The processWorkerExit(Worker w, boolean completedAbruptly) method
The while loop exits when the blocking queue has no more tasks, which means the current thread is now an idle worker thread. Let's look at processWorkerExit to see how the pool handles it.
The processWorkerExit source:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Accumulate the completed task count
completedTaskCount += w.completedTasks;
// Remove the current worker
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
In the code above, if the worker exited abruptly (completedAbruptly is true), decrementWorkerCount() first decreases the worker count by one (on a normal exit the count has already been adjusted in getTask). Inside the try block, the worker's completedTasks value is added to ThreadPoolExecutor's completedTaskCount field, and the worker is removed from the workers set.
The final if block then decides whether a replacement thread is needed. When the pool state is below STOP and the worker did not exit abruptly, it computes the minimum number of threads that must stay alive: 0 if core threads are allowed to time out, otherwise corePoolSize; if that minimum is 0 but the queue still holds tasks, it is raised to 1. If the current worker count is at least this minimum, the method simply returns and the current thread ends; otherwise execution continues.
If execution does continue (or the worker exited abruptly), addWorker(null, false) is called. As discussed for runWorker, this creates a thread that tries to execute tasks from the blocking queue; when that thread later finds the queue empty, it ends up in this method again.
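The min computed above depends on the allowCoreThreadTimeOut flag; it is false by default, but it can be switched on so that even core threads are reclaimed after keepAliveTime and the pool can shrink to zero idle threads. A small sketch (the pool parameters are chosen arbitrarily):
import java.util.concurrent.*;

public class CoreTimeoutDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 8, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        // With this flag set, idle core threads are also terminated after keepAliveTime,
        // so processWorkerExit may let the worker count drop to 0 when the queue is empty
        pool.allowCoreThreadTimeOut(true);
        pool.execute(() -> System.out.println("hello from " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}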
Wrapping up the analysis
The source above shows the pool's basic working model: when a task is submitted with execute and the worker count has not reached corePoolSize, a new thread is created to run it, and after finishing that task the thread keeps taking tasks from the blocking queue until the queue is empty. At that point processWorkerExit decides whether the exiting thread needs a replacement; if the remaining worker count is below the required minimum, a new Worker with a null task is added before the current thread exits.
If the worker count is already at or above corePoolSize, the task is instead placed in the blocking queue, subject to the pool state and queue capacity (see the source for the details).
With the basics of ThreadPoolExecutor covered, let's look at the thread pools the JDK provides and their typical use cases.
Executors
The Executors class in the JDK provides several factory methods that create pre-configured thread pools; the common ones are described below.
newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
In CachedThreadPool, corePoolSize is 0, maximumPoolSize is Integer.MAX_VALUE, the keep-alive timeout is 60 s, and workQueue is a SynchronousQueue. Since a SynchronousQueue hands tasks off rather than storing them, the pool works as follows:
- There are no core threads; tasks are offered directly to the SynchronousQueue
- If an idle thread is available it takes the task; otherwise a new thread is created
- A thread that has finished a task has a 60 s timeout; if it does not receive a new task from the queue within 60 s, it is destroyed
CachedThreadPool is suitable for running a large number of short-lived tasks concurrently, or for lightly loaded servers.
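A minimal usage sketch, assuming a burst of short-lived tasks (the task bodies are made up for illustration):
import java.util.concurrent.*;

public class CachedPoolDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        // Threads are created on demand and reused once they become idle
        for (int i = 0; i < 20; i++) {
            final int id = i;
            pool.execute(() ->
                    System.out.println(Thread.currentThread().getName() + " handled request " + id));
        }
        pool.shutdown();
    }
}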
newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
In FixedThreadPool, corePoolSize and maximumPoolSize are both nThreads, so once the pool has nThreads threads, newly submitted tasks are placed in the LinkedBlockingQueue, whose default capacity is Integer.MAX_VALUE; the keep-alive time is 0. FixedThreadPool therefore works as follows:
- While the thread count is below nThreads, submitting a new task creates a new thread
- Once the thread count equals nThreads, newly submitted tasks are added to the blocking queue, and threads that finish their current task take the next one from the queue
FixedThreadPool is suitable for heavily loaded servers, where the number of threads has to be bounded to use resources sensibly.
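A usage sketch showing the bounded thread count, this time submitting Callables and collecting their Futures (the computation is made up for illustration):
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class FixedPoolDemo {
    public static void main(String[] args) throws Exception {
        // At most 4 worker threads; extra tasks wait in the unbounded LinkedBlockingQueue
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int n = i;
            results.add(pool.submit(() -> n * n)); // Callable<Integer>
        }
        for (Future<Integer> f : results) {
            System.out.println(f.get()); // blocks until the corresponding task finishes
        }
        pool.shutdown();
    }
}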
newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
In SingleThreadExecutor, corePoolSize and maximumPoolSize are both 1, the keep-alive time is 0 ms, and workQueue is a LinkedBlockingQueue, so it is equivalent to a FixedThreadPool with nThreads = 1. Its workflow:
- When the pool has no thread yet, a new thread is created to run the task
- Once the pool has its single thread, newly submitted tasks are added to the LinkedBlockingQueue
- After finishing a task, the thread takes the next one from the queue; when the queue is empty it blocks waiting for new tasks (being a core thread, it is not reclaimed by default)
SingleThreadExecutor is suitable for scenarios where tasks must run serially, one after another, with no need for concurrency.
newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
newScheduledThreadPool returns a ScheduledThreadPoolExecutor instance, and ScheduledThreadPoolExecutor in fact extends ThreadPoolExecutor. As the code shows, ScheduledThreadPool is built on ThreadPoolExecutor with corePoolSize set to the given corePoolSize, maximumPoolSize set to Integer.MAX_VALUE, a keep-alive time of 0, and a DelayedWorkQueue as the work queue.
ScheduledThreadPool is really a scheduling pool: it implements schedule, scheduleAtFixedRate, and scheduleWithFixedDelay, supporting delayed and periodic execution.
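A short sketch of delayed and periodic execution with newScheduledThreadPool (the delays and task bodies are chosen arbitrarily):
import java.util.concurrent.*;

public class ScheduledPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Run once, 1 second from now
        scheduler.schedule(() -> System.out.println("delayed task"), 1, TimeUnit.SECONDS);

        // Run every 2 seconds, measured start-to-start, with no initial delay
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("periodic task"), 0, 2, TimeUnit.SECONDS);

        Thread.sleep(7000); // let a few periodic runs happen
        scheduler.shutdown();
    }
}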
newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
The newSingleThreadScheduledExecutor method returns a DelegatedScheduledExecutorService instance. DelegatedScheduledExecutorService is a static inner class of Executors, implemented as follows:
/**
* A wrapper class that exposes only the ScheduledExecutorService
* methods of a ScheduledExecutorService implementation.
*/
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}
DelegatedScheduledExecutorService extends DelegatedExecutorService and implements ScheduledExecutorService. Its constructor takes a ScheduledExecutorService instance, and every ScheduledExecutorService method simply delegates to that instance, so DelegatedScheduledExecutorService can be viewed as a wrapper that exposes only the ScheduledExecutorService methods of the wrapped pool. The underlying mechanics were analyzed above, so there is no need to repeat them.
newWorkStealingPool(int parallelism)
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
As the code shows, WorkStealingPool returns a ForkJoinPool instance; a description of ForkJoinPool can be found at http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/. In other words, WorkStealingPool is a ForkJoinPool, which implements the work-stealing algorithm described in detail in Doug Lea's papers. This pool is mainly intended for divide-and-conquer algorithms and compute-intensive tasks.
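As a sketch of the divide-and-conquer style this pool targets, here is a RecursiveTask that sums an array by splitting the range in half; the class names and the threshold are made up for illustration, and the cast is safe because, as the source above shows, the factory returns a ForkJoinPool directly:
import java.util.concurrent.*;

public class WorkStealingDemo {
    // Classic divide-and-conquer: sum a range of an array by splitting it in half
    static class SumTask extends RecursiveTask<Long> {
        private final long[] data;
        private final int from, to;

        SumTask(long[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 1_000) {               // small enough: sum directly
                long sum = 0;
                for (int i = from; i < to; i++) sum += data[i];
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                            // may be stolen by an idle worker
            return right.compute() + left.join();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool(4);
        System.out.println(pool.invoke(new SumTask(data, 0, data.length))); // 499999500000
    }
}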
Summary
This article introduced thread pools in Java: the thread pool lifecycle, how to use ThreadPoolExecutor and how its source works, and the basic principles and usage of the pools provided by Executors. In practice, my preference is not to use the Executors pools as-is but to define a custom pool for the concrete workload; the Executors pools are best treated as starting points for such customization. What matters most is understanding what each parameter does and how the pool works internally, so that a pool appropriate to the specific business can be configured.