菜单

Java并作的线程池ThreadPoolExecutor源码分析上

2018年11月16日 - Java

线程池学习

以下有所内容与源码分析都是基于JDK1.8的,请知悉。

自己写博客就着实比没有各个了,这也许与自身的就学方法有关,我要好为看这样不行不好的,但是没道说服自己去改变,所以呢只好这样想到什么法啊了。


池化技术确实是平等派系在我看来非常牛逼的技能,因为它完成了在点滴资源内实现了资源利用的最大化,这让自身想开了同一派课程,那即便是运筹学,当时在达到运筹学的上就是时召开这种接近的题材。


言归正传吧,我连下去会进展同样软线程池方面知识点的攻,也会见记录下来分享给大家。

线程池的内容中有涉及到AQS同步器的知识点,如果对AQS同步器知识点感觉有些薄弱,可以错过押自己之高达一致首文章。

1、线程池介绍

于web开发中,服务器需要经受并拍卖要,所以会否一个呼吁来分配一个线程来拓展拍卖。如果老是要都新创一个线程的讲话实现起来很便利,但是在一个题目:

假设出现的乞求数量大多,但每个线程执行的时非常缺,这样就是会见频之创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在呢每个请求创建新线程和销毁线程上消费的时日跟吃的系统资源要比拍卖实际的用户要的日子以及资源还多。

从而线程池就出现了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的支付为分派至了多只任务上。

使用线程池的利益:

Java中之线程池是为此ThreadPoolExecutor类来兑现的. 本文就结JDK
1.8对准该类的源码来分析一下斯类似里对此线程的创导,
管理以及后台任务的调度等方面的实行原理。

线程池的优势


既然说到线程池了,而且多数之大牛也都见面提议我们利用池化技术来治本一些资源,那线程池肯定也是起它的补益的,要不然怎么会那么出名并且被大家用也?

​ 我们尽管来看望它究竟有什么优势?

2、继承关系

咱第一来拘禁一下线程池的类图:

图片 1

线程池继承关系图

Executor接口

public interface Executor {
    /**
     * 在将来的某个时候执行传入的命令,执行命令可以在实现类里通过新创建的线程、线程池、当前线程来完成。
     */
    void execute(Runnable command);
}

ExecutorService接口

public interface ExecutorService extends Executor {

    /**
     * 启动先前提交的任务被执行的有序关闭,但不接受新的任务。 如果已经关闭,则调用没有其他影响。
     */
    void shutdown();

    /**
     * 尝试停止所有正在执行的任务,停止等待任务的处理,并返回正在等待执行的任务的列表。
     * 该方法不能等待之前提交的任务执行完,如果需要等待执行,可以使用{@link #awaitTermination awaitTermination}
     * 从这个方法返回后,这些任务从任务队列中排出(移除)。 除了竭尽全力地停止处理主动执行任务之外,没有任何保证。 
     */
    List<Runnable> shutdownNow();

    /**
     * 线程池有没有被关闭,关闭返回true,否则false
     */
    boolean isShutdown();

    /**
     * 如果所有任务在关闭后都完成了。返回true
     * 提示:如果没有在调用该方法前调用shutdown或者shutdownNow方法,此方法永远不会返回true
     */
    boolean isTerminated();

    /**
     * 在指定时间内阻塞等待任务全部完成,完成了返回true,否则false
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个有返回值的任务
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一个任务来执行,返回一个有返回值的结果,返回值为传入的result
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个任务来执行,返回一个有返回值的结果,返回值为null
     */
    Future<?> submit(Runnable task);

