Java并发成神之路-精通JUC并发工具十八般武艺——AQS(八)

为什么需要AQS

  • ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLock
  • 上面那些协作类,它们有很多工作都是类似的,所以如果能提取出一个工具类,那么就可以直接用,对于ReentrantLockSemaphore而言就可以屏蔽很多细节,只关注它们自己“业务逻辑”就可以了

Semaphore和AQS的关系

  • Semaphore内部有一个Sync类,Sync类继承了AQS
  • CountDownLatchReentrantLock也是一样的,内部有一个Sync类,Sync类继承了AQS

AQS的作用

  • AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS以后,更多的协作工具类都可以很方便得被写出来
  • 一句话总结:有了AQS,构建线程协作类就容易多了

AQS内部原理解析

  • AQS最核心的就是三大部分

state状态

  • 这里的state的具体含义,会根据具体实现类的不同而不同,比如啊Semaphore里,它表示“剩余的许可证的数量”,而在CountDownLatch里,它表示“还需要倒数的数量”
  • statevolatile修饰的,会被并发地修改,所以多有修改state的方法都需要保证线程安全,比如getStatesetState以及compareAndSetState操作来读取和更新这个状态,这些方法都依赖于j.u.c.atomic包的支持
  • ReentrantLock中,state用来表示“锁”的占有情况,包括可重入计数,当state的值为0的时候,表示该Lock不被任何线程占有

控制线程抢锁和配合的FIFO队列

  • 这个队列用来存放“等待的线程”,AQS就是“排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁
  • AQS会维护一个等待的线程队列,把线程都放到这个队列里

期望协作工具类去实现的获取/释放等重要方法

  • 这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义各不相同
  • 获取方法
    • 获取操作会依赖state变量,经常会阻塞(比如获取不到锁)
    • Semaphore中,获取就是acquire方法,作用是获取一个许可证
    • 而在CountDownLatch里面,获取就是await方法,作用是“等待,直到倒数结束”
  • 释放方法
    • 释放操作不会阻塞
    • Semaphore中,释放方法就是release方法,作用是释放一个许可证
    • CountDownLatch里面,释放就是countDown方法,作用是“倒数一个数”

AQS源码分析

AQS在CountDownLatch中的应用

  • 构造函数
    CountDownLatch的构造函数如下图所示,首先需要传进去一个数字,这个数字表示倒数几次,而构造函数中直接调用了Sync类的构造函数,从第二个图中可以看出,Sync的构造函数是调用了AQS的setState方法,直接给state赋值
    CountDownLatch构造函数源码图示

CountDownLatch-Sync类源码图示

  • getCount
    如下图所示,CountDownLatch.getCount()方法直接调用SyncgetCount()方法,而从上面的Sync类源码图可以看出,Sync.getCount()方法直接调用了AQS的getState方法,获取state的值
    CountDownLatch的getCount方法图示
  • countDown
    如下图所示,CountDownLatchcountDown方法直接调用了继承自AQS的releaseShared方法,而releaseShared方法中会判断tryReleaseShared返回值进行判断,如果为true进入判断调用doReleaseShared方法,唤醒被阻塞的线程,tryReleaseShared的具体代码可以看上面的Sync类源码图,可以看出是使用for循环做的一个CAS自旋锁,将state减一,然后去更新,之后判断是否已经等于0了,如果减一后等于0了那么说明这是最后一次计数,就会返回true,然后调用doReleaseShared唤醒线程
    CountDownLatch的countDown方法图示

AQS的releaseShared方法源码图示

  • await
    如下图所示,CountDownLatchawait方法直接调用了继承自AQS的acquireSharedInterruptibly方法,而acquireSharedInterruptibly方法中会判断tryAcquireShared方法的返回值是否小于0,如果小于0则进入判断,将当前线程挂起,如果不小于0,则直接跳过,可以从上面的Sync类源码图中看出,其实现的tryAcquireShared方法是直接判断state值是否等于0,如果等于0,就返回1,否则返回-1,也就是说state等于0,直接跳过acquireSharedInterruptibly中的判断,不等于0,就将线程挂起
    CountDownLatch的await方法源码图示
    AQS的acquireSharedInterruptibly方法源码图示

AQS在CountDownLatch的总结

  • 调用CountDownLatchawait方法时,便会尝试获取“共享锁”,不过一开始是获取不到该锁的,于是线程被阻塞
  • 而“共享锁”可获取到的条件,就是“锁计数器”的值为0
  • 而“锁计数器”的初值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1
  • count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待获取共享锁的线程才能继续运行

AQS在Semaphore中的应用

  • Semaphore中,state表示许可证的剩余数量
  • tryAcquire方法,判断nonfairTryAcquireShared大于等于0的话,代表成功
  • 这里会先检查剩余许可证数量够不够这次需要的,用减法来计算,如果直接不够,那就返回负数,表示失败,如果够了,就用自旋加compareAndSetState来改变state的状态,直到改变成功就返回整数,或者是期间如果被其他人修改导致剩余数量不够了,那也返回负数代表获取失败

