Java并发核心知识体系精讲——死锁(三)

死锁——从生产到消除

死锁是什么?有什么危害?

什么是死锁?

  • 发生在并发中
  • 互不相让:当两个(或更多)线程(或进程)相互持有对方所需要的资源,又不主动释放,导致所有人都无法继续前进,导致程序陷入无尽阻塞,这就是死锁
  • 多个线程造成死锁的情况
    • 如果多个线程之间的依赖关系是环形,存在环路的锁的依赖关系,那么也可能会发生死锁

死锁的影响

  • 死锁的影响不同系统中是不一样的,这取决于系统对死锁的处理能力
    • 数据库中:检测并放弃事务
    • JVM中:无法自动处理

几率不高但危害大

  • 不一定发生,但是遵守“墨菲定律”
  • 一旦发生,多是高并发场景,影响用户多
  • 整个系统崩溃、子系统崩溃、性能降低
  • 压力测试无法找出所有潜在的死锁

发生死锁的例子

最简单的情况

代码如下所示
当类的对象flag=1时(T1),先锁定o1,睡眠500毫秒,然后锁定o2
而T1在睡眠的时候,另一个flag=0的对象(T2)线程启动,先锁定o2,睡眠500毫秒,等待T1释放o1
T1睡眠结束后,需要锁定o2才能继续执行,而此时o2已经被T2锁定
T2睡眠结束后,需要锁定o1才能继续执行,而此时o1已经被T1锁定
T1、T2相互等待,都需要对方锁定的资源才能继续执行,从而死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class MustDeadLock implements Runnable{

static Object o1 = new Object();
static Object o2 = new Object();
int flag = 1;

public static void main(String[] args) {
MustDeadLock r1 = new MustDeadLock();
MustDeadLock r2 = new MustDeadLock();
r1.flag = 1;
r2.flag = 0;
new Thread(r1).start();
new Thread(r2).start();
}

@Override
public void run() {
if (flag == 1){
synchronized (o1){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o2){
System.out.println("线程1拿到了锁");
}
}
}
if (flag == 0){
synchronized (o2){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o1){
System.out.println("线程2拿到了锁");
}
}
}
}
}

实际生产中的例子:转账

  • 需要两把锁(A向B转钱,需要把A的账户锁定,也需要把B的账户锁定)
  • 获取两把锁成功,且余额大于0,则扣除转出人,增加收款人的余额,是原子操作
  • 顺序相反导致死锁(A向B转钱,B同时向A转钱)

死锁的4个必要条件

  • 互斥条件(某一个资源只能同时被一个线程拥有)
  • 请求与保持条件(请求另外一把锁,但是保持自身已有的锁)
  • 不剥夺条件(不会剥夺其中的一个锁给另外一个)
  • 循环等待条件(互相等待)

如何定位死锁?

  • jstack
    • 使用工具查看到运行的java项目的pid
    • 执行jstack pid (查看线程信息)
  • ThreadMXBean代码演示
    • 代码如下所示,在程序进入死锁后,运行如下代码,可以获取死锁的线程信息
      1
      2
      3
      4
      5
      6
      7
      8
      ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
      long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
      if (deadlockedThreads != null && deadlockedThreads.length > 0){
      for (int i = 0; i < deadlockedThreads.length; i++) {
      ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]);
      System.out.println(threadInfo.getThreadName());
      }
      }

修复死锁的策略

线上发生了死锁应该怎么办?

  • 线上问题都需要防范于未然,不造成损失的扑灭几乎已经是不可能
  • 保存案发现场然后立刻重启服务器
  • 暂时保证线上服务的安全,然后再利用刚才保存的信息,排查死锁,修改代码,重新发版

常见的修复策略