    /**
     * 执行一批有返回值的任务
     * 返回的结果调用{@link Future#isDone}都是true
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    /**
     * 执行给定的任务,当全部完成或者超时返回一个有状态和结果的Future集合。
     * 返回的结果调用{@link Future#isDone}都是true
     * 返回时,尚未完成的任务将被取消。
     * 如果在进行此操作时修改了给定的集合,则此方法的结果是不确定的。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 执行给定的任务,返回一个成功完成任务的结果(即,没有抛出异常),
     * 如果有的话。 在正常或异常返回时,尚未完成的任务将被取消。 
     * 如果在进行此操作时修改了给定的集合,则此方法的结果是不确定的。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * 执行给定的任务,返回一个成功完成任务的结果(即,没有抛出异常),
     * 如果有的话。 在正常或异常返回时,尚未完成的任务将被取消。 
     * 如果在进行此操作时修改了给定的集合,则此方法的结果是不确定的。
     * 超时没有成功结果抛出TimeoutException
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService接口

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    // ...
}

线程池的始建


我们而用线程池来统一分配和管理我们的线程,那首先我们如果创一个线程池出来,还是产生许多大牛已经拉我们描绘好了众多点的代码的,Executors的厂子方法就是受咱提供了创多种不同线程池的方式。因为这个近乎就是一个创建对象的厂子,并没有关联到好多的切实落实,所以我未会见过分详细地失去验证。

​ 老规矩,还是一直上代码吧。

public static ExecutorService newFixedThreadPool(int nThreads) {
      return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
  }

此地呢就算举出一个计的事例来拓展事后的教授吧,我们可以看,Executors只是个厂子而已,方法吧才是来实例化不同之目标,实际上实例化出来的主要类即是ThreadPoolExecutor。现在咱们虽先来概括地针对ThreadPoolExecutor构造函数内之每个参数进行解释一下吧。

线程池的履流程就因此参考资料里的希冀介绍一下了,具体我们要通过代码去上课。
图片 2

于方我们大概的上课了转Executors本条厂类里的工厂方法,并且讲述了瞬间创建线程池的有的参数和她的企图,当然者的教授并无是死深刻,因为想要弄懂的言语是索要持续地花时间去看去领悟的,而博主自己也还是尚未完全弄明白,不过博主的读方式是预先模拟了单大体,再回头来看望前面的知识点,可能会见愈好掌握,所以我们随后向下说吧。

3、ThreadPoolExecutor分析

怀念只要深深明ThreadPoolExecutor,就假设先行清楚其中最要害之几只参数:

ThreadPoolExecutor源码分析


以地方我们虽意识了,Executors的厂子方法要就是回去了ThreadPoolExecutor对象,至于其他一个每当这边小无开腔,也就是说,要学习线程池,其实重要的要么得学会分析ThreadPoolExecutor以此目标中的源码,我们对接下便会指向ThreadPoolExecutor里的基本点代码进行辨析。

3.1、核心变量和艺术(状态转换)

// 状态|工作数的一个32bit的值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 0001-1111-1111-1111-1111-1111-1111-1111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 1110-0000-0000-0000-0000-0000-0000-0000
private static final int RUNNING    = -1 << COUNT_BITS;
// 0000-0000-0000-0000-0000-0000-0000-0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 0010-0000-0000-0000-0000-0000-0000-0000
private static final int STOP       =  1 << COUNT_BITS;
// 0100-0000-0000-0000-0000-0000-0000-0000
private static final int TIDYING    =  2 << COUNT_BITS;
// 0110-0000-0000-0000-0000-0000-0000-0000
private static final int TERMINATED =  3 << COUNT_BITS;

// ~CAPACITY就是前3位状态位,和c进行&就能得到当前的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 和c进行&就能得到当前的工作数
private static int workerCountOf(int c)  { return c & CAPACITY; }
// rs就是状态值,wc就是工作数,这两个进行或操作,就能得到ctl的值(32bit的值)
private static int ctlOf(int rs, int wc) { return rs | wc; }

可能过多总人口视地方的写法都蒙圈了。我实际基础呢未极端好,所以我望此的时刻索性写了单器类去测试他们的输出结果,如下:

public class ExecutorTest {
    private final static int COUNT_BITS = Integer.SIZE - 3;
    private final static int RUNNING    = -1 << COUNT_BITS;
    private final static int SHUTDOWN   =  0 << COUNT_BITS;
    private final static int STOP       =  1 << COUNT_BITS;
    private final static int TIDYING    =  2 << COUNT_BITS;
    private final static int TERMINATED =  3 << COUNT_BITS;
    private final static int CAPACITY   = (1 << COUNT_BITS) - 1;

    public static void main(String[] args) {
        System.out.println("状态位===");
        System.out.println(getFormatStr(RUNNING));
        System.out.println(getFormatStr(SHUTDOWN));
        System.out.println(getFormatStr(STOP));
        System.out.println(getFormatStr(TIDYING));
        System.out.println(getFormatStr(TERMINATED));
        System.out.println(getFormatStr(CAPACITY));
    }

    private static String getFormatStr(int n) {
        String integerMaxValueStr = Integer.toBinaryString(n);
        int a = 32;
        StringBuilder sb = new StringBuilder();
        int l = integerMaxValueStr.length();
        int i = 0;
        for (; a > 0; --a) {
            if (--l >= 0) {
                sb.append(integerMaxValueStr.charAt(l));
            } else {
                sb.append("0");
            }
            if (++i % 4 == 0) {
                if (a > 1) {
                    sb.append("-");
                }
                i = 0;
            }
        }
        return sb.reverse().toString();
    }
}

出口结果也:

状态位===
1110-0000-0000-0000-0000-0000-0000-0000
0000-0000-0000-0000-0000-0000-0000-0000
0010-0000-0000-0000-0000-0000-0000-0000
0100-0000-0000-0000-0000-0000-0000-0000
0110-0000-0000-0000-0000-0000-0000-0000
0001-1111-1111-1111-1111-1111-1111-1111

经地方的注解以及测试用例可以发现,源码的撰稿人巧妙的动一个价值代表了2种意思(前3bit位是状态,后29bit是工作数),下面我们来看看线程池最要害之5种状态:

  1. RUNNING:能接受新交付的职责,并且为会处理阻塞队列中之任务;
  2. SHUTDOWN:关闭状态,不再接受新交付的职责,但可可以延续处理阻塞队列中早已封存之任务。在线程池处于
    RUNNING 状态时,调用
    shutdown()方法会要线程池进入及拖欠状态。(finalize()
    方法在实行过程被呢会调用shutdown()方法上该状态);
  3. STOP:不克承受新职责,也非处理队列中的任务,会半途而废正在处理任务之线程。在线程池处于
    RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow()
    方法会使线程池进入到该状态;
  4. TIDYING:如果具有的任务都早就住了,workerCount (有效线程数)
    为0,线程池进入该状态后会调用 terminated() 方法上TERMINATED 状态。
  5. TERMINATED:在terminated()
    方法执行完后登该状态,默认terminated()方法吃什么啊没做。

生图为线程池的状态转换过程:

图片 3

线程池状态转换图

AtomicInteger ctl

ctl举凡主要的操纵状态,是一个复合类型的变量,其中包括了一定量只概念。

我们来分析一下同ctl至于的有的源代码吧,直接上代码

     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

     //用来表示线程池数量的位数,很明显是29,Integer.SIZE=32
     private static final int COUNT_BITS = Integer.SIZE - 3;
     //线程池最大数量,2^29 - 1
     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

     // runState is stored in the high-order bits
     //我们可以看出有5种runState状态,证明至少需要3位来表示runState状态
     //所以高三位就是表示runState了
     private static final int RUNNING    = -1 << COUNT_BITS;
     private static final int SHUTDOWN   =  0 << COUNT_BITS;
     private static final int STOP       =  1 << COUNT_BITS;
     private static final int TIDYING    =  2 << COUNT_BITS;
     private static final int TERMINATED =  3 << COUNT_BITS;

     // Packing and unpacking ctl
     private static int runStateOf(int c)     { return c & ~CAPACITY; }
     private static int workerCountOf(int c)  { return c & CAPACITY; }
     private static int ctlOf(int rs, int wc) { return rs | wc; }

     //用于存放线程任务的阻塞队列
     private final BlockingQueue<Runnable> workQueue;

     //重入锁
     private final ReentrantLock mainLock = new ReentrantLock();

     //线程池当中的线程集合,只有当拥有mainLock锁的时候,才可以进行访问
     private final HashSet<Worker> workers = new HashSet<Worker>();

     //等待条件支持终止
     private final Condition termination = mainLock.newCondition();

     //创建新线程的线程工厂
     private volatile ThreadFactory threadFactory;

     //饱和策略
     private volatile RejectedExecutionHandler handler;
  1. CAPACITY

    在这里我们提一下是线程池最特别数据之计量吧,因为此地涉及到源码以及移动之类的操作,我感觉到大多数丁犹还是未极端会以此,因为自同开始看的时呢是免绝会之。

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

由代码我们好看看,是得1往左移29位,然后再度减去1,那个1往左移29位凡怎么算的啊?

     1 << COUNT_BITS
      ​
      1的32位2进制是
      00000000 00000000 00000000 00000001
      ​
      左移29位的话就是
      00100000 00000000 00000000 00000000
      ​
      再进行减一的操作
      000 11111 11111111 11111111 11111111
      ​
      也就是说线程池最大数目就是
      000 11111 11111111 11111111 11111111

2.runState

恰好数的原码、反码、补码都是一律的
每当计算机底层,是故补码来代表的

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING    = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

-1 << COUNT_BITS

这里是-1往左移29位,稍微有点不一样,-1的话需要我们自己算出补码来
          ​
-1的原码
10000000 00000000 00000000 00000001
          ​
-1的反码,负数的反码是将原码除符号位以外全部取反
11111111 11111111 11111111 11111110
          ​
-1的补码,负数的补码就是将反码+1
11111111 11111111 11111111 11111111
          ​
关键了,往左移29位,所以高3位全是1就是RUNNING状态
111 00000 00000000 00000000 00000000

0 << COUNT_BITS
          ​
0的表示
00000000 00000000 00000000 00000000
          ​
往左移29位
00000000 00000000 00000000 00000000

1 << COUNT_BITS
          ​
1的表示
00000000 00000000 00000000 00000001
          ​
往左移29位
00100000 00000000 00000000 00000000

2 << COUNT_BITS
          ​
2的32位2进制
00000000 00000000 00000000 00000010
          ​
往左移29位
01000000 00000000 00000000 00000000

3 << COUNT_BITS
          ​
3的32位2进制
00000000 00000000 00000000 00000011
          ​
往左移29位
11000000 00000000 00000000 00000000

3.有的主意介绍

实时落runState的点子

private static int runStateOf(int c)     { return c & ~CAPACITY; }

~CAPACITY
~是按位取反的意思
&是按位与的意思
          ​
而CAPACITY是,高位3个0,低29位都是1,所以是
000 11111 11111111 11111111 11111111
          ​
取反的话就是
111 00000 00000000 00000000 00000000
          ​
传进来的c参数与取反的CAPACITY进行按位与操作
1、低位29个0进行按位与,还是29个0
2、高位3个1,既保持c参数的高3位
既高位保持原样,低29位都是0,这也就获得了线程池的运行状态runState

获取线程池的眼前有效线程数目

private static int workerCountOf(int c)  { return c & CAPACITY; }

CAPACITY的32位2进制是
000 11111 11111111 11111111 11111111
          ​
用入参c跟CAPACITY进行按位与操作
1、低29位都是1,所以保留c的低29位,也就是有效线程数
2、高3位都是0,所以c的高3位也是0
          ​
这样获取出来的便是workerCount的值

//结合这几句代码来看
private static final int RUNNING    = -1 << COUNT_BITS;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
          ​
private static int ctlOf(int rs, int wc) { return rs | wc; }

RUNNING是
111 00000 00000000 00000000 00000000
          ​
ctlOf是将rs和wc进行按位或的操作
          ​
初始化的时候是将RUNNING和0进行按位或
0的32位2进制是
00000000 00000000 00000000 00000000
          ​
所以初始化的ctl是
111 00000 00000000 00000000 00000000

3.2、构造方法

/**
 * @param corePoolSize 保留在线程池中的线程数,即使它们处于空闲状态,除非设置了{@code allowCoreThreadTimeOut}
 * @param maximumPoolSize 线程池中允许的最大线程数
 * @param keepAliveTime 当线程数大于corePoolSize时,这是多余空闲线程在终止之前等待新任务的最大时间。
 * @param unit {@code keepAliveTime}参数的时间单位
 * @param workQueue 在执行任务之前用于保存任务的队列。 这个队列将只保存{@code execute}方法提交的{@code Runnable}任务。
 * @param threadFactory 用来执行的时候创建线程的线程工厂
 * @param handler 在执行被阻塞时使用的处理程序,因为达到了线程边界和队列容量
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

对此参数handler:线程池提供了4栽政策:

  1. AbortPolicy:直接丢掉来怪,这是默认策略;
  2. CallerRunsPolicy:用调用者所当的线程来执行任务;
  3. DiscardOldestPolicy:丢弃阻塞队列中凭借极前方的任务,并履行当前任务;
  4. DiscardPolicy:直接丢弃任务;

核心措施源码分析

  1. execute(Runnable command)方法

      public void execute(Runnable command) {
          //需要执行的任务command为空,抛出空指针异常
          if (command == null)  // 1
              throw new NullPointerException();

          /*
          *执行的流程实际上分为三步
          *1、如果运行的线程小于corePoolSize,以用户给定的Runable对象新开一个线程去执行
          *  并且执行addWorker方法会以原子性操作去检查runState和workerCount,以防止当返回false的
          *  时候添加了不应该添加的线程
          *2、 如果任务能够成功添加到队列当中,我们仍需要对添加的线程进行双重检查,有可能添加的线程在前
          *  一次检查时已经死亡,又或者在进入该方法的时候线程池关闭了。所以我们需要复查状态,并有有必
          *  要的话需要在停止时回滚入列操作,或者在没有线程的时候新开一个线程
          *3、如果任务无法入列,那我们需要尝试新增一个线程,如果新建线程失败了,我们就知道线程可能关闭了
          *  或者饱和了,就需要拒绝这个任务
          *
          */

