概述

线程池是管理一组同构工作线程的资源池。合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

Java中线程池的基础架构

thread-pool-class

线程池的使用及原理分析

由于在JDK中内部提供的许多线程池都是基于ThreadPoolExecutor的,因此下面先介绍ThreadPoolExecutor的相关原理。

ThreadPoolExecutor

使用ThreadPoolExecutor

Java中可以通过ThreadPoolExecutor来创建一个线程池:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler);

参数介绍:

  • corePoolSize:核心线程数量。
  • maximumPoolSize:最大线程数,线程池允许创建的最大线程数。
  • keepAliveTime:超出corePoolSize后创建的线程的存活时间。
  • unit:keepAliveTime的时间单位。
  • workQueue:任务队列,用于保存待执行的任务。
  • threadFactory:线程池内部创建线程所用的工厂。
  • handler:任务无法执行时的处理器。

使用ThreadPoolExecutor+单例模式实现一个简单的线程池:

import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MyThreadPool {

    private final String THREAD_TAG = this.getClass().getSimpleName();

    /**
     * 定义ThreadPoolExecutor需要的必要参数
     */
    private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
    private static final int MAX_POOL_SIZE = 64;
    private static final int KEEP_ALIVE_TIME = 1;

    /**
     * 工作队列,使用LinkedBlockingQueue
     */
    private final BlockingQueue<Runnable> mWorkQueue = new LinkedBlockingQueue<>();

    /**
     * 定义创建线程的工厂
     */
    private final ThreadFactory mThreadFactory = new ThreadFactory() {
        private final AtomicInteger threadCount = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, THREAD_TAG + "#" + threadCount.getAndIncrement());
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    };

    /**
     * 定义拒绝执行的处理器
     */
    private final RejectedExecutionHandler mRejectedExecutionHandler = new RejectedExecutionHandler() {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());
        }
    };

    /**
     * 自定义线程池
     */
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
            MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, mWorkQueue, mThreadFactory, mRejectedExecutionHandler);

    private MyThreadPool() {
    }

    private static volatile MyThreadPool instance;

    /**
     * 使用DCL实现单例模式
     * @return MyThreadPool实例
     */
    public static MyThreadPool getInstance() {
        if (instance == null) {
            synchronized (MyThreadPool.class) {
                if (instance == null) {
                    instance = new MyThreadPool();
                }
            }
        }
        return instance;
    }

    /**
     * 提交任务
     * @param command Task
     */
    public void submit(Runnable command) {
        executor.execute(command);
    }

    /**
     * 关闭线程池
     */
    public List<Runnable> shutDownNow() {
        return executor.shutdownNow();
    }
}

ThreadPoolExecutor源码分析

线程池的生命周期

在ThreadPoolExecutor中,有一个ctl字段:

ctl字段通过高低位的不同,高3位表示线程池状态,低29位工作线程数目,这是一个典型的高效优化。在实际编写代码的过程中,我们可以指定最大的线程数(注意区分工作线程数)为Integer.MAX_VALUE,但由于资源的限制,这仅仅是理论上的值,因此完全可以将多余的bit赋予其他意义。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// COUNT_BITS:存储工作线程数的位数
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池状态,保存在ctl的高位
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

在讨论线程池的生命周期时,要注意和进程、线程的生命周期区分开来。

进程的生命周期可以参考《操作系统》教材,包括三状态、五状态及七状态三种解释角度。

Alt text Alt text Alt text

线程的五种状态:新建New、就绪Runnable、运行Running、阻塞Blocked、死亡Dead。

Alt text

线程池的五种状态:Running、ShutDown、Stop、Tidying、Terminated。

Alt text

源码分析

1、execute方法分析:

接下来分析ThreadPoolExecutor中常用的execute方法的执行流程:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
        * Proceed in 3 steps:
        *
        * 1. If fewer than corePoolSize threads are running, try to
        * start a new thread with the given command as its first
        * task.  The call to addWorker atomically checks runState and
        * workerCount, and so prevents false alarms that would add
        * threads when it shouldn't, by returning false.
        *
        * 2. If a task can be successfully queued, then we still need
        * to double-check whether we should have added a thread
        * (because existing ones died since last checking) or that
        * the pool shut down since entry into this method. So we
        * recheck state and if necessary roll back the enqueuing if
        * stopped, or start a new thread if there are none.
        *
        * 3. If we cannot queue task, then we try to add a new
        * thread.  If it fails, we know we are shut down or saturated
        * and so reject the task.
        */

        // 检查工作线程数目,如果低于corePoolSizez则添加Worker
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // isRunning()检查线程池是否被shutdown
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次进行防御性检查
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }

    // 尝试addWorker,若失败则认为线程池被shutdown或饱和了
    else if (!addWorker(command, false))
        reject(command);
}

上述代码分成了三部分进行:

  1. 工作线程不超过corePoolSize时,创建一个新的Worker。其中addWorker()方法中会开启线程执行任务,稍后会对addWorker()方法进行详细分析。
  2. 如果工作线程超过了corePoolSize,则会检查线程池状态:线程池为running态,将任务添加到阻塞队列,否则进入第三步。
  3. 尝试添加一个Worker,如果失败,则使用构造函数传入的RejectedExecutionHandler拒绝任务,默认采用AbortPolicy。

