Java并发成神之路-精通JUC并发工具十八般武艺——线程池(一)

并发工具类——分类

  • 为了并发安全:互斥同步、非互斥同步、无同步方案
  • 管理线程、提高效率
  • 线程协作

并发工具的分类

线程池的自我介绍

线程池的重要性

  • 如果不使用线程池,每个任务都需要新开一个线程处理
    • 一个线程还好,如果是几千的任务,每次都创建线程
  • 这样开销太大,我们希望有固定的数量的线程,来执行这几千个任务,这样就避免了反复创建并销毁线程所带来的开销问题

为什么要使用线程池

  • 问题一:反复创建线程开销大
  • 问题二:过多的线程会占用太多内存
  • 解决以上两个问题的思路
    • 用少量的线程——避免内存占用过多
    • 让这部分线程都保持工作,且可以反复执行任务——避免生命周期的损耗

线程池的好处

  • 加快相应速度
  • 合理利用CPU和内存
  • 统一管理

线程池适合应用的场景

  • 服务器接受到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率
  • 实际上,在开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理

创建和停止线程池

线程池构造函数的参数

参数名 类型 含义
corePoolSize int 核心线程数,详解见下文
maximumPoolSize int 最大线程数,详解见下文
keepAliveTime long 保持存活时间
unit TimeUnit 保持存活时间的单位
workQueue BlockingQueue 任务存储队列
threadFactory ThreadFactory 当线程池需要新的线程的时候,会使用threadFactory来生成新的线程
Handler RejectedExecutionHandler 由于线程池也无法接受你所提交的任务的拒绝策略

参数中corePoolSizemacPoolSize

  • corePoolSize指的是核心线程数:线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务
  • 线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些增加的线程数有一个上限,这就是最大量maximumPoolSize

添加线程规则

  • 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新的线程来运行新任务
  • 如果线程数等于(或大于)corePoolSize但少于maximumPoolSize,则将任务放入队列
  • 如果队列已满,并且线程数小于maximumPoolSize,则创建一个新的线程来运任务
  • 如果队列已满,并且线程数大于或等于maximumPoolSize,则拒绝该任务
  • 是否需要增加线程的判断顺序是:
    • corePoolSize
    • workQueue
    • maximumPoolSize

增减线程的特点

  • 通过设置corePoolSizemaximumPoolSize相同,就可以创建一个固定大小的线程池
  • 线程池希望保持较少的线程数,并且只有在负载变得很大的时候才增加它
  • 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务
  • 是只有队列填满的时候才会创建对于corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBockingQueue),那么线程数就不会超过corePoolSize

keepAliveTime

  • 如果线程池当前的线程数对于corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止

ThreadFactory用来创建线程

  • 新的线程是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程,如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。
  • 通常情况下我们使用默认的ThreadFactory就可以了

工作队列

  • 有3中最常见的队列类型:
    • 直接交接:SynchronousQueue(没有队列,里面不存任务,直接交给线程执行)
    • 无界队列:LinkedBlockingQueue(队列没有界限,永远不会被塞满,但是可能会内存溢出)
    • 有界队列:ArrayBlockingQueue(可以设置队列大小)

线程池应该手动创建还是自动创建

  • 手动创建更好,因为这样可以让我们更加明确线程池的运行规则,避免资源耗尽的风险
  • 让我们来看看自动创建线程池(也就是直接调用JDK封装好的构造函数)可能带来那些问题