          //获取线程池的控制状态
          int c = ctl.get();  // 2

          //通过workCountOf方法算workerCount值,小于corePoolSize
          if (workerCountOf(c) < corePoolSize) {
              //添加任务到worker集合当中
      if (addWorker(command, true)) 
                  return;  //成功返回
              //失败的话再次获取线程池的控制状态
              c = ctl.get();
          }

          /*
          *判断线程池是否正处于RUNNING状态
          *是的话添加Runnable对象到workQueue队列当中
          */
          if (isRunning(c) && workQueue.offer(command)) {  // 3

              //再次获取线程池的状态
              int recheck = ctl.get();

              //再次检查状态
              //线程池不处于RUNNING状态,将任务从workQueue队列中移除
              if (! isRunning(recheck) && remove(command))
                  //拒绝任务
                  reject(command);
              //workerCount等于0
              else if (workerCountOf(recheck) == 0)  // 4
                  //添加worker
                  addWorker(null, false);
          }
          //加入阻塞队列失败,则尝试以线程池最大线程数新开线程去执行该任务
      else if (!addWorker(command, false))  // 5 
              //执行失败则拒绝任务
              reject(command);
      }

咱们的话一下地方这个代码的流水线:

1、首先判断任务是否为空,空则抛来空指针异常
2、不为空则获取线程池控制状态,判断小于corePoolSize,添加到worker集合当中实行,

  • 如成功,则返回
  • 破产的言辞又接着获取线程池控制状态,因为只有状态变了才见面失败,所以再次赢得
    3、判断线程池是否处在运行状态,是的口舌则添加command到死队列,加入时也会更得到状态又检测

    状态是不是非处在运行状态,不处于的讲话虽然用command从绿灯队列移除,并且拒绝任务
    4、如果线程池里没有了线程,则开创新的线程去执行得阻塞队列的任务尽
    5、如果上述都未曾尽成功,则需要开启最老线程池里之线程来推行任务,失败的语句虽撇下

偶然又多之文字也不如一个流水线图来的懂得,所以还是画画了单execute的流程图给大家有利了解。
图片 4

2.addWorker(Runnable firstTask, boolean core)

      private boolean addWorker(Runnable firstTask, boolean core) {
          //外部循环标记
          retry:
          //外层死循环
          for (;;) {
              //获取线程池控制状态
              int c = ctl.get();
              //获取runState
              int rs = runStateOf(c);
      ​
              // Check if queue empty only if necessary.

              /**
              *1.如果线程池runState至少已经是SHUTDOWN
              *2\. 有一个是false则addWorker失败,看false的情况
              * - runState==SHUTDOWN,即状态已经大于SHUTDOWN了
              * - firstTask为null,即传进来的任务为空,结合上面就是runState是SHUTDOWN,但是
              *  firstTask不为空,代表线程池已经关闭了还在传任务进来
              * - 队列为空,既然任务已经为空,队列为空,就不需要往线程池添加任务了
              */
              if (rs >= SHUTDOWN &&  //runState大于等于SHUTDOWN,初始位RUNNING
                  ! (rs == SHUTDOWN &&  //runState等于SHUTDOWN
                     firstTask == null &&  //firstTask为null
                     ! workQueue.isEmpty()))  //workQueue队列不为空
                  return false;
      ​
              //内层死循环
              for (;;) {
                  //获取线程池的workerCount数量
                  int wc = workerCountOf(c);
                  //如果workerCount超出最大值或者大于corePoolSize/maximumPoolSize
                  //返回false
                  if (wc >= CAPACITY ||
                      wc >= (core ? corePoolSize : maximumPoolSize))
                      return false;
                  //通过CAS操作,使workerCount数量+1,成功则跳出循环,回到retry标记
                  if (compareAndIncrementWorkerCount(c))
                      break retry;

                  //CAS操作失败,再次获取线程池的控制状态
                  c = ctl.get();  // Re-read ctl
                  //如果当前runState不等于刚开始获取的runState,则跳出内层循环,继续外层循环
                  if (runStateOf(c) != rs)
                      continue retry;
                  // else CAS failed due to workerCount change; retry inner loop
                  //CAS由于更改workerCount而失败,继续内层循环
              }
          }
      ​
          //通过以上循环,能执行到这是workerCount成功+1了

          //worker开始标记
          boolean workerStarted = false;
          //worker添加标记
          boolean workerAdded = false;
          //初始化worker为null
          Worker w = null;
          try {
              //初始化一个当前Runnable对象的worker对象
              w = new Worker(firstTask);
              //获取该worker对应的线程
              final Thread t = w.thread;
              //如果线程不为null
              if (t != null) {
                  //初始线程池的锁
                  final ReentrantLock mainLock = this.mainLock;
                  //获取锁
                  mainLock.lock();
                  try {
                      // Recheck while holding lock.
                      // Back out on ThreadFactory failure or if
                      // shut down before lock acquired.
                      //获取锁后再次检查,获取线程池runState
                      int rs = runStateOf(ctl.get());
      ​
                      //当runState小于SHUTDOWN或者runState等于SHUTDOWN并且firstTask为null
                      if (rs < SHUTDOWN ||
                          (rs == SHUTDOWN && firstTask == null)) {

                          //线程已存活
                          if (t.isAlive()) // precheck that t is startable
                              //线程未启动就存活,抛出IllegalThreadStateException异常
                              throw new IllegalThreadStateException();

                          //将worker对象添加到workers集合当中
                          workers.add(w);
                          //获取workers集合的大小
                          int s = workers.size();
                          //如果大小超过largestPoolSize
                          if (s > largestPoolSize)
                              //重新设置largestPoolSize
                              largestPoolSize = s;
                          //标记worker已经被添加
                          workerAdded = true;
                      }
                  } finally {
                      //释放锁
                      mainLock.unlock();
                  }
                  //如果worker添加成功
                  if (workerAdded) {
                      //启动线程
                      t.start();
                      //标记worker已经启动
                      workerStarted = true;
                  }
              }
          } finally {
              //如果worker没有启动成功
              if (! workerStarted)
                  //workerCount-1的操作
                  addWorkerFailed(w);
          }
          //返回worker是否启动的标记
          return workerStarted;
      }

