目录
  1. 1. 线程池
    1. 1.1. 线程池定义
    2. 1.2. 为什么要用线程池
    3. 1.3. 线程池的使用
      1. 1.3.1. 创建线程池
      2. 1.3.2. 向线程池提交任务
        1. 1.3.2.1. submit()和execute()的区别
      3. 1.3.3. 线程池关闭
      4. 1.3.4. 合理地配置线程池
      5. 1.3.5. 线程池的监控
    4. 1.4. 线程池原理
      1. 1.4.1. execute()方法示意
      2. 1.4.2. execute()源码分析
    5. 1.5. 自己设计线程池
    6. 1.6. 为什么不允许使用Executors创建线程池
16-线程池

线程池

线程池定义

管理一组工作线程.通过线程池复用线程有以下几点优点:

减少资源消耗 => 通过重复利用已创建的线程降低线程创建和销毁造成的消耗
提高响应速度 => 当任务到达时,任务可以不需要等到线程创建就能立即执行
提高线程的可管理性 => 线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性(无限创建线程引起的OutOfMemoryError),使用线程池可以进行统一分配、调优和监控.但是,要做到合理利用线程池,必须对其实现原理了如指掌

为什么要用线程池

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力
假设一个服务器完成一项任务所需时间为:T1创建线程时间,T2 在线程中执行任务的时间,T3销毁线程时间;如果T1+T3 远大于T2,则可以采用线程池,以提高服务器性能
一个线程池包括以下四个基本组成部分:

  • 线程池管理器(ThreadPool):用于创建并管理线程池,包括创建线程池、销毁线程池、添加新任务
  • 工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务
  • 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口、任务执行完后的收尾工作、任务的执行状态等
  • 任务队列(taskQueue):用于存放没有处理的任务.提供一种缓冲机制

线程池技术正是关注如何缩短或调整T1和T3时间的技术,从而提高服务器程序性能的.它把T1、T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1、T3的开销了

线程池不仅调整T1、T3产生的时间段,而且它还显著减少了创建线程的数目

线程池的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果任务数量超过了最大线程数量,需要进入队列排队等候,等其他线程执行完毕,再从队列中取出任务来执行

主要特点:

  • 线程复用
  • 控制最大并发数量
  • 管理线程

带来的好处:

  • 降低了资源消耗.通过复用机制降低了线程创建和销毁的消耗
  • 提高了响应速度.当任务到达时,任务不需要等候就能立即执行
  • 提高了线程的可管理性.线程是稀缺的,如果无限制创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控

线程池的使用

Executor.png

创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 线程池大小不能小于0 || 最大容量不能小于0 || 最大容量不能小于线程池大小 || keepAliveTime不能小于0
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 初始化相应的属性数据
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  1. corePoolSize:线程池基本大小,在创建线程池后,默认情况下线程池中并没有任何线程,而是等到有任务到来后才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,就会预创建线程,即在没有任务到来之前就创建corePoolSize个线程或一个线程.默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
  2. runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列.可以选择以下几个阻塞队列
    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue.静态工厂方法Executors.newFixedThreadPool()使用了这个队列
    • SynchronousQueue:一个不存储元素的阻塞队列.每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列
  3. maximumPoolSize:线程池最大线程数,表示线程池中最多创建多少个线程;如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务.值得注意的是,如果使用了无界的任务队列这个参数就没什么效果
  4. threadFactory:线程工厂,主要用来创建线程;可以通过线程工厂给每个创建出来的线程设置更有意义的名字.使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字,代码如下
    1
    new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
  5. RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务.这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常.在JDK 1.5中Java线程池框架提供了以下4种策略
    • AbortPolicy:直接抛出异常
    • CallerRunsPolicy:只用调用者所在线程来运行任务
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
    • DiscardPolicy:不处理,丢弃掉
  6. keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止.默认情况下只有当线程池中的线程数大于corePoolSize时,KeepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于CorePoolSize时,如果一个线程空闲的时间达到keepAliveTime则会终止,直到线程池中的线程数不超过corePoolSize.但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0
  7. TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)