newFixedThreadPool

  • 创建一个定长的线程池
  • 由于传进去的LinkedBlockQueue是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM。

    OOM的代码如下,任务数调到最大,每个线程都直接休眠,任务一直无法完成,就会全部堆积在队列中,最终导致队列OOM

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public class FixedThreadPoolOOM {
    static ExecutorService executorService = Executors.newFixedThreadPool(1);
    public static void main(String[] args) {
    for (int i = 0; i < Integer.MAX_VALUE; i++) {
    executorService.execute(new SubThread());
    }
    }
    }
    class SubThread implements Runnable{

    @Override
    public void run() {
    try {
    Thread.sleep(1000000000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    newSingleThreadExecutor

  • 创建一个单线程的线程池
  • 可以看出,这里和刚才的newFixedThreadPool的原理基本是一样的,只不过是把线程数直接设置成了1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量内存

newCachedThreadPool

  • 可以缓存的线程池
  • 这里的弊端在于第二个参数maximumPoolSize被设置为了Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致OOM

newScheduledThreadPool

  • 支持定时及周期性任务执行的线程池
  • 可以在指定时间后执行任务,也可以在指定时间后每隔多久执行一次任务

workStealingPool是JDK8加入的

  • 子任务:适合有子任务的场景(比如遍历二叉树,一个任务会创建多个子任务)
  • 窃取:当其他的线程执行完任务之后,如果某一个线程的子任务还没有执行完毕,其他线程就会帮他执行,任务执行是没有顺序的

正确的创建线程池的方法

  • 根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,我们想给线程取什么名字等待

线程池里的线程数量设定为多少比较合适

  • CPU密集性(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右。
  • 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上
  • 参考Brain Goetz推荐的计算方法:
    • 线程数 = CPU核心数 * (1 + 平均等待时间 / 平均工作时间)

停止线程池的正确方法

  • shutdown()
    • 并不会立即停止线程池,而是通知线程池进行停止
    • 线程池会将正在等待的以及正在执行的任务执行完毕之后再停止
    • 但是再执行该方法之后,就无法再向线程池提交任务
  • isShutdown()
    • 判断线程池是否已经执行过了shutdown
    • 并不是返回true就是线程池已经结束,而是开始准备结束
  • isTerminated()
    • 判断线程池是否真的结束
  • awaitTermination(long timeout, TimeUnit unit)
    • 会阻塞线程,阻塞时间根据参数指定
    • 然后返回一个布尔类型的结果
    • 结果是判断线程池是否关闭
    • 阻塞一定的时间,然后判断线程池是否已经关闭
  • shutdownNow
    • 立即关闭线程池
    • 会多所有正在执行任务发出中断指令
    • 有一个List的返回值,返回的是所有正在等待中的任务的集合

任务太多,怎么拒绝?

拒绝时机

  • 当线程池关闭时,提交新任务会被拒绝
  • 当线程池对最大线程和工作队列容量使用有限制边界并且已经饱和的时候会拒绝任务

4种拒绝策略

  • AbortPolicy
    • 直接抛出异常
  • DiscardPolicy
    • 直接丢弃任务,并且不会通知
  • DiscardOldestPolicy
    • 丢弃最老的任务,将新的放进队列
  • CallerRunPolicy
    • 让提交任务的线程去执行任务(同步执行)

钩子方法,给线程池加点料

具体实现如以下代码所示,代码的功能是实现一个可以暂停的线程池,等待一定时间后还能被唤醒
继承ThreadPoolExecutor类,并重写beforeExecute方法,这个方法会在线程执行每一个任务之前执行
通过标记位,实现对线程的暂停和恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class PauseableThreadPool extends ThreadPoolExecutor {

private boolean isPaused;
private final Lock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();

public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10,20,10L,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行了");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池被恢复了");
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}

}

private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}

public void resume(){
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
}finally {
lock.unlock();
}
}

public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
}

实现原理,源码分析

线程池组成部分

  • 线程池管理器
  • 工作线程
  • 任务队列
  • 任务接口(Task)

Executor家族

  • 线程池、ThreadPoolExecutorExecutorServiceExecutorExecutors等这么多和线程池相关的类,大家都是什么关系呢?
  • Executor

    线程池的最顶级接口,只提供了执行线程的方法

  • ExecutorService

    Executor接口的继承,同时添加了一些其他的方法,比如停止线程池的方法等

  • ThreadPoolExecutor

    线程池的最终实现类

  • Executors

    一个工具类,提供了一些创建线程池的方法

线程池实现线程复用的原理

  • 相同线程执行不同任务

    下面代码是从ThreadPoolExecutor类中摘取的启动任务的方法
    从代码中可以看出来,主要是通过while循环,判断当前任务不为空,或者能取到下一个任务
    而任务的类型就是Runnable类型的,之后就直接执行它的run()方法运行任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
    while (task != null || (task = getTask()) != null) {
    w.lock();
    // If pool is stopping, ensure thread is interrupted;
    // if not, ensure thread is not interrupted. This
    // requires a recheck in second case to deal with
    // shutdownNow race while clearing interrupt
    if ((runStateAtLeast(ctl.get(), STOP) ||
    (Thread.interrupted() &&
    runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
    wt.interrupt();
    try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
    task.run();
    } catch (RuntimeException x) {
    thrown = x; throw x;
    } catch (Error x) {
    thrown = x; throw x;
    } catch (Throwable x) {
    thrown = x; throw new Error(x);
    } finally {
    afterExecute(task, thrown);
    }
    } finally {
    task = null;
    w.completedTasks++;
    w.unlock();
    }
    }
    completedAbruptly = false;
    } finally {
    processWorkerExit(w, completedAbruptly);
    }
    }

线程池状态

  • RUNNING:接受新任务并且处理排队任务
  • SHUTDOWN:不接受新的任务,但是处理排队中的任务
  • STOP:不接受新的任务,也不处理排队中的任务,并且中断正在进行中的任务
  • TIDYING,中文是整洁,理解了中文就容易理解这个状态了:所有任务都已经终止,workerCount为0时,线程会转换到TIDYING状态,并将运行terminate()钩子方法
  • TERMINATED:terminate()运行完成

使用线程池的注意点

  • 避免任务堆积
  • 避免线程数过度增加
  • 排查线程泄漏(线程一直无法回收)