咱俩也大概说一下夫代码的流程吧,还当真是殊麻烦之,博主写的上都止了好多差,想砸键盘的说:

1、获取线程池的决定状态,进行判定,不相符则赶回false,符合则生同样步
2、死循环,判断workerCount是否高于上限,或者超越corePoolSize/maximumPoolSize,没有的言辞虽然针对workerCount+1操作,
3、如果非切合上述判断或+1操作失败,再次获取线程池的支配状态,获取runState与正开取之runState相比,不相同则跳出内层循环继续外层循环,否则继续内层循环
4、+1操作成后,使用重入锁ReentrantLock来保证为workers当中加加worker实例,添加成功便启动该实例。

通下看看流程图来解一下方面代码的一个实行流程
图片 5

3.addWorkerFailed(Worker w)

addWorker方法添加worker失败,并且没有水到渠成启动任务之当儿,就见面调用此方法,将任务由workers中移除,并且workerCount做-1操作。

      private void addWorkerFailed(Worker w) {
          //重入锁
          final ReentrantLock mainLock = this.mainLock;
          //获取锁
          mainLock.lock();
          try {
              //如果worker不为null
              if (w != null)
                  //workers移除worker
                  workers.remove(w);
              //通过CAS操作,workerCount-1
              decrementWorkerCount();
              tryTerminate();
          } finally {
              //释放锁
              mainLock.unlock();
          }
      }

4.tryTerminate()