向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute()和submit()方法

  execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功.通过以下代码可知execute()方法输入的任务是一个Runnable类的实例

1
2
3
4
5
6
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
})

  submit()方法用于提交需要返回值的任务.线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完

1
2
3
4
5
6
7
8
9
10
11
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池
executor.shutdown();
}

submit()和execute()的区别

线程池中的execute方法即开启线程执行池中的任务.还有一个方法submit也可以做到,它的功能是提交指定的任务去执行并且返回Future对象,即执行的结果.下面简要介绍一下两者的三个区别:

  1. 接收参数不一样
  2. submit有返回值,而execute没有
    用到返回值的例子,比如说我有很多个做validation的task,我希望所有的task执行完,然后每个task告诉我它的执行结果,是成功还是失败,如果是失败,原因是什么.然后我就可以把所有失败的原因综合起来发给调用者
  3. submit方便Exception处理
    意思就是如果你在你的task里会抛出checked或者unchecked exception,而你又希望外面的调用者能够感知这些exception并做出及时的处理,那么就需要用到submit,通过捕获Future.get抛出的异常

线程池关闭

  可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池.它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止.但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程
  只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true.当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true.至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法

合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析:

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务
  • 任务的优先级:高、中和低
  • 任务的执行时间:长、中和短
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接

  性质不同的任务可以用不同规模的线程池分开处理.CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池.由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu.混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量.如果这两个任务执行时间相差太大,则没必要进行分解.可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数
  优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理.它可以让优先级高的任务先执行

如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行

  执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行
  依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU

  建议使用有界队列有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千.有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线
程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里.如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题.当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务

线程池的监控

  如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题.可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性:

  • taskCount:线程池需要执行的任务数量
  • completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount
  • largestPoolSize:线程池里曾经创建过的最大线程数量.通过这个数据可以知道线程池是否曾经满过.如该数值等于线程池的最大大小,则表示线程池曾经满过
  • getPoolSize:线程池的线程数量.如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减
  • getActiveCount:获取活动的线程数

  通过扩展线程池进行监控.可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控.例如:监控任务的平均执行时间、最大执行时间和最小执行时间等.这几个方法在线程池里是空方法

1
protected void beforeExecute(Thread t, Runnable r) { }

线程池原理

当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?
线程池执行任务逻辑和线程池参数的关系.jpg

假设线程池从最初的状态开始说起:

  1. 线程池创建后,任务交给线程池来运行,线程池判定当前的线程池是否已经达到coreSize大小,如果没有就继续创建线程池(也可以通过prestartCoreThread()来预创建)
  2. 如果CoreSize加到已满,会将加入的任务放入任务队列中,此时核心线程通过take阻塞的形式从任务队列中获取任务执行
  3. 如果任务队列已满,则线程池会开始临时线程通过poll(time,unit)的方式从任务队列中拉活来干,如果临时线程指定的时间获取任务进行执行,则该线程被释放
  4. 如果临时线程一直增大同核心线程之和达到设定的最大线程数,继续放入队列的任务就会被预设的拒绝策略进行处理

execute()方法示意

execute方法执行示意图.jpg
ThreadPoolExecutor执行execute方法分下面4种情况:

  • 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意:执行这一步骤需要获取全局锁)
  • 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
  • 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)
  • 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法

  ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈).在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁

execute()源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 如果线程数小于基本线程数,则创建线程并执行当前任务
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
// 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}

// 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务
else if (!addIfUnderMaximumPoolSize(command))
// 抛出RejectedExecutionException异常
reject(command); // is shutdown or saturated
}
}

工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行.我们可以从Worker类的run()方法里看到这点

1
2
3
4
5
6
7
8
9
10
11
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);task = null;
}
} finally {
workerDone(this);
}
}

