Java并发核心知识体系精讲——死锁(三)
死锁——从生产到消除
死锁是什么?有什么危害?
什么是死锁?
- 发生在并发中
- 互不相让:当两个(或更多)线程(或进程)相互持有对方所需要的资源,又不主动释放,导致所有人都无法继续前进,导致程序陷入无尽阻塞,这就是死锁
- 多个线程造成死锁的情况
- 如果多个线程之间的依赖关系是环形,存在环路的锁的依赖关系,那么也可能会发生死锁
死锁的影响
- 死锁的影响不同系统中是不一样的,这取决于系统对死锁的处理能力
- 数据库中:检测并放弃事务
- JVM中:无法自动处理
几率不高但危害大
- 不一定发生,但是遵守“墨菲定律”
- 一旦发生,多是高并发场景,影响用户多
- 整个系统崩溃、子系统崩溃、性能降低
- 压力测试无法找出所有潜在的死锁
发生死锁的例子
最简单的情况
代码如下所示
当类的对象flag=1时(T1),先锁定o1,睡眠500毫秒,然后锁定o2
而T1在睡眠的时候,另一个flag=0的对象(T2)线程启动,先锁定o2,睡眠500毫秒,等待T1释放o1
T1睡眠结束后,需要锁定o2才能继续执行,而此时o2已经被T2锁定
T2睡眠结束后,需要锁定o1才能继续执行,而此时o1已经被T1锁定
T1、T2相互等待,都需要对方锁定的资源才能继续执行,从而死锁
1 | public class MustDeadLock implements Runnable{ |
实际生产中的例子:转账
- 需要两把锁(A向B转钱,需要把A的账户锁定,也需要把B的账户锁定)
- 获取两把锁成功,且余额大于0,则扣除转出人,增加收款人的余额,是原子操作
- 顺序相反导致死锁(A向B转钱,B同时向A转钱)
死锁的4个必要条件
- 互斥条件(某一个资源只能同时被一个线程拥有)
- 请求与保持条件(请求另外一把锁,但是保持自身已有的锁)
- 不剥夺条件(不会剥夺其中的一个锁给另外一个)
- 循环等待条件(互相等待)
如何定位死锁?
- jstack
- 使用工具查看到运行的java项目的pid
- 执行jstack pid (查看线程信息)
- ThreadMXBean代码演示
- 代码如下所示,在程序进入死锁后,运行如下代码,可以获取死锁的线程信息
1
2
3
4
5
6
7
8ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
if (deadlockedThreads != null && deadlockedThreads.length > 0){
for (int i = 0; i < deadlockedThreads.length; i++) {
ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]);
System.out.println(threadInfo.getThreadName());
}
}
- 代码如下所示,在程序进入死锁后,运行如下代码,可以获取死锁的线程信息
修复死锁的策略
线上发生了死锁应该怎么办?
- 线上问题都需要防范于未然,不造成损失的扑灭几乎已经是不可能
- 保存案发现场然后立刻重启服务器
- 暂时保证线上服务的安全,然后再利用刚才保存的信息,排查死锁,修改代码,重新发版
常见的修复策略
避免测试
- 避免相反的获取锁的顺序
- 实际上不在乎获取锁的顺序
转账换序方案
- 将获取锁的顺序固定,不允许有的线程先拿到锁一,有的线程先拿到锁二
- 比如可以根据主键进行排序,无论是你给我转账还是我给你转账,那么获取多的时候都先获取主键值小的那把锁,再获取主键值大的
代码如下所示
转账时,获取锁的顺序按照hashCode进行排序,如果hashCode相同,那么就同时尝试获取第三把锁1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74public class TransferMoney1 implements Runnable {
static Account a = new Account(500);
static Account b = new Account(500);
static Object lock = new Object();
int flag = 1;
public static void main(String[] args) throws InterruptedException {
TransferMoney1 r1 = new TransferMoney1();
TransferMoney1 r2 = new TransferMoney1();
r1.flag = 1;
r2.flag = 0;
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("a的余额:" + a.balance);
System.out.println("b的余额:" + b.balance);
}
public void run() {
if (flag == 1) {
transferMoney(a, b, 200);
}
if (flag == 0) {
transferMoney(b, a, 200);
}
}
private void transferMoney(Account from, Account to, int amount) {
class Helper {
public void transfer() {
if (from.balance - amount < 0) {
System.out.println("余额不足,转账失败。");
}
from.balance -= amount;
to.balance += amount;
System.out.println("成功转账" + amount + "元");
}
}
int fromHash = System.identityHashCode(from);
int toHash = System.identityHashCode(to);
if (fromHash < toHash) {
synchronized (from) {
synchronized (to) {
new Helper().transfer();
}
}
}else if (fromHash > toHash){
synchronized (to) {
synchronized (from) {
new Helper().transfer();
}
}
}else {
synchronized (lock){
synchronized (to) {
synchronized (from) {
new Helper().transfer();
}
}
}
}
}
static class Account {
int balance;
public Account(int balance) {
this.balance = balance;
}
}
}
哲学家就餐的换手方案
问题描述:如下图,五个哲学家围着圆桌吃饭,但是只有五根筷子如下分布,每次开始吃饭,哲学家们就会先拿起左边的筷子,再拿起右边的筷子,然后吃饭,如果右边的筷子被人拿了就等着别人放下再获取
- 假设每个人都拿起了左边的筷子,然后等待右边的,这样就会陷入无尽等待(死锁)
死锁代码如下所示:
1 | public class DiningPhilosophers { |
多种解决方案
- 服务员检测(避免策略)
每次拿筷子找服务员拿,由服务员去协调
- 改变一个哲学家拿筷子的顺序(避免策略)
其中一个哲学家先拿起右边的筷子,再拿左边的
代码更改如下1
2
3
4
5
6
7
8
9for (int i = 0; i < philosophers.length; i++) {
Object leftChopstick = chopsticks[i];
Object rightChopstick = chopsticks[(i + 1) % chopsticks.length];
if (i == philosophers.length-1)
philosophers[i] = new Philosophers(rightChopstick,leftChopstick);
else
philosophers[i] = new Philosophers(leftChopstick, rightChopstick);
new Thread(philosophers[i], "哲学家" + (i + 1) + "号").start();
} - 餐票(避免策略)
想要吃饭需要先拿到餐票,餐票只有四张
- 领导调节(检测与恢复策略)
发现死锁之后,使其中一个哲学家放下筷子
检测与恢复策略
一段时间检测是否有死锁,如果有就剥夺某一个资源,来打开死锁
- 检测算法:锁的调用链路图
- 允许发生死锁
- 每次调用锁都记录
- 定期检测“锁的调用链路图”中是否存在环路
- 恢复方法1:进程终止
- 逐个终止线程,直到死锁消除
- 终止顺序
- 优先级(是前台交互还是后台处理)
- 已占用资源、还需要的资源(还需要一点资源就可以完成)
- 已经运行时间
- 恢复方法2:资源抢占
- 把已经分发出去的锁给收回来
- 让线程回退几步,这样就不用结束整个线程,成本比较低
- 缺点:可能同一个线程一直被抢占,那就会造成饥饿
鸵鸟策略
- 鸵鸟这种动物在遇到危险的时间,通常就会把头埋在地上,这样一来它就看不到危险了。而鸵鸟策略的意思就是说,如果我们发生了死锁的概率极其低,那么我们就直接忽略它,直到死锁发生的时候,再人工修复
实际工程中如何避免死锁
- 设置超时时间
- Lock的tryLock(long timeout,TimeUnit unit)
- synchronized不具备尝试锁的能力
- 造成超时的可能性很多:发生了死锁、线程陷入了死循环、线程执行很慢
- 超时时间到了就认为获取锁失败
具体代码演示如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62public class TryLockDeadlock implements Runnable {
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
int flag = 1;
public static void main(String[] args) {
TryLockDeadlock r1 = new TryLockDeadlock();
TryLockDeadlock r2 = new TryLockDeadlock();
r1.flag = 1;
r2.flag = 0;
new Thread(r1).start();
new Thread(r2).start();
}
public void run() {
for (int i = 0; i < 100; i++) {
if (flag == 1) {
try {
if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) {
Thread.sleep(new Random().nextInt(1000));
if (lock2.tryLock(800, TimeUnit.MILLISECONDS)) {
System.out.println("线程1已经成功获取了两把锁");
lock2.unlock();
lock1.unlock();
break;
} else {
System.out.println("线程1获取锁2失败,已重试");
lock1.unlock();
Thread.sleep(new Random().nextInt(1000));
}
} else {
System.out.println("线程1获取第一把所失败,已重实");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (flag == 0) {
try {
if (lock2.tryLock(3000, TimeUnit.MILLISECONDS)) {
Thread.sleep(new Random().nextInt(1000));
if (lock1.tryLock(3000, TimeUnit.MILLISECONDS)) {
System.out.println("线程2已经成功获取了两把锁");
lock1.unlock();
lock2.unlock();
break;
} else {
System.out.println("线程2获取锁1失败,已重试");
lock2.unlock();
Thread.sleep(new Random().nextInt(1000));
}
} else {
System.out.println("线程1获取第2把所失败,已重实");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
- 多使用并发类而不是自己设计锁
ConcurrentHashMap
、ConcurrentLinkedQueue
、AtomicBoolean
等- 实际应用中
java.util.concurrent.atomic
十分有用,简单方便且效率比使用Lock更高 - 多用并发集合少使用同步集合,并发集合比同步集合的可扩展性更好
- 并发场景需要用到map,首先想到使用
ConcurrentHashMap
- 尽量降低锁的使用粒度:用不同的锁而不是一个锁
- 如果能使用同步代码块,就不使用同步方法:自己指定锁对象
- 给你的线程起一个有意义的名字:debug和排查时事半功倍,框架和JDK都遵守这个最佳实践
- 避免锁的嵌套:锁嵌套非常容易造成死锁
- 分配资源前先看能不能收回来:银行家算法
- 尽量不要几个功能使用同一把锁:专锁专用
其他活性故障
死锁
- 死锁是最常见的活跃性问题,不过除了刚才的死锁之外,还有一些类似的问题,会导致程序无法顺序执行,统称为活跃性问题
活锁
比如哲学家就餐问题:在完全相同的时刻进入餐厅,并同时拿起左手边的筷子,拿不到右手边的筷子,那么这些哲学家就会等待五分钟,
同时放下手中的筷子,然后再等待五分钟,又同时拿起这些筷子(相同的时间拿起,相同的时间放下,等待相同的时间那么同样无法完成就餐)
什么是活锁
- 虽然线程并没有阻塞,也始终再运行(所以叫做“活”锁,线程是“活”的),但是程序却得不到进展,因为线程始终重复做同样的事
- 如果这里是死锁,那么就是这里两个人始终一动不动,直到对方先抬头,他们之间不再说话,只是等待
- 如果发生活锁,那么这里的情况就是双方都不停的对对方说”你先起来吧,你先起来吧”,双方都一直再说话,在运行
- 活锁和死锁的结果是一样的,就是谁都不能先抬头
假设有一对夫妻,只有一套餐具
吃饭的时候会去检查对方是不是处于饥饿状态,如果对方处于饥饿状态,就会把餐具让给对方
程序将处于一直谦让的状态无法完成1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72public class LiveLock {
static class Spoon {
private Diner diner;
public synchronized void use() {
System.out.printf("%s has eaten!", diner.name);
}
public Spoon(Diner diner) {
this.diner = diner;
}
public Diner getDiner() {
return diner;
}
public void setDiner(Diner diner) {
this.diner = diner;
}
}
static class Diner {
private String name;
private boolean idHungry = true;
public Diner(String name) {
this.name = name;
}
public void eatWith(Spoon spoon, Diner spouse) {
while (idHungry) {
if (spoon.diner != this) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
if (spouse.idHungry) {
System.out.println(name + "亲爱的" + spouse.name + "你先吃吧");
spoon.setDiner(spouse);
continue;
}
spoon.use();
idHungry = false;
System.out.println(name + " 我已经吃完了");
spoon.setDiner(spouse);
}
}
}
public static void main(String[] args) {
Diner husband = new Diner("牛郎");
Diner wife = new Diner("织女");
Spoon spoon = new Spoon(husband);
new Thread(new Runnable() {
public void run() {
husband.eatWith(spoon, wife);
}
}).start();
new Thread(new Runnable() {
public void run() {
wife.eatWith(spoon, husband);
}
}).start();
}
}如何解决活锁问题
- 原因:重试机制不变,消息队列始终重试,吃饭始终谦让
- 以太网的指数避退算法
- 加入随机因素(在进行谦让的时候加入随机因素,比如随机数,某一种情况下不进行谦让)
工程中的活锁实例:消息队列
- 策略:消息如果处理失败,就放在队列开头重试
- 由于依赖服务出了问题,处理该消息一直失败(由于依赖服务出现异常,把失败的消息放在队列头部,一直进行重试,一直都不会成功)
- 没有阻塞,但是程序无法继续
- 解决:放到队列尾部、重试次数限制
饥饿
- 当线程需要某些资源(列如CPU),但是始终得不到
- 线程的优先级设置的过于低,或者有某线程持有锁同时又无限循环从而不释放锁,或者某些程序始终占用某些文件的写锁
- 饥饿可能会导致相应性差:比如,我们的浏览器有一个线程负责下载图片和文件、计算渲染等,在这种情况下,如果后台线程把CPU资源都占用,那么前台线程将无法得到很好的执行,这会导致用户的体验很差
常见面试题
写一个必然死锁大的例子(上面已经写过了),生产中什么场景下会发生死锁?
需要获取两把锁的时候,都有可能发生死锁
发生死锁必须满足那些条件
互斥条件
请求保持条件
不剥夺条件
循环等待条件如何定位死锁?
jstack
ThreadMXBean有那些解决死锁问题的策略?
避免策略
检查与恢复策略
鸵鸟策略讲一讲哲学家就餐问题
上面已经有描述
实际工程中如何避免死锁?
设置超时时间
多使用并发类而不是自己设计锁
尽量降低锁的使用粒度:用不同的锁而不是一个锁
如果能使用同步代码块,就不适用同步方法:自己指定锁对象
给线程起一个有意义的名称
避免锁嵌套
分配资源前先看能不能收回来
尽量不要几个功能使用同一把锁:专锁专用什么是活跃性问题?活锁、饥饿和死锁有什么区别?