当对线程池执行了怪成功逻辑的操作时,都见面待实践tryTerminate尝试终止线程池

      final void tryTerminate() {
          //死循环
          for (;;) {
              //获取线程池控制状态
              int c = ctl.get();

              /*
              *线程池处于RUNNING状态
              *线程池状态最小大于TIDYING
              *线程池==SHUTDOWN并且workQUeue不为空
              *直接return,不能终止
              */
              if (isRunning(c) ||
                  runStateAtLeast(c, TIDYING) ||
                  (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                  return;

              //如果workerCount不为0
              if (workerCountOf(c) != 0) { // Eligible to terminate
                  interruptIdleWorkers(ONLY_ONE);
                  return;
              }
      ​
              //获取线程池的锁
              final ReentrantLock mainLock = this.mainLock;
              //获取锁
              mainLock.lock();
              try {
                  //通过CAS操作,设置线程池状态为TIDYING
                  if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                      try {
                          terminated();
                      } finally {
                          //设置线程池的状态为TERMINATED
                          ctl.set(ctlOf(TERMINATED, 0));
                          //发送释放信号给在termination条件上等待的线程
                          termination.signalAll();
                      }
                      return;
                  }
              } finally {
                  //释放锁
                  mainLock.unlock();
              }
              // else retry on failed CAS
          }
      }

5.runWorker(Worker w)

欠方法的意就是错开实践任务

final void runWorker(Worker w) {
      //获取当前线程
      Thread wt = Thread.currentThread();
      //获取worker里的任务
      Runnable task = w.firstTask;
      //将worker实例的任务赋值为null
      w.firstTask = null;

      /*
      *unlock方法会调用AQS的release方法
      *release方法会调用具体实现类也就是Worker的tryRelease方法
      *也就是将AQS状态置为0,允许中断
      */
      w.unlock(); // allow interrupts
      //是否突然完成
      boolean completedAbruptly = true;
      try {
          //worker实例的task不为空,或者通过getTask获取的不为空
          while (task != null || (task = getTask()) != null) {
              //获取锁
              w.lock();
              // If pool is stopping, ensure thread is interrupted;
              // if not, ensure thread is not interrupted.  This
              // requires a recheck in second case to deal with
              // shutdownNow race while clearing interrupt
              /*
              *获取线程池的控制状态,至少要大于STOP状态
              *如果状态不对,检查当前线程是否中断并清除中断状态,并且再次检查线程池状态是否大于STOP
              *如果上述满足,检查该对象是否处于中断状态,不清除中断标记
              */
              if ((runStateAtLeast(ctl.get(), STOP) ||
                   (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                  !wt.isInterrupted())
                  //中断改对象
                  wt.interrupt();
              try {
                  //执行前的方法,由子类具体实现
                  beforeExecute(wt, task);
                  Throwable thrown = null;
                  try {
                      //执行任务
                      task.run();
                  } catch (RuntimeException x) {
                      thrown = x; throw x;
                  } catch (Error x) {
                      thrown = x; throw x;
                  } catch (Throwable x) {
                      thrown = x; throw new Error(x);
                  } finally {
                      //执行完后调用的方法,也是由子类具体实现
                      afterExecute(task, thrown);
                  }
              } finally {//执行完后
                  //task设置为null
                  task = null;
                  //已完成任务数+1
                  w.completedTasks++;
                  //释放锁
                  w.unlock();
              }
          }
          completedAbruptly = false;
      } finally {
          //处理并退出当前worker
          processWorkerExit(w, completedAbruptly);
      }
  }

通下我们用文字来验证一下实施任务是法子的切实逻辑与流程。

  1. 率先在道同样进来,就实施了w.unlock(),这是为了将AQS的状态改吗0,因为只有getState() >=
    0的时刻,线程才可于搁浅;
  2. 判断firstTask是否为空,为空则通过getTask()获取任务,不呢空接着往下执行
  3. 看清是否相符中断状态,符合的讲话设置中断标记
  4. 执行beforeExecute(),task.run(),afterExecute()方法
  5. 任何一个生出非常都见面招致任务履行之停止;进入processWorkerExit来退出任务
  6. 好端端履之话会接着回到步骤2

依附一称简单的流程图:
图片 6

6.getTask()

于方的runWorker方法当中我们好看看,当firstTask为空的下,会由此该方式来就获取任务去履行,那我们就算省获取任务是法子到底是怎么样的?

      private Runnable getTask() {
          //标志是否获取任务超时
          boolean timedOut = false; // Did the last poll() time out?
      ​
          //死循环
          for (;;) {
              //获取线程池的控制状态
              int c = ctl.get();
              //获取线程池的runState
              int rs = runStateOf(c);
      ​
              // Check if queue empty only if necessary.
              /*
              *判断线程池的状态,出现以下两种情况
              *1、runState大于等于SHUTDOWN状态
              *2、runState大于等于STOP或者阻塞队列为空
              *将会通过CAS操作,进行workerCount-1并返回null
              */
              if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                  decrementWorkerCount();
                  return null;
              }
      ​
              //获取线程池的workerCount
              int wc = workerCountOf(c);
      ​
              // Are workers subject to culling?

              /*
              *allowCoreThreadTimeOut:是否允许core Thread超时,默认false
              *workerCount是否大于核心核心线程池
              */
              boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      ​
              /*
              *1、wc大于maximumPoolSize或者已超时
              *2、队列不为空时保证至少有一个任务
              */
              if ((wc > maximumPoolSize || (timed && timedOut))
                  && (wc > 1 || workQueue.isEmpty())) {
                  /*
                  *通过CAS操作,workerCount-1
                  *能进行-1操作,证明wc大于maximumPoolSize或者已经超时
                  */
                  if (compareAndDecrementWorkerCount(c))
                      //-1操作成功,返回null
                      return null;
                  //-1操作失败,继续循环
                  continue;
              }
      ​
              try {
                  /*
                  *wc大于核心线程池
                  *执行poll方法
                  *小于核心线程池
                  *执行take方法
                  */
                  Runnable r = timed ?
                      workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                  workQueue.take();
                  //判断任务不为空返回任务
                  if (r != null)
                      return r;
                  //获取一段时间没有获取到,获取超时
                  timedOut = true;
              } catch (InterruptedException retry) {
                  timedOut = false;
              }
          }
      }

还是文字说明一下方的代码逻辑和流程:

  1. 获取线程池控制状态和runState,判断线程池是否都关或在关闭,是的语则workerCount-1操作返回null
  2. 抱workerCount判断是否过核心线程池
  3. 判断workerCount是否超过最特别线程池数目或曾经晚点,是的话workerCount-1,-1成则赶回null,不成事则归步骤1再度继续
  4. 判定workerCount是否高于核心线程池,大于则据此poll方法从队列获取任务,否则用take方法从队列获取任务
  5. 判定任务是否也空,不为空则返回获取之任务,否则回步骤1更继续

通下去还是时有发生同等合乎流程图:
图片 7

7.processWorkerExit

众目睽睽的,在实践任务中,会失掉得任务展开实施,那既然是实践任务,肯定就会见产生执行完毕要出现异常中断执行之早晚,那立上势必为会见生出相对应的操作,至于具体操作是什么样的,我们或直接去看源码最实际。

     private void processWorkerExit(Worker w, boolean completedAbruptly) {
          /*
          *completedAbruptly:在runWorker出现,代表是否突然完成的意思
          *也就是在执行任务过程当中出现异常,就会突然完成,传true
          *
          *如果是突然完成,需要通过CAS操作,workerCount-1
          *不是突然完成,则不需要-1,因为getTask方法当中已经-1
          *
          *下面的代码注释貌似与代码意思相反了
          */
          if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
              decrementWorkerCount();
      ​
          //生成重入锁
          final ReentrantLock mainLock = this.mainLock;
          //获取锁
          mainLock.lock();
          try {
              //线程池统计的完成任务数completedTaskCount加上worker当中完成的任务数
              completedTaskCount += w.completedTasks;
              //从HashSet<Worker>中移除
              workers.remove(w);
          } finally {
              //释放锁
              mainLock.unlock();
          }
      ​
          //因为上述操作是释放任务或线程,所以会判断线程池状态,尝试终止线程池
          tryTerminate();
      ​
          //获取线程池的控制状态
          int c = ctl.get();

          //判断runState是否小鱼STOP,即是RUNNING或者SHUTDOWN
          //如果是RUNNING或者SHUTDOWN,代表没有成功终止线程池
          if (runStateLessThan(c, STOP)) {
              /*
              *是否突然完成
              *如若不是,代表已经没有任务可获取完成,因为getTask当中是while循环
              */
              if (!completedAbruptly) {
                  /*
                  *allowCoreThreadTimeOut:是否允许core thread超时,默认false
                  *min-默认是corePoolSize
                  */
                  int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                  //允许core thread超时并且队列不为空
                  //min为0,即允许core thread超时,这样就不需要维护核心核心线程池了
                  //如果workQueue不为空,则至少保持一个线程存活
                  if (min == 0 && ! workQueue.isEmpty())
                      min = 1;
                  //如果workerCount大于min,则表示满足所需,可以直接返回
                  if (workerCountOf(c) >= min)
                      return; // replacement not needed
              }
              //如果是突然完成,添加一个空任务的worker线程--这里我也不太理解
              addWorker(null, false);
          }
      }
  1. 第一判断线程是否突然停,如果是黑马停下,通过CAS,workerCount-1
  2. 统计线程池就任务数,并将worker从workers当中移除
  3. 认清线程池状态,尝试终止线程池
  4. 线程池没有中标平息
    • 判定是否突然好任务,不是虽然开展下一致步,是则展开第三步
    • 如若许核心线程超时,队列不也空,则至少力保一个线程存活
    • 加上一个拖欠任务的worker线程

3.3、核心措施

execute方法

线程池最中心之计莫过于execute了,execute()方法用来付任务,下面我们沿这点子看看那个实现原理:

/**
 * 在未来的某个时刻执行给定的任务。这个任务用一个新线程执行,或者用一个线程池中已经存在的线程执行
 * 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * 执行分以下3步:
     *
     * 1. 如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,command作为这个线程的第一个任务
     *
     * 2. 如果线程入队成功,然后还是要进行double-check的,因为线程池在入队之后状态是可能会发生变化的
     *
     * 3. 如果无法将任务入队列(可能队列满了),需要新开一个线程
     * 如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务。
     */
    int c = ctl.get();

    /**
     * 1、如果当前线程数少于corePoolSize,开启一个线程执行命令
     *(可能是由于addWorker()操作已经包含对线程池状态的判断,如此处没加,而入workQueue前加了)
     */
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;

        /**
         * 没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get())
         * 失败的原因可能是:
         * 1、线程池已经shutdown,shutdown的线程池不再接收新任务
         * 2、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
         */
        c = ctl.get();
    }

    /**
     * 2、如果线程池RUNNING状态,且入队列成功
     */
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();//再次校验位

        //如果再次校验过程中,线程池不是RUNNING状态,并且remove(command)--workQueue.remove()成功,拒绝当前command
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 新建一个worker线程,没有指定firstTask,因为命令已经放入queue里了
            addWorker(null, false);
    }
    /**
     * 3、如果线程池不是running状态 或者 无法入队列
     *   尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command
     */
    else if (!addWorker(command, false))
        reject(command);
}

每当执行execute()方法时一旦状态一直是RUNNING时,的实行进程如下:

  1. 若workerCount <
    corePoolSize,则开创并启动一个线程来施行新交付的天职;
  2. 设workerCount >=
    corePoolSize,且线程池内的堵截队列未满,则将任务添加到该阻塞队列中;
  3. 一旦workerCount >= corePoolSize && workerCount <
    maximumPoolSize,且线程池内之不通队列已满,则开创并启动一个线程来施行新交付的任务;
  4. 假若workerCount >= maximumPoolSize,并且线程池内的短路队列已满,
    则基于拒绝策略来拍卖该任务, 默认的处理方式是一直抛大。

addWorker方法

addWorker方法的根本办事是在线程池中创造一个新的线程并履行,firstTask参数
用于指定新增的线程执行之第一独任务。core为true表示在新增线程时会判断时活动线程数是否少corePoolSize,false表示新增线程前要判定当前活动线程数是否少maximumPoolSize,代码如下:

/**
 * 检查是否可以针对当前的池状态和给定的界限(核心或最大值)添加新的工作者。相应地调整工人数量,并且如果可能的话,创建并开始新的工作者,运行firstTask作为其第一个任务。
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /**
         * 只有当下面两种情况会继续执行,其他直接返回false(添加失败)
         * 1、rs == RUNNING
         * 2、rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty() (执行了shutdown方法,但是阻塞队列还有任务没有执行)
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 当wc超过最大限制 || 如果是核心线程,超过了核心数,否则超过了最大线程数,直接返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                // count累加成功,直接跳出两层for循环,执行下面的逻辑
                break retry;

            /**
             * 能执行到这里,都是因为多线程竞争,只有两种情况
             * 1、workCount发生变化,compareAndIncrementWorkerCount失败,这种情况不需要重新获取ctl,继续for循环即可。
             * 2、runState发生变化,可能执行了shutdown或者shutdownNow,这种情况重新走retry,取得最新的ctl并判断状态。
             */
            c = ctl.get();  // 重新读取ctl,可能状态发生了变化
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 获取锁后重新检测runState,因为有可能shutdown了
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        // 线程不能是活跃状态
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;    //记录最大线程数
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 失败回退,从 wokers 移除 w, 线程数减一,尝试结束线程池(调用tryTerminate 方法)
            addWorkerFailed(w);
    }
    return workerStarted;
}

留意一下此的t.start()这个话,启动时见面调用Worker类中之run方法,Worker本身实现了Runnable接口,所以一个Worker类型的靶子也是一个线程。

Worker类

线程池中之各一个线程被封装成一个Worker对象,ThreadPool维护的实际上就算是同一组Worker对象,看一下Worker的定义:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // 还没有执行任务时,这时就不应该被中断,设置状态为-1
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        // 调用runWorker方法执行
        runWorker(this);
    }

    // Lock methods
    //
    // 0代表没有锁定状态
    // 1代表锁定状态

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker继承了AQS,使用AQS来落实独占锁的功力。为什么未行使ReentrantLock来促成吗?可以看出tryAcquire方法,它是无容许重入的,而ReentrantLock是容重入的:

  1. lock方法要取得了独占锁,表示目前线程正在实践任务中;
  2. 若是正在尽任务,则非应当中断线程;
  3. 万一该线程现在非是独占锁的状态,也即是闲之状态,说明它们并未在拍卖任务,这时可以本着拖欠线程进行中断;
  4. 线程池在尽shutdown方法要tryTerminate方法时见面调用interruptIdleWorkers方法来刹车空闲之线程,interruptIdleWorkers方法会采用tryLock方法来判断线程池中之线程是否是悠闲状态;
  5. 所以设置也不可重入,是以咱们不希望任务在调用像setCorePoolSize这样的线程池控制方法时又取得锁。如果运用ReentrantLock,它是可重入的,这样要以职责中调用了如setCorePoolSize这好像线程池控制的不二法门,会暂停正在运作的线程。

于是,Worker继承自AQS,用于判断线程是否空闲以及是否足以给中止。

runWorker方法

以Worker类中的run方法调用了runWorker方法来实施任务,runWorker方法的代码如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        // 如果task为空,则通过getTask来获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();   // 开始运行,不允许中断

            /**
             * 确保只有在线程STOP时,才会被设置中断标示,否则清除中断标示
             * 1、如果线程池状态>=STOP,且当前线程没有设置中断状态,wt.interrupt()
             * 2、如果一开始判断线程池状态<STOP,但Thread.interrupted()为true,即线程已经被中断,又清除了中断标示,再次判断线程池状态是否>=STOP
             *  是,再次设置中断标示,wt.interrupt()
             *  否,不做操作,清除中断标示后进行后续步骤
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 用户自己实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 真正执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 用户自己实现
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // worker已经完成的任务数 + 1
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

/**
 * getTask方法用来从阻塞队列中取任务
 * 以下情况会返回null(被回收)
 * 1、超过了maximumPoolSize设置的线程数量(因为调用了setMaximumPoolSize())
 * 2、线程池被stop
 * 3、线程池被shutdown,并且workQueue空了
 * 4、线程等待任务超时
 * 返回null表示这个worker要结束了,这种情况下workerCount-1
 */
private Runnable getTask() {
    boolean timedOut = false; // 上一次poll()是否超时

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /**
         * 满足以下几点,wc - 1,返回null
         * 1、rs >= STOP
         * 2、rs == SHUTDOWN && workQueue.isEmpty()
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 队列获取值是否要阻塞等待
        // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /**
         * 满足以下几点,wc - 1,返回null
         * 1、wc > maximumPoolSize
         * 2、1 < wc <= maximumPoolSize && timed && timedOut
         * 3、wc <= 1 && workQueue.isEmpty() && timed && timedOut
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 获取Runnable
            Runnable r = timed ?
                // 超时会被回收
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // 阻塞等待,默认设置最后最多会有corePoolSize的线程一起阻塞。
                // 如果设置allowCoreThreadTimeOut=true的话,最后所有线程都会被回收。
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

/**
 * @param completedAbruptly true:worker执行的时候异常了
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * 1、worker数量-1
     * 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
     * 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
     */
    if (completedAbruptly)
        decrementWorkerCount();

    /**
     * 2、从Workers Set中移除worker
     */
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 统计完成的任务数
        completedTaskCount += w.completedTasks;
        // 从workers中移除,也就表示着从线程池中移除了一个工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    /**
     * 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池
     * 主要是判断线程池是否满足终止的状态
     * 如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
     * 没有线程了,更新状态为tidying->terminated
     */
    tryTerminate();

    /**
     * 4、是否需要增加worker线程
     * 线程池状态是running 或 shutdown
     * 如果当前线程是突然终止的,addWorker()
     * 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
     * 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
     */
    int c = ctl.get();
    /**
     * 以下情况会增加一个worker addWorker(null, false);
     * 1、completedAbruptly == true
     * 2、completedAbruptly == false && allowCoreThreadTimeOut == true && wc < 1
     * 3、completedAbruptly == false && allowCoreThreadTimeOut == false && wc < corePoolSize
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

getTask重要之地方是亚独if判断,目的是控制线程池的中线程数量。由臻文中的分析可以清楚,在执行execute方法时,如果手上线程池的线程数量超越了corePoolSize且低于maximumPoolSize,并且workQueue已满时,则可以多工作线程,但这要超时没有得到到任务,也尽管是timedOut为true的状况,说明workQueue已经也空了,也即证实了当下线程池中无需那么基本上线程来施行任务了,可以管多于corePoolSize数量之线程销毁掉,保持线程数量在corePoolSize即可。

processWorkerExit执行了事后,工作线程被销毁,以上就是是总体工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的劳作线程,runWorker通过getTask获取任务,然后实施任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束

下面是从execute到线程销毁的布满流程图:

图片 8

execute执行图

Worker内部类


我们以面已经算是非常详细地说了线程池执行任务execute的实践流程和部分细节,在方往往地涌出了一个字眼,那就是worker实例,那么是worker究竟是什么吗?里面还蕴涵了有些啊信息,以及worker这个职责究竟是怎行的啊?

​ 我们即便在是片来介绍一下吧,还是直接上源码:

俺们好看看Worker内部类继承AQS同步器并且实现了Runnable接口,所以Worker很明确就是是一个而尽任务而还要有何不可操纵中断、起至锁效果的切近。

  private final class Worker
          extends AbstractQueuedSynchronizer
          implements Runnable
      {
          /**
           * This class will never be serialized, but we provide a
           * serialVersionUID to suppress a javac warning.
           */
          private static final long serialVersionUID = 6138294804551838833L;
  ​
          /** 工作线程,如果工厂失败则为空. */
          final Thread thread;
          /** 初始化任务,有可能为空 */
          Runnable firstTask;
          /** 已完成的任务计数 */
          volatile long completedTasks;
  ​
          /**
           * 创建并初始化第一个任务,使用线程工厂来创建线程
           * 初始化有3步
           *1、设置AQS的同步状态为-1,表示该对象需要被唤醒
           *2、初始化第一个任务
           *3、调用ThreadFactory来使自身创建一个线程,并赋值给worker的成员变量thread
           */
          Worker(Runnable firstTask) {
              setState(-1); // inhibit interrupts until runWorker
              this.firstTask = firstTask;
              this.thread = getThreadFactory().newThread(this);
          }
  ​
    //重写Runnable的run方法
          /** Delegates main run loop to outer runWorker  */
          public void run() {
              //调用ThreadPoolExecutor的runWorker方法
              runWorker(this);
          }
  ​
          // Lock methods
          //
          // The value 0 represents the unlocked state.
          // The value 1 represents the locked state.
    //代表是否独占锁,0-非独占  1-独占
          protected boolean isHeldExclusively() {
              return getState() != 0;
          }

    //重写AQS的tryAcquire方法尝试获取锁
          protected boolean tryAcquire(int unused) {
           //尝试将AQS的同步状态从0改为1
              if (compareAndSetState(0, 1)) {
               //如果改变成,则将当前独占模式的线程设置为当前线程并返回true
                  setExclusiveOwnerThread(Thread.currentThread());
                  return true;
              }
              //否则返回false
              return false;
          }
  ​
    //重写AQS的tryRelease尝试释放锁
          protected boolean tryRelease(int unused) {
           //设置当前独占模式的线程为null
              setExclusiveOwnerThread(null);
              //设置AQS同步状态为0
              setState(0);
              //返回true
              return true;
          }
  ​
    //获取锁
          public void lock()        { acquire(1); }
          //尝试获取锁
          public boolean tryLock()  { return tryAcquire(1); }
          //释放锁
          public void unlock()      { release(1); }
          //是否被独占
          public boolean isLocked() { return isHeldExclusively(); }
  ​
          void interruptIfStarted() {
              Thread t;
              if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                  try {
                      t.interrupt();
                  } catch (SecurityException ignore) {
                  }
              }
          }
  }