避免测试

  • 避免相反的获取锁的顺序
  • 实际上不在乎获取锁的顺序
    转账换序方案
  • 将获取锁的顺序固定,不允许有的线程先拿到锁一,有的线程先拿到锁二
  • 比如可以根据主键进行排序,无论是你给我转账还是我给你转账,那么获取多的时候都先获取主键值小的那把锁,再获取主键值大的

    代码如下所示
    转账时,获取锁的顺序按照hashCode进行排序,如果hashCode相同,那么就同时尝试获取第三把锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    public class TransferMoney1 implements Runnable {

    static Account a = new Account(500);
    static Account b = new Account(500);
    static Object lock = new Object();
    int flag = 1;

    public static void main(String[] args) throws InterruptedException {
    TransferMoney1 r1 = new TransferMoney1();
    TransferMoney1 r2 = new TransferMoney1();
    r1.flag = 1;
    r2.flag = 0;
    Thread t1 = new Thread(r1);
    Thread t2 = new Thread(r2);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println("a的余额:" + a.balance);
    System.out.println("b的余额:" + b.balance);
    }
    @Override
    public void run() {
    if (flag == 1) {
    transferMoney(a, b, 200);
    }
    if (flag == 0) {
    transferMoney(b, a, 200);
    }
    }

    private void transferMoney(Account from, Account to, int amount) {
    class Helper {
    public void transfer() {
    if (from.balance - amount < 0) {
    System.out.println("余额不足,转账失败。");
    }
    from.balance -= amount;
    to.balance += amount;
    System.out.println("成功转账" + amount + "元");
    }
    }
    int fromHash = System.identityHashCode(from);
    int toHash = System.identityHashCode(to);
    if (fromHash < toHash) {
    synchronized (from) {
    synchronized (to) {
    new Helper().transfer();
    }
    }
    }else if (fromHash > toHash){
    synchronized (to) {
    synchronized (from) {
    new Helper().transfer();
    }
    }
    }else {
    synchronized (lock){
    synchronized (to) {
    synchronized (from) {
    new Helper().transfer();
    }
    }
    }
    }
    }
    static class Account {
    int balance;

    public Account(int balance) {
    this.balance = balance;
    }
    }
    }
哲学家就餐的换手方案

问题描述:如下图,五个哲学家围着圆桌吃饭,但是只有五根筷子如下分布,每次开始吃饭,哲学家们就会先拿起左边的筷子,再拿起右边的筷子,然后吃饭,如果右边的筷子被人拿了就等着别人放下再获取

  • 假设每个人都拿起了左边的筷子,然后等待右边的,这样就会陷入无尽等待(死锁)

哲学家就餐问题图示