AQS在ReentrantLock中的应用

  • unlock()
    方法具体情况如下图所示,unlock方法直接调用了Sync类继承于AQS的release方法,方法中首先会判断tryRelease方法的返回值,如果是true,就会执行下面的unparkSuccessor方法,唤醒等待的线程,tryRelease方法被ReentrantLock.Sync类重写,首先会将state减1,因为ReentrantLock是可重入锁,解锁一次就是将state减1,然后判断当前线程是不是正在执行的线程,如果是当前线程,判断计算后的state是否等于0,也就是判断这次解锁是不是最后一层锁,如果不是,直接修改state的值,然后返回false,如果是就会调用setExclusiveOwnerThread方法,将当前运行的线程设置为null,并返回true,唤醒其他的线程
    ReentrantLock的unlock方法源码图示
    AQS的release方法源码图示
    ReentrantLock-Sync类的tryRelease方法源码图示
  • lock()(以非公平锁为例)
    如下图可以看出,ReentrantLocklock方法中直接调用了内部类Synclock方法,而第二个图中也可以看出,Sync.lock()方法是一个abstract方法,由它的两个子类实现(NonfairSyncFairSync,这里仅以NonfairSync为例讲述)
    ReentrantLock的lock方法源码图示
    ReentrantLock-Sync类的lock方法源码图示
    如下图可以看出,NonfairSync中的lock方法首先会调用CAS方法更新state的值,如果更新成功了,就直接将当前线程设置为执行线程,如果失败了,会去执行acquire方法,acquire方法是AQS的一个方法
    ReentrantLock-NonfairSync类源码图示
    首先方法还会去调用tryAcquire方法,tryAcquireReentrantLock中会有公平和非公平两种实现方式,如果tryAcquire方法返回的是true,因为前面的非,则会变成false,直接跳过判断,如果tryAcquire返回的false,加上前面的非,则是true,就会去执行后的语句,addWaiter方法是将当前线程进行封装,acquireQueued方法则是将封装后的Node节点对象,放到等待队列中,等待别的线程释放锁之后将他唤醒
    AQS的acquire方法源码示意图
    非公平类中tryAcquire方法会去调用nonfairTryAcquire方法,这个方法是由Sync类实现的,方法中首先会判断state是否等于0,如果等于0表示当前锁没有被任何线程持有,就会直接通过CAS修改state的值,然后将持锁线程设置为当前线程,如果当前锁已经被持有,就会判断是否为当前线程持有,如果是,就会将state加入参,也就是1,然后重新设置值,如果不是表示当前锁已经被其他线程持有,直接返回false
    ReentrantLock-Sync类的nonfairTryAcquire方法源码图示

分析释放锁的方法tryRelease

  • 由于当前是可重入锁,所以state代表重入次数,每次释放锁,先判断是不是当前持有锁的线程释放的,如果不是就抛异常,如果是的话,重入次数减一,如果减到了0,就说明完全释放了,于是free就是true了,并且把state设置为0

加锁的方法

  • 判断当前state是不是等于0,判断当前线程是不是持有锁的线程,如果都不是就代表当前线程拿不到锁,就会放进线程等待

利用AQS实现一个自己的Latch门闩

如下代码就是用AQS自己实现的门闩功能代码,我们要实现的功能很简单,就是当线程调用await方法的时候,将线程挂起,然后等待执行了signal方法,将所有线程唤醒,因为是可重入的,直接调用AQS的releaseShared方法和acquireShared方法来实现,查看AQS的源码可以看到,acquireShared方法会判断tryAcquireShared(arg) < 0的结果,如果为true就会将线程挂起,所以我们重写AQS的tryAcquireShared方法,判断state的值,如果为1,就返回1,1大于0,直接跳过判断,否则就返回-1,-1小于0,会进入判断,将线程挂起,而state默认值为0,肯定不等于1。
而唤醒线程的signal方法直接调用了AQS的releaseShared方法,这个方法会判断tryReleaseShared(arg)方法的返回值,如果是true,就调用判断中的方法唤醒被挂起的线程,否则就不处理,所以我们实现tryReleaseShared方法,如果只是实现唤醒的话,直接返回true就可以,但是我们要实现门闩只能使用一次的功能,门闩被打开后,再执行await方法不会将线程挂起,所以将state的值设置为1,这样state的时候就是等于1,然后就直接返回1,1大于0,就不会进入判断,也就不会将线程挂起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class OneShotLatch {

private final Sync sync = new Sync();

public void signal(){
sync.releaseShared(0);
}

public void await(){
sync.acquireShared(0);
}

private class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryReleaseShared(int arg) {
setState(1);
return true;
}

@Override
protected int tryAcquireShared(int arg) {
return (getState() == 1) ? 1 : -1;
}
}

public static void main(String[] args) throws InterruptedException {
OneShotLatch oneShotLatch = new OneShotLatch();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"尝试获取latch");
oneShotLatch.await();
System.out.println(Thread.currentThread().getName()+"开闸放行,继续前进");
}
}).start();
}
Thread.sleep(5000);
oneShotLatch.signal();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"尝试获取latch");
oneShotLatch.await();
System.out.println(Thread.currentThread().getName()+"开闸放行,继续前进");
}
}).start();
}
}