3.4、其他外部调用方法

脚的方法都是用户可以友善进行调用的:

/**
 * 状态改为SHUTDOWN
 * 启动先前提交的任务被执行的有序关闭,但不接受新的任务。 如果已经关闭,则调用没有其他影响。
 * 该方法不能等待之前提交的任务执行完,如果需要等待执行,可以使用{@link #awaitTermination awaitTermination}
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

/**
 * 状态改为SHUTDOWN
 * 尝试停止所有正在执行的任务,停止等待任务的处理,并返回正在等待执行的任务的列表。 
 * 该方法不能等待之前提交的任务执行完,如果需要等待执行,可以使用{@link #awaitTermination awaitTermination}
 * 从这个方法返回后,这些任务从任务队列中排出(移除)。 除了竭尽全力地停止处理主动执行任务之外,没有任何保证。 
 * 这个实现通过{@link Thread#interrupt}来取消任务,所以任何不能响应中断的任务都不会终止。
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

// 执行任务前的hook
protected void beforeExecute(Thread t, Runnable r) { }

// 执行任务后的hook
protected void afterExecute(Runnable r, Throwable t) { }

/**
 * 什么都不做,交给子类实现,注意实现的时候使用super.terminated();
 */
protected void terminated() { }

/**
 * 判断状态 >= SHUTDOWN
 */
public boolean isShutdown() {
    return ! isRunning(ctl.get());
}

/**
 * 判断 SHUTDOWN <= 状态 < TERMINATED
 */
public boolean isTerminating() {
    int c = ctl.get();
    return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}

/**
 * 判断状态 == TERMINATED
 */
public boolean isTerminated() {
    return runStateAtLeast(ctl.get(), TERMINATED);
}

/**
 * 在指定的超时时间范围内等待状态变为TERMINATED
 */
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    /**
     * 1、当前workCount > 传入的corePoolSize,中断空闲worker
     * 2、传入的corePoolSize比之前的要大,选出差值和queue的大小做比较,比较小的作为要增加的线程数,调用addWorker,如果中途遇到workQueue为空,就不增加了。
     */
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

/**
 * 提前准备一个core的线程
 */
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);
}

/**
 * 提前准备所有的core线程
 */
public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}

// 设置coreThreadTimeOut的值
public void allowCoreThreadTimeOut(boolean value) {
    if (value && keepAliveTime <= 0)
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        if (value)
            interruptIdleWorkers();
    }
}

// 设置maximumPoolSize
public void setMaximumPoolSize(int maximumPoolSize) {
    if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    this.maximumPoolSize = maximumPoolSize;
    if (workerCountOf(ctl.get()) > maximumPoolSize)
        interruptIdleWorkers();
}

// 从队列里面移除任务
public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}

/**
 * 清除队列里所有呗cancel的Future类型的任务,此方法可用作存储回收操作
 * 该方法可能存在其他线程的干扰,导致清除失败
 */
public void purge() {
    final BlockingQueue<Runnable> q = workQueue;
    try {
        Iterator<Runnable> it = q.iterator();
        while (it.hasNext()) {
            Runnable r = it.next();
            if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                it.remove();
        }
    } catch (ConcurrentModificationException fallThrough) {
        // 如果在遍历期间遇到干扰,请采取慢速路径。进行遍历复制并调用remove取消条目。慢路径更可能是O(N * N)。
        for (Object r : q.toArray())
            if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                q.remove(r);
    }

    tryTerminate(); // In case SHUTDOWN and now empty
}

/**
 * 返回线程池大小
 */
public int getPoolSize() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // runState == TIDYING 或者 runState == TERMINATED 返回0
        // 否则返回workers的大小
        return runStateAtLeast(ctl.get(), TIDYING) ? 0
            : workers.size();
    } finally {
        mainLock.unlock();
    }
}

/**
 * 获取活跃线程数:根据isLocked来判断是不是在执行任务
 */
public int getActiveCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        int n = 0;
        for (Worker w : workers)
            if (w.isLocked())
                ++n;
        return n;
    } finally {
        mainLock.unlock();
    }
}

/**
 * 返回最大线程池的大小
 */
public int getLargestPoolSize() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        return largestPoolSize;
    } finally {
        mainLock.unlock();
    }
}

/**
 * 返回任务总数(包括已经完成的和未完成的)
 */
public long getTaskCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        long n = completedTaskCount;
        for (Worker w : workers) {
            n += w.completedTasks;
            if (w.isLocked())
                ++n;
        }
        return n + workQueue.size();
    } finally {
        mainLock.unlock();
    }
}

/**
 * 返回已完成任务总数
 */
public long getCompletedTaskCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        long n = completedTaskCount;
        for (Worker w : workers)
            n += w.completedTasks;
        return n;
    } finally {
        mainLock.unlock();
    }
}

小结

形容这个线程池就真的是未易于了,历时半只星期日,中途有成百上千底地方不亮堂,而且《Java并作编程的不二法门》的立即仍开当中对线程池的牵线其实并无到底多,所以自己拘留起也殊痛苦之,还不时会面扣押了此法就非知道干什么而调用这个与调用这个艺术是出何用意。而且在即时上之进程中,有在怀疑自己之读书道对怪,因为也有人跟自身说不待同句词去看去分析源码,只需要掌握流程虽足以了,但是后来还是考虑按照好的学路线走,多读源码总是有益处的,在此地我呢被程序猿一些建议,有投机的攻道的上,按照自己之办法坚定走下去。

3.5、内部方法与空方法