死锁代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class DiningPhilosophers {

public static class Philosophers implements Runnable {
Object leftChopstick = new Object();
Object rightChopstick = new Object();
public Philosophers(Object leftChopstick, Object rightChopstick) {
this.leftChopstick = leftChopstick;
this.rightChopstick = rightChopstick;
}

@Override
public void run() {
try {
while (true) {
doAction("Thinking");
synchronized (leftChopstick) {
doAction("拿起左边筷子");
synchronized (rightChopstick) {
doAction("拿起右边筷子");
doAction("放下右边筷子");
}
doAction("放下左边筷子");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

private void doAction(String action) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " " + action);
Thread.sleep((long) (Math.random() * 10));
}
}

public static void main(String[] args) {
Philosophers[] philosophers = new Philosophers[5];
Object[] chopsticks = new Object[philosophers.length];
for (int i = 0; i < chopsticks.length; i++) {
chopsticks[i] = new Object();
}
for (int i = 0; i < philosophers.length; i++) {
Object leftChopstick = chopsticks[i];
Object rightChopstick = chopsticks[(i + 1) % chopsticks.length];
philosophers[i] = new Philosophers(leftChopstick, rightChopstick);
new Thread(philosophers[i], "哲学家" + (i + 1) + "号").start();
}
}
}
多种解决方案
  • 服务员检测(避免策略)

    每次拿筷子找服务员拿,由服务员去协调

  • 改变一个哲学家拿筷子的顺序(避免策略)

    其中一个哲学家先拿起右边的筷子,再拿左边的
    代码更改如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    for (int i = 0; i < philosophers.length; i++) {
    Object leftChopstick = chopsticks[i];
    Object rightChopstick = chopsticks[(i + 1) % chopsticks.length];
    if (i == philosophers.length-1)
    philosophers[i] = new Philosophers(rightChopstick,leftChopstick);
    else
    philosophers[i] = new Philosophers(leftChopstick, rightChopstick);
    new Thread(philosophers[i], "哲学家" + (i + 1) + "号").start();
    }
  • 餐票(避免策略)

    想要吃饭需要先拿到餐票,餐票只有四张

  • 领导调节(检测与恢复策略)

    发现死锁之后,使其中一个哲学家放下筷子

检测与恢复策略

一段时间检测是否有死锁,如果有就剥夺某一个资源,来打开死锁

  • 检测算法:锁的调用链路图
    • 允许发生死锁
    • 每次调用锁都记录
    • 定期检测“锁的调用链路图”中是否存在环路
  • 恢复方法1:进程终止
    • 逐个终止线程,直到死锁消除
    • 终止顺序
      • 优先级(是前台交互还是后台处理)
      • 已占用资源、还需要的资源(还需要一点资源就可以完成)
      • 已经运行时间
  • 恢复方法2:资源抢占
    • 把已经分发出去的锁给收回来
    • 让线程回退几步,这样就不用结束整个线程,成本比较低
    • 缺点:可能同一个线程一直被抢占,那就会造成饥饿

鸵鸟策略

  • 鸵鸟这种动物在遇到危险的时间,通常就会把头埋在地上,这样一来它就看不到危险了。而鸵鸟策略的意思就是说,如果我们发生了死锁的概率极其低,那么我们就直接忽略它,直到死锁发生的时候,再人工修复

实际工程中如何避免死锁

  • 设置超时时间
    • Lock的tryLock(long timeout,TimeUnit unit)
    • synchronized不具备尝试锁的能力
    • 造成超时的可能性很多:发生了死锁、线程陷入了死循环、线程执行很慢
    • 超时时间到了就认为获取锁失败
      具体代码演示如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      public class TryLockDeadlock implements Runnable {
      static Lock lock1 = new ReentrantLock();
      static Lock lock2 = new ReentrantLock();
      int flag = 1;

      public static void main(String[] args) {
      TryLockDeadlock r1 = new TryLockDeadlock();
      TryLockDeadlock r2 = new TryLockDeadlock();
      r1.flag = 1;
      r2.flag = 0;
      new Thread(r1).start();
      new Thread(r2).start();
      }

      @Override
      public void run() {
      for (int i = 0; i < 100; i++) {
      if (flag == 1) {
      try {
      if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) {
      Thread.sleep(new Random().nextInt(1000));
      if (lock2.tryLock(800, TimeUnit.MILLISECONDS)) {
      System.out.println("线程1已经成功获取了两把锁");
      lock2.unlock();
      lock1.unlock();
      break;
      } else {
      System.out.println("线程1获取锁2失败,已重试");
      lock1.unlock();
      Thread.sleep(new Random().nextInt(1000));
      }
      } else {
      System.out.println("线程1获取第一把所失败,已重实");
      }
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      if (flag == 0) {
      try {
      if (lock2.tryLock(3000, TimeUnit.MILLISECONDS)) {
      Thread.sleep(new Random().nextInt(1000));
      if (lock1.tryLock(3000, TimeUnit.MILLISECONDS)) {
      System.out.println("线程2已经成功获取了两把锁");
      lock1.unlock();
      lock2.unlock();
      break;
      } else {
      System.out.println("线程2获取锁1失败,已重试");
      lock2.unlock();
      Thread.sleep(new Random().nextInt(1000));
      }
      } else {
      System.out.println("线程1获取第2把所失败,已重实");
      }
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      }
      }
      }
  • 多使用并发类而不是自己设计锁
    • ConcurrentHashMapConcurrentLinkedQueueAtomicBoolean
    • 实际应用中java.util.concurrent.atomic十分有用,简单方便且效率比使用Lock更高
    • 多用并发集合少使用同步集合,并发集合比同步集合的可扩展性更好
    • 并发场景需要用到map,首先想到使用ConcurrentHashMap
  • 尽量降低锁的使用粒度:用不同的锁而不是一个锁
  • 如果能使用同步代码块,就不使用同步方法:自己指定锁对象
  • 给你的线程起一个有意义的名字:debug和排查时事半功倍,框架和JDK都遵守这个最佳实践
  • 避免锁的嵌套:锁嵌套非常容易造成死锁
  • 分配资源前先看能不能收回来:银行家算法
  • 尽量不要几个功能使用同一把锁:专锁专用

其他活性故障

死锁

  • 死锁是最常见的活跃性问题,不过除了刚才的死锁之外,还有一些类似的问题,会导致程序无法顺序执行,统称为活跃性问题

活锁

比如哲学家就餐问题:在完全相同的时刻进入餐厅,并同时拿起左手边的筷子,拿不到右手边的筷子,那么这些哲学家就会等待五分钟,
同时放下手中的筷子,然后再等待五分钟,又同时拿起这些筷子(相同的时间拿起,相同的时间放下,等待相同的时间那么同样无法完成就餐)

什么是活锁

  • 虽然线程并没有阻塞,也始终再运行(所以叫做“活”锁,线程是“活”的),但是程序却得不到进展,因为线程始终重复做同样的事
  • 如果这里是死锁,那么就是这里两个人始终一动不动,直到对方先抬头,他们之间不再说话,只是等待
  • 如果发生活锁,那么这里的情况就是双方都不停的对对方说”你先起来吧,你先起来吧”,双方都一直再说话,在运行
  • 活锁和死锁的结果是一样的,就是谁都不能先抬头

    假设有一对夫妻,只有一套餐具
    吃饭的时候会去检查对方是不是处于饥饿状态,如果对方处于饥饿状态,就会把餐具让给对方
    程序将处于一直谦让的状态无法完成

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    public class LiveLock {

    static class Spoon {
    private Diner diner;

    public synchronized void use() {
    System.out.printf("%s has eaten!", diner.name);
    }

    public Spoon(Diner diner) {
    this.diner = diner;
    }

    public Diner getDiner() {
    return diner;
    }

    public void setDiner(Diner diner) {
    this.diner = diner;
    }
    }

    static class Diner {
    private String name;
    private boolean idHungry = true;

    public Diner(String name) {
    this.name = name;
    }

    public void eatWith(Spoon spoon, Diner spouse) {
    while (idHungry) {
    if (spoon.diner != this) {
    try {
    Thread.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    continue;
    }
    if (spouse.idHungry) {
    System.out.println(name + "亲爱的" + spouse.name + "你先吃吧");
    spoon.setDiner(spouse);
    continue;
    }
    spoon.use();
    idHungry = false;
    System.out.println(name + " 我已经吃完了");
    spoon.setDiner(spouse);
    }
    }
    }

    public static void main(String[] args) {
    Diner husband = new Diner("牛郎");
    Diner wife = new Diner("织女");

    Spoon spoon = new Spoon(husband);
    new Thread(new Runnable() {
    @Override
    public void run() {
    husband.eatWith(spoon, wife);
    }
    }).start();
    new Thread(new Runnable() {
    @Override
    public void run() {
    wife.eatWith(spoon, husband);
    }
    }).start();
    }
    }

    如何解决活锁问题

  • 原因:重试机制不变,消息队列始终重试,吃饭始终谦让
  • 以太网的指数避退算法
  • 加入随机因素(在进行谦让的时候加入随机因素,比如随机数,某一种情况下不进行谦让)

工程中的活锁实例:消息队列

  • 策略:消息如果处理失败,就放在队列开头重试
  • 由于依赖服务出了问题,处理该消息一直失败(由于依赖服务出现异常,把失败的消息放在队列头部,一直进行重试,一直都不会成功)
  • 没有阻塞,但是程序无法继续
  • 解决:放到队列尾部、重试次数限制

饥饿

  • 当线程需要某些资源(列如CPU),但是始终得不到
  • 线程的优先级设置的过于低,或者有某线程持有锁同时又无限循环从而不释放锁,或者某些程序始终占用某些文件的写锁
  • 饥饿可能会导致相应性差:比如,我们的浏览器有一个线程负责下载图片和文件、计算渲染等,在这种情况下,如果后台线程把CPU资源都占用,那么前台线程将无法得到很好的执行,这会导致用户的体验很差

常见面试题

  • 写一个必然死锁大的例子(上面已经写过了),生产中什么场景下会发生死锁?

    需要获取两把锁的时候,都有可能发生死锁

  • 发生死锁必须满足那些条件

    互斥条件
    请求保持条件
    不剥夺条件
    循环等待条件

  • 如何定位死锁?

    jstack
    ThreadMXBean

  • 有那些解决死锁问题的策略?

    避免策略
    检查与恢复策略
    鸵鸟策略

  • 讲一讲哲学家就餐问题

    上面已经有描述

  • 实际工程中如何避免死锁?

    设置超时时间
    多使用并发类而不是自己设计锁
    尽量降低锁的使用粒度:用不同的锁而不是一个锁
    如果能使用同步代码块,就不适用同步方法:自己指定锁对象
    给线程起一个有意义的名称
    避免锁嵌套
    分配资源前先看能不能收回来
    尽量不要几个功能使用同一把锁:专锁专用

  • 什么是活跃性问题?活锁、饥饿和死锁有什么区别?