自己设计线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
public class DolphinThreadPoolExecutor {
private final BlockingQueue<Runnable> workQueue;// 待执行的任务,相当于任务缓冲区,其实例对象有ArrayBlockingQueue,LinkedBlockingQueue等

private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();// 线程池中的执行线程

private int corePoolSize;// 初始化的线程的个数
private int maxPoolSize;// 线程池的最大个数,当corePoolSize满了,而且BlockingQueue缓冲队列也满了,则初始化额外的线程,如果额外的线程达到上限,则任务丢弃

private int workPoolSize;// 正在工作的线程
private final boolean allowCoreThreadTimeOut;
private final long keepAliveTime;
private final TimeUnit timeUnit;

public DolphinThreadPoolExecutor(int corePoolSize, int maxPoolSize, boolean allowCoreThreadTimeOut,
long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize;
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
}

public DolphinThreadPoolExecutor(BlockingQueue<Runnable> workQueue, int corePoolSize, int maxPoolSize) {
this(corePoolSize, maxPoolSize, false, 0L, TimeUnit.SECONDS, workQueue);
}

public void execute(Runnable task) {
if (task == null) {
throw new IllegalArgumentException("The parameter is not null");
}

Worker work = null;
mainLock.lock();

// 创建线程进行执行
if (workPoolSize < corePoolSize) {
work = new Worker(task);
// 放到缓存队列中
} else if (!workQueue.offer(task)) {
// 开启额外的线程的进行执行
if (workPoolSize < maxPoolSize) {
work = new Worker(task);
} else {
throw new RuntimeException("The task was thowed");// 任务的抛弃策略,这里使用抛出异常,即这个任务无法处理
}
}

if (work != null) {
workers.add(work);
work.t.start();
workPoolSize++;
}

mainLock.unlock();
}

public void shutDownNow(){
for(Worker w : workers){
w.t.stop();
}
}

private class Worker implements Runnable {
private Runnable firstTask;
private Thread t;

private boolean state;//0:空闲 1:代表忙碌

public Worker(Runnable firstTask) {
super();
this.firstTask = firstTask;
this.t = new Thread(this);
}

@Override
public void run() {
runWorker(this);
}

private void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null) {
task.run();
task=null;
}
}

private Runnable getTask() {
boolean timeOut;
timeOut = allowCoreThreadTimeOut || workPoolSize > corePoolSize;

try {
// 如果是该线程长期存活,那么使用take线程阻塞的函数,如果是一旦没有任务就销毁线程,1.是核心线程
// 2.线程数大于核心线程,即目前任务不多,线程多余
Runnable task = timeOut ? workQueue.poll(keepAliveTime, timeUnit) : workQueue.take();
if (task != null) {
return task;
} else {
// 如果没有任务被获取,则这个线程即将被销毁
mainLock.lock();
workPoolSize--;
mainLock.unlock();
System.out.println("当前线程被销毁");
workers.remove(this);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
}

自定义线程池后进行线程池的测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MyTask implements Runnable{
private int taskNum;

public MyTask(int taskNum) {
super();
this.taskNum = taskNum;
}

@Override
public void run() {
System.out.println("正在执行:"+taskNum+"任务");
try {
Thread.sleep(1000);
System.out.println("任务"+taskNum+"結束");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

public class DolphinThreadPoolExecutorTest {
public static void main(String[] args) throws InterruptedException {
DolphinThreadPoolExecutor executor=new DolphinThreadPoolExecutor(2, 6, true, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(4));
for(int i=0;i<10;i++){
Thread.sleep(100);
executor.execute(new MyTask(i));
}
}
}

为什么不允许使用Executors创建线程池

  • Executors的FixedThreadPool和SingleThreadExecutor => 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而引起OOM异常
  • CachedThreadPool => 允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而引起OOM异常

这就是为什么禁止使用Executors去创建线程池,而是推荐自己去创建ThreadPoolExecutor的原因

文章作者: Eric Liang
文章链接: https://ericql.github.io/2019/11/12/02-Java%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/01-%E5%BA%94%E7%94%A8%E7%AF%87/16-%E7%BA%BF%E7%A8%8B%E6%B1%A0/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Eric Liang
打赏
  • 微信
  • 支付宝