execute(runnable)方法流程图:

Alt text

2、addWorker方法分析

分析完execute()方法的源码之后,发现addWorker是其中的一个核心的方法,进而分析addWorker()源码:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

从源码中可以看出addWorker大致的工作内容:检查线程池的状态和运行线程数,然后创建一个Worker对象,将任务command传入Worker,然后将创建的worker添加到workers中,其中workers是保存了所有工作线程的HashSet。最后启动worker对象。

流程图:

Alt text

详细细节请阅读ThreadPoolExecutor相关方法的源码。

3、分析Worker

分析完上述代码后,可以对ThreadPoolExecutor的execute方法有了一个初步的了解,接下来就是对Worker的源码进行分析,通过对Worker的源码进行分析,可以更加深入的了解ThreadPoolExecutor的实现原理。

Worker是ThreadPoolExecutor中的一个内部类,继承了AQS(对于AQS的分析将会在另一篇文章中介绍),实现了Runnable接口,以下是Worker的源码:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
        * This class will never be serialized, but we provide a
        * serialVersionUID to suppress a javac warning.
        */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
        * Creates with given first task and thread from ThreadFactory.
        * @param firstTask the first task (null if none)
        */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

首先看Worker的构造方法,传入了一个Runnable参数,这个参数就是我们使用execute方法提交的任务,其构造方法将Runnable参数赋给其成员变量,然后使用**getThreadFactory().newThread(this);**创建了一个线程,同样将引用赋给其成员变量。getThreadFactory方法属于ThreadPoolExecutor成员域中,该方法就是返回了在ThreadPoolExecutor构造函数中传入的自定义的线程创建工厂,ThreadPoolExecutor中也有默认的实现,具体可以在ThreadPoolExecutor的构造函数中找到默认实现的位置。

以上就是Worker的构造方法,需要注意的是,在实例化Worker的thread成员时,没有把Runnable也就是我们提交的任务传入,而是传入了this(Worker实现了Runnable)。反过来看addWorker的源码,在启动Worker时调用了Worker内部的thread的start方法,也就是说,工作线程并没有直接执行我们提交的任务,而是执行Worker的run方法,Worker的run方法如下:

/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}

4、runWorker(Runnable r)

Worker类中的run方法调用了runWorker方法,传入了this指针。runWorkers是ThreadPoolExecutor域中的一个final方法,其源码:

/**
 * Main worker run loop.  Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters.  Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler).  Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

在runWorker中,使用**Runnable task = w.firstTask;**获取我们传入的任务的引用,然后将Worker的firstTask置为null,这一步需要注意,原因会在稍后讲解。

然后执行一个while循环,循环的执行条件是task不为null或getTask()获取的task不为null,查看getTask()源码,发现其会从阻塞队列中取出一个任务,这就可以理解为什么execute方法中会有addWorker(null, false)这样的代码了,任务参数为null,runWorker方法会从阻塞队列中获取任务执行。

进入while循环后,忽略其他细节,可以看到调用了task的run()方法,在这里我们提交的任务就开始执行了。

任务执行完毕后,在while循环中的try块的finally中,将task赋值为null,再次循环到while时就会从阻塞队列取任务执行了。在最后不满足while条件后调用了processWorkerExit方法然后退出。

5、processWorkerExit(Worker w, boolean completedAbruptly)

在while循环退出的时候,意味着阻塞队列中没有任务了,也就是说当前线程是一个空闲进程,接下来分析一下processWorkerExit方法看看线程池会怎么处理当前线程。

processWorkerExit源码:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 累计完成的任务数量
        completedTaskCount += w.completedTasks;

        // 移除当前worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

在上述代码中,首先调用了decrementWorkerCount()将Worker数(工作线程数)减一,在try块中首先使用了ThreadPoolExecutor的成员变量completedTaskCount累计了当前worker完成的任务数量,然后从workers工作线程集合中移除了当前worker。

然后再下面的if语句中,判断了是否允许核心线程超时,如果允许则会判断阻塞队列是否为空,若阻塞队列为空,则当前线程直接返回,当前线程结束运行;如果阻塞队列不为空,则当工作线程数大于1时当前线程返回并结束运行,否则继续执行。如果不允许核心线程超时,则会判断当前工作线程数是否大于corePoolSize,如果大于corePoolSize则结束当前线程,否则继续执行。

如果程序继续执行,则会调用addWorker(null, false),这里在之前分析runWorker方法时讲解过了,该方法会创建一个线程试图执行阻塞队列中的任务,若阻塞队列为空,则又会执行到当前方法。

Alt text

总结分析

从以上源码可以看出线程池的基本工作原理,使用execute方法提交一个任务,如果工作线程数没达到corePoolSize,则会创建一个新的线程执行任务,并且在当前任务执行完毕后会从阻塞队列中取任务执行,直到阻塞队列为空。最后如果工作线程数小于corePoolSize,当前线程会创建一个新的task为null的Worker,然后当前线程退出。

如果工作线程数大于等于corePoolSize,则会根据情况将任务添加至阻塞队列中。(细节请见源码)

了解完ThreadPoolExecutor的基本原理,接下来看看JDK提供的线程池及其使用场景。

Executors

JDK中的Executors类中目前提供了5种线程池创建配置。

newCachedThreadPool()

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}

CachedThreadPool中,corePoolSize大小为0,maximumPoolSize大小为Integer.MAX_VALUE,超时时间为60s,workQueue为SynchronousQueue,而SynchronousQueue作用是传递任务,不会保存任务,因此该线程池的工作流程如下:

  • 没有核心线程,直接向SynchronousQueue提交任务
  • 如果有空闲的线程,则取任务执行,否则创建新的线程
  • 执行完成后的线程有60s超时时间,如果在60s内没有从队列中取得新的任务,则会被销毁

CachedThreadPool适用于并发执行大量短期耗时短的任务,或者负载较轻的服务器。

newFixedThreadPool(int nThreads)

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool中,corePoolSize与maximumPoolSize都为nThreads,也就是线程池中线程数超过nThreads后,新提交的任务会被放到这个LinkedBlockingQueue阻塞队列中,而LinkedBlockingQueue默认容量为Integer.MAX_VALUE,并且超时时间为0,因此FixThreadPool的工作流程如下:

  • 线程数小于nThreads时,提交新的任务会创建新的线程
  • 线程数等于nThreads时,提交新的任务后任务会被加入到阻塞队列,正在执行的线程执行完毕后从队列中取任务执行

ThreadPoolExecutor适用于负载较重的服务器,为了合理利用资源,需要限制线程数量。

newSingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

SingleThreadExecutor中,corePoolSize与maximumPoolSize均为1,超时时间为0ms,workQueue为LinkedBlockingQueue,相当于nThreads为1的FixedThreadPool。其工作流程如下:

  • 线程池中没有线程时,新建一个线程执行任务
  • 线程池有一个线程后,新提交的任务加入LinkedBlockingQueue
  • 线程池中的这一个线程执行完毕后到队列中取新的任务执行,如果workQueue为空,则线程销毁

SingleThreadExecutor适用于串行执行任务的场景,每个任务按顺序执行,不需要并发执行。

newScheduledThreadPool(int corePoolSize)

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

ScheduledThreadPool中,返回了一个ScheduledThreadPoolExecutor实例,而ScheduledThreadPoolExecutor实际上继承了ThreadPoolExecutor。所以从代码中可以看出,ScheduledThreadPool基于ThreadPoolExecutor,corePoolSize大小为传入的corePoolSize,maximumPoolSize大小为Integer.MAX_VALUE,超时时间为0,workQueue为DelayedWorkQueue。

实际上ScheduledThreadPool是一个调度池,其实现了schedule、scheduleAtFixedRate、scheduleWithFixedDelay三个方法,可以实现延迟执行、周期执行等操作。

newSingleThreadScheduledExecutor()

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

在newSingleThreadScheduledExecutor方法中,返回了一个DelegatedScheduledExecutorService实例。DelegatedScheduledExecutorService是Executors中的一个静态内部类,实现代码如下:

/**
 * A wrapper class that exposes only the ScheduledExecutorService
 * methods of a ScheduledExecutorService implementation.
 */
static class DelegatedScheduledExecutorService
        extends DelegatedExecutorService
        implements ScheduledExecutorService {
    private final ScheduledExecutorService e;
    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
        super(executor);
        e = executor;
    }
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return e.schedule(command, delay, unit);
    }
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return e.schedule(callable, delay, unit);
    }
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return e.scheduleAtFixedRate(command, initialDelay, period, unit);
    }
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
}

DelegatedScheduledExecutorService继承了DelegatedExecutorService类,实现了ScheduledExecutorService接口,并且其构造函数传入了一个ScheduledExecutorService实例,实现ScheduledExecutorService中的方法也是直接调用的传入实例的方法。实际上DelegatedScheduledExecutorService可以看做是一个ScheduledExecutorService的包装类,具体原理上面也已经分析过了,就不再赘述了。

newWorkStealingPool(int parallelism)

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

可以看出来WorkStealingPool内部返回了一个ForkJoinPool实例,关于ForkJoinPool相关的描述可以查看博客http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/。实际上WorkStealingPool内部即ForkJoinPool实现了work stealing算法,该算法在Doug Lea的论文中有详细的描述。该线程池主要用于实现分治算法,适合执行密集计算型任务。

总结

本文介绍了Java中的线程池,介绍了线程池的生命周期、ThreadPoolExecutor的使用以及其源码分析、JDK在Executors中提供的几种线程池的基本原理和用法等内容,实际应用中个人觉得最好不用Executors中的线程池,而是应该结合具体业务情况自定义线程池,Executors中的几种线程池可以看做自定义线程池的几个方向,最重要的还是要理解各个参数的作用以及线程池的原理,这样针对具体的业务才能定义出合适的线程池。