线程池概述
什么是线程池 就是将多个线程放在一个池子里面(所谓池化技术),然后需要线程的时候不是创建一个线程,而是从线程池里面获取一个可用的线程,然后执行我们的任务.
线程池的优势
降低资源消耗,通过重复利用已创建的线程降低线程创建和消耗
提供响应速度,当任务到达时,任务可以不需要等到线程创建就立即执行
提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控.
创建一个线程池并提交线程任务 Java线程池最核心的类是ThreadPoolExecutor,查看ThreadPoolExecutor类关系继承图如下:
查看Executor接口可以通过execute方法进行提交任务 查看ExecutorService接口可以通过submit进行提交任务 所以ThreadPoolExecutor可以使用上述两种方式提交任务
ThreadPoolExecutor源码解析 类的结构
ThreadPoolExecutor的核心内部类为Worker,其对资源进行了复用,减少了创建线程的开销,而其他的AbortPolicy等则是RejectedExecutionHandler接口的各种拒绝策略类 当使用线程池并且使用有界队列的时候,如果队列满了,任务添加到线程池就会有问题,针对这个问题Java线程池提供了以下拒绝策略:
AbortPolicy:使用该策略时,如果线程池队列满了,丢掉这个任务并且抛出RejectedExecutionException异常
DiscardPolicy: 如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
DiscardOldestPolicy: 如果线程池队列满了,会将最老的(即最早进入队列的)任务删除掉并腾出队列空间,再尝试将任务加入队列
CallerRunsPolicy:如果任务添加到线程池失败,那么主线程会自己去执行该任务,不会去等待线程池的任务去执行
自定义:如果以上策略不符合业务场景,那么可以自己定义拒绝策略,只要实现RejectedExecutionHandler接口,并且实现rejectedExecution方法就可以了 由于核心内部类是worker,而且worker简易,先解析worker:
Worker类源码解析 类继承关系 1 private final class Worker extends AbstractQueuedSynchronizer implements Runnable
可知:Worker类继承了AQS抽象类,实现了Runnable接口,重写了AQS的一些方法,对应的Runnable接口可以创建线程的动作
类属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks;
说明: 1.Thread类型的thread属性用来封装worker,对应形成一个线程 2.Runnable类型的firstTask其表示该worker包含的runnable对象,即用户自定义的Runnable 3.volatile修饰的long类型的completedTasks表示已完成的任务数量
类构造函数 1 2 3 4 5 6 7 8 Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); }
进行构造worker对象,初始化对应的属性
worker核心函数分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); }public boolean tryLock () { return tryAcquire(1 ); }public void unlock () { release(1 ); }public boolean isLocked () { return isHeldExclusively(); }void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
ThreadPoolExecutor类的属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 )); private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf (int c) { return c & ~CAPACITY; } private static int workerCountOf (int c) { return c & CAPACITY; } private static int ctlOf (int rs, int wc) { return rs | wc; } private static boolean runStateLessThan (int c, int s) { return c < s; } private static boolean runStateAtLeast (int c, int s) { return c >= s; } private static boolean isRunning (int c) { return c < SHUTDOWN; } private boolean compareAndIncrementWorkerCount (int expect) { return ctl.compareAndSet(expect, expect + 1 ); } private boolean compareAndDecrementWorkerCount (int expect) { return ctl.compareAndSet(expect, expect - 1 ); } private void decrementWorkerCount () { do {} while (! compareAndDecrementWorkerCount(ctl.get())); } private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread" ); private final AccessControlContext acc;
着重讲解下线程池的运行状态: 1.RUNNING:接受新任务并且处理已经进入阻塞队列的任务 2.SHUTDOWN:不接受新任务,但是处理已经进入阻塞队列的任务 3.STOP:不接受新任务,不处理已经进入阻塞队列的任务并且中断正在运行任务 4.TIDYING:所有任务都已经终止,workerCount为0,线程转化为TIDYING状态并且调用terminated钩子函数 5.terminated钩子函数已经运行完成 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; runState单调增加,不一定要命中每个状态: RUNNING -> SHUTDOWN:调用SHUTDOWN()时,可能隐式在最后调用finalize() (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow() SHUTDOWN -> TIDYING:当队列和线程池都为空时 STOP -> TIDYING:当线程池为空时 TIDYING -> TERMINATED:当terminated()钩子方法已经完成
ThreadPoolExecutor类的构造函数 ThreadPoolExecutor类总共有四个构造函数,但是前面三个都是特例最终调的都是最后一个,咱先解析每个构造函数再统一分析好它每一个参数的意思 1.ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue)
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
说明:该构造函数默认的线程工厂及拒绝执行策略去创建ThreadPoolExecutor 2.ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, ThreadFactory)
1 2 3 4 5 6 7 8 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
说明:该构造函数只给出默认的拒绝执行策略 3.ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, RejectedExecutionHandler)
1 2 3 4 5 6 7 8 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
说明:该构造函数只给出默认的线程工厂 4.ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, ThreadFactory, RejectedExecutionHandler)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
corePoolSize:线程池大小,在创建线程池后,默认情况下线程池中并没有任何线程,而是等到有任务到来后才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,就会预创建线程,即在没有任务到来之前就创建corePoolSize个线程或一个线程.默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
maximumPoolSize:线程池最大线程数,表示线程池中最多创建多少个线程
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止.默认情况下只有当线程池中的线程数大于corePoolSize时,KeepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于CorePoolSize时,如果一个线程空闲的时间达到keepAliveTime则会终止,直到线程池中的线程数不超过corePoolSize.但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0
unit: 参数keepAliveTime的时间单位,有7种取值,默认为纳秒 TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
workQueue: 一个阻塞队列,用来存储等待执行的任务,一般有以下几种选择:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue
threadFactory:线程工厂,主要用来创建线程
handler:拒绝执行策略
ThreadPoolExecutor类的核心函数分析 任务提交过程 1.execute方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
说明:当客户端调用submit时,之后会间接调用execute函数,其在将来某个时间执行给定任务,execute并不会直接运行给定任务,它主要调用addWorker方法 2.addWorker方法 addWorker主要是完成以下任务:
原子性增加workerCount
将用户给定的任务封装成一个worker,并将此worker添加进workers集合
启动worker对应的线程,并启动该线程运行worker的run方法
回滚worker的创建动作,即将worker从workers集合中删除并原子性的减少workerCount
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
任务执行过程 1.runworker方法 runWorker函数中会实际执行给定任务(即调用用户重写的run方法),并且当给定任务完成后,会继续从阻塞队列中取任务,直到阻塞队列为空(即任务全部完成).在执行给定任务时会调用钩子函数利用钩子函数可以完成用户自定义的一些逻辑,在runWorker中会调用getTask函数和processWorkerExit钩子函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
2.getTask方法 getTask函数用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持有限时间等待poll和无限时间等待take.在该函数中还会相应shutdown和shutDownNow函数的操作,若检测到线程池处于SHUTDOWN或STOP状态,则会返回null,而不再返回阻塞队列中的Runnable对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
3.processWorkerExit方法 processWorkerExit函数是在worker退出时调用到的钩子函数,而引起worker退出的主要因素如下: 1.阻塞队列已经为空,即没有任务可以运行了 2.调用了shutDown或shutDownNow函数 此函数会根据是否中断了空闲线程来确定是否减少workerCount的值,并且将worker从workers集合中移除并且会尝试终止线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
任务关闭过程 1.shutdown方法 shutdown会按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务.首先检查是否具有shutdown的权限,然后设置线程池的控制为SHUTDOWN,之后中断空闲的worker,最后尝试终止线程池.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
2.tryTerminate方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 final void tryTerminate () { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } }
3.interruptIdleWorkers方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); } }