脚的章程都是用户自己调用不了之方,这里呢召开一下证:

/**
 * 替换状态
 * 如果现在的ctl状态 >= targetState,什么都不做
 * 如果现在的ctl状态 < targetState,尝试替换状态
 */
private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            // 前3位替换,后29位保持ctl原来的数目
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

/**
 * 尝试终止,只有当以下几种情况才把状态改为TERMINATED
 *      1、SHUTDOWN状态 && queue是空的 && wc == 0
 *      2、STOP状态 && wc == 0
 * workCount如果不是0,这时候就中断其中一个idle的worker来传播关闭信号
 * 该方法必须在执行任何可能会终止的操作之后调用此方法 - 在关闭期间减少工作人员数量或从队列中删除任务。
 * ScheduledThreadPoolExecutor里面也用到了,所以这里修饰符没用private
 */
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // c是运行时的状态
        if (isRunning(c) ||
            // c的状态值 >= TIDYING
            runStateAtLeast(c, TIDYING) ||
            // c的状态是SHUTDOWN && 队列不是空
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;

        // worker数不是0
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 设置ctl的状态为TIDYING,为中间过渡状态
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 终止方法,空方法什么都不做,子类去实现
                    terminated();
                } finally {
                    // 设置ctl的状态为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

/**
 * 中断worker的空闲线程
 * @param onlyOne 是否仅仅中断worker中的第一个
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 尝试获取锁,这里只有当线程没有运行的时候才能够tryLock成功
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    // 设置worker线程的中断变量
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // true,只中断队列的第一个就退出
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

/**
 * 中断所有worker的线程
 */
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

/**
 * 中断所有worker的空闲线程
 */
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

/**
 * 根据拒绝策略拒绝执行命令
 */
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

/**
 * 移除队列中的Runnable到一个新list中,使用的是阻塞队列的drainTo方法
 * 但是如果队列是DelayQueue或者其他能够让poll或者drainTo失败移除元素的队列的话,遍历队列并删除它
 */
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

/**
 * 预留方法,ScheduledThreadPoolExecutor重写了此方法
 */
void onShutdown() {
}

// ScheduledThreadPoolExecutor进行调用,判断是不是running或shutdown状态
final boolean isRunningOrShutdown(boolean shutdownOK) {
    int rs = runStateOf(ctl.get());
    return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}

// ScheduledThreadPoolExecutor进行调用,确认都提前准备好了
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

参考资料

方腾飞:《Java并作编程的计》

如需转载,请务必注明出处,毕竟一块块搬砖也非是易的业务。

4、线程池的监控

通过线程池提供的参数进行督察。线程池里产生有属性在监控线程池的时刻可以采用

  1. getTaskCount:线程池已经推行的及莫履行之任务总数;
  2. getCompletedTaskCount:线程池已成功的天职数,该值小于等于taskCount;
  3. getLargestPoolSize:线程池曾经创造了之卓绝特别线程数量。通过之数量可知道线程池是否满了,也就是是达到了maximumPoolSize;
  4. getPoolSize:线程池当前之线程数量;
  5. getActiveCount:当前线程池中正在行任务之线程数量。

经过这些措施,可以对线程池进行监察,在ThreadPoolExecutor类中提供了几个缺损方法,如beforeExecute方法,afterExecute方法与terminated方法,可以扩展这些点子在履行前还是施行后多部分初的操作,例如统计线程池的尽任务之时空等,可以持续自ThreadPoolExecutor来进行扩展。

5、合理之配置线程池

设想合理之配置线程池,就必首先分析任务特点,可以于以下几个角度来进展分析:

  1. 任务之特性:CPU密集型任务,IO密集型任务及混合型任务。
  2. 职责的先级:高,中与亚。
  3. 职责的履行时间:长,中以及不够。
  4. 职责的指:是否指其他系统资源,如数据库连接。

职责性质不一之天职可据此不同层面的线程池分开处理。CPU密集型任务部署尽可能少的线程数量,如安排Ncpu+1单线程的线程池。IO密集型任务则是因为用等待IO操作,线程并无是直接当尽任务,则安排尽可能多的线程,如2*Ncpu。混合型的天职,如果得以拆分,则用那拆分成一个CPU密集型任务及一个IO密集型任务,只要这简单单任务执行的日子去不是极端特别,那么分解后实施的吞吐率要压倒串行执行之吞吐率,如果立即有限单任务尽时间相差太大,则无必要进行分解。我们可通过Runtime.getRuntime().availableProcessors()方法获得当前设施的CPU个数。

先期级不等的职责可采用优先级列PriorityBlockingQueue来处理。它好为优先级赛的任务先获实施,需要留意的凡如直接闹先级赛之天职交给至队里,那么先级低的任务或永远不能够实施。

执行时各异之任务可以交到不同范畴的线程池来处理,或者也得以应用优先级列,让执行时间短的职责先实施。

因数据库连接池的职责,因为线程提交SQL后需拭目以待数据库返回结果,如果等待的时日越长CPU空闲时间即更为长,那么线程数应该设置更老,这样才能够再好之应用CPU。

建议采用有界队列,有格队列会增加系统的稳定性以及预警能力,可以根据需要设大一点,比如几千。有同一不良我们组采取的后台任务线程池的班和线程池全载了,不断的抛出抛弃任务之慌,通过排查发现凡是数据库出现了问题,导致执行SQL变得生缓慢,因为后台任务线程池里的天职都是要往数据库查询与插数据的,所以导致线程池里的工作线程全部梗住,任务积压在线程池里。如果及时我们装成无界队列,线程池的阵就见面越多,有或会见支撑满内存,导致整体系未可用,而休一味是后台任务出现问题。当然我们的系所有的天职是故底独立的服务器部署的,而我辈采用不同范畴之线程池跑不同品类的天职,但是出现如此问题时常也会影响到其它职责。

自参考了:争合理地估算线程池大小?
这首文章里的用程序评估线程池大小。

6、结论

本文比较详细的辨析了线程池的做事流程,总体来说有如下几个内容:

  1. 分析了线程的缔造,任务之提交,状态的变换和线程池的关;
  2. 此地通过execute方法来展开线程池的办事流程,execute方法通过corePoolSize,maximumPoolSize以及阻塞队列的轻重缓急来判定决定传入的任务应被当即执行,还是当加上到过不去队列中,还是应当拒绝任务。
  3. 介绍了线程池关闭时的历程,也剖析了shutdown方法与getTask方法有竞态条件;
  4. 当获得任务时,要透过线程池的状态来判定该收工作线程还是闭塞线程等待新的任务,也解释了干吗关闭线程池时只要中断工作线程以及为何各一个worker都需lock。

在向线程池提交任务时,除了execute方法,还有一个submit方法,submit方法会返回一个Future对象用于获取返回值,有关Future和Callable请自行了解一下息息相关的文章,这里虽非介绍了。

7、扩展

诚如开发被core线程数量是非常麻烦确定的,可以参考上面提到的怎么合理的估算线程池的大大小小,但是一般都是开发者自己通过压测后取得的数量,之后至确实的线程环境认证,得出一个成立的core数字。假设是5,但是为了防备某些瞬时大流量(我们呢无法预知到底流量会起差不多颇),通常会再次装一个比core线程数要格外的max线程,假设是10。那么当这种瞬时流量真的有了,如果希望服务器会抢的增强处理速度,当然是得让MAX线程尽快启动起来,帮着处理任务。这时候我们不怕好团结扩展线程池。

8、参考

拉并发(三)Java线程池的剖析及行使
深深明Java线程池:ThreadPoolExecutor
Java线程池ThreadPoolExecutor用和分析(二) –
execute()原理

相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图