Java并发成神之路-精通JUC并发工具十八般武艺——AQS(八)
为什么需要AQS
ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock- 上面那些协作类,它们有很多工作都是类似的,所以如果能提取出一个工具类,那么就可以直接用,对于
ReentrantLock和Semaphore而言就可以屏蔽很多细节,只关注它们自己“业务逻辑”就可以了
Semaphore和AQS的关系
Semaphore内部有一个Sync类,Sync类继承了AQSCountDownLatch和ReentrantLock也是一样的,内部有一个Sync类,Sync类继承了AQS
AQS的作用
- AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS以后,更多的协作工具类都可以很方便得被写出来
- 一句话总结:有了AQS,构建线程协作类就容易多了
AQS内部原理解析
- AQS最核心的就是三大部分
state状态
- 这里的
state的具体含义,会根据具体实现类的不同而不同,比如啊Semaphore里,它表示“剩余的许可证的数量”,而在CountDownLatch里,它表示“还需要倒数的数量” state是volatile修饰的,会被并发地修改,所以多有修改state的方法都需要保证线程安全,比如getState、setState以及compareAndSetState操作来读取和更新这个状态,这些方法都依赖于j.u.c.atomic包的支持- 在
ReentrantLock中,state用来表示“锁”的占有情况,包括可重入计数,当state的值为0的时候,表示该Lock不被任何线程占有
控制线程抢锁和配合的FIFO队列
- 这个队列用来存放“等待的线程”,AQS就是“排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁
- AQS会维护一个等待的线程队列,把线程都放到这个队列里
期望协作工具类去实现的获取/释放等重要方法
- 这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义各不相同
- 获取方法
- 获取操作会依赖
state变量,经常会阻塞(比如获取不到锁) - 在
Semaphore中,获取就是acquire方法,作用是获取一个许可证 - 而在
CountDownLatch里面,获取就是await方法,作用是“等待,直到倒数结束”
- 获取操作会依赖
- 释放方法
- 释放操作不会阻塞
- 在
Semaphore中,释放方法就是release方法,作用是释放一个许可证 CountDownLatch里面,释放就是countDown方法,作用是“倒数一个数”
AQS源码分析
AQS在CountDownLatch中的应用
- 构造函数
CountDownLatch的构造函数如下图所示,首先需要传进去一个数字,这个数字表示倒数几次,而构造函数中直接调用了Sync类的构造函数,从第二个图中可以看出,Sync的构造函数是调用了AQS的setState方法,直接给state赋值

getCount
如下图所示,CountDownLatch.getCount()方法直接调用Sync的getCount()方法,而从上面的Sync类源码图可以看出,Sync.getCount()方法直接调用了AQS的getState方法,获取state的值
countDown
如下图所示,CountDownLatch的countDown方法直接调用了继承自AQS的releaseShared方法,而releaseShared方法中会判断tryReleaseShared返回值进行判断,如果为true进入判断调用doReleaseShared方法,唤醒被阻塞的线程,tryReleaseShared的具体代码可以看上面的Sync类源码图,可以看出是使用for循环做的一个CAS自旋锁,将state减一,然后去更新,之后判断是否已经等于0了,如果减一后等于0了那么说明这是最后一次计数,就会返回true,然后调用doReleaseShared唤醒线程

await
如下图所示,CountDownLatch的await方法直接调用了继承自AQS的acquireSharedInterruptibly方法,而acquireSharedInterruptibly方法中会判断tryAcquireShared方法的返回值是否小于0,如果小于0则进入判断,将当前线程挂起,如果不小于0,则直接跳过,可以从上面的Sync类源码图中看出,其实现的tryAcquireShared方法是直接判断state值是否等于0,如果等于0,就返回1,否则返回-1,也就是说state等于0,直接跳过acquireSharedInterruptibly中的判断,不等于0,就将线程挂起

AQS在CountDownLatch的总结
- 调用
CountDownLatch的await方法时,便会尝试获取“共享锁”,不过一开始是获取不到该锁的,于是线程被阻塞 - 而“共享锁”可获取到的条件,就是“锁计数器”的值为0
- 而“锁计数器”的初值为
count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1 count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待获取共享锁的线程才能继续运行
AQS在Semaphore中的应用
- 在
Semaphore中,state表示许可证的剩余数量 - 看
tryAcquire方法,判断nonfairTryAcquireShared大于等于0的话,代表成功 - 这里会先检查剩余许可证数量够不够这次需要的,用减法来计算,如果直接不够,那就返回负数,表示失败,如果够了,就用自旋加
compareAndSetState来改变state的状态,直到改变成功就返回整数,或者是期间如果被其他人修改导致剩余数量不够了,那也返回负数代表获取失败
AQS在ReentrantLock中的应用
unlock()
方法具体情况如下图所示,unlock方法直接调用了Sync类继承于AQS的release方法,方法中首先会判断tryRelease方法的返回值,如果是true,就会执行下面的unparkSuccessor方法,唤醒等待的线程,tryRelease方法被ReentrantLock.Sync类重写,首先会将state减1,因为ReentrantLock是可重入锁,解锁一次就是将state减1,然后判断当前线程是不是正在执行的线程,如果是当前线程,判断计算后的state是否等于0,也就是判断这次解锁是不是最后一层锁,如果不是,直接修改state的值,然后返回false,如果是就会调用setExclusiveOwnerThread方法,将当前运行的线程设置为null,并返回true,唤醒其他的线程


lock()(以非公平锁为例)
如下图可以看出,ReentrantLock的lock方法中直接调用了内部类Sync的lock方法,而第二个图中也可以看出,Sync.lock()方法是一个abstract方法,由它的两个子类实现(NonfairSync和FairSync,这里仅以NonfairSync为例讲述)

如下图可以看出,NonfairSync中的lock方法首先会调用CAS方法更新state的值,如果更新成功了,就直接将当前线程设置为执行线程,如果失败了,会去执行acquire方法,acquire方法是AQS的一个方法
首先方法还会去调用tryAcquire方法,tryAcquire在ReentrantLock中会有公平和非公平两种实现方式,如果tryAcquire方法返回的是true,因为前面的非,则会变成false,直接跳过判断,如果tryAcquire返回的false,加上前面的非,则是true,就会去执行后的语句,addWaiter方法是将当前线程进行封装,acquireQueued方法则是将封装后的Node节点对象,放到等待队列中,等待别的线程释放锁之后将他唤醒
非公平类中tryAcquire方法会去调用nonfairTryAcquire方法,这个方法是由Sync类实现的,方法中首先会判断state是否等于0,如果等于0表示当前锁没有被任何线程持有,就会直接通过CAS修改state的值,然后将持锁线程设置为当前线程,如果当前锁已经被持有,就会判断是否为当前线程持有,如果是,就会将state加入参,也就是1,然后重新设置值,如果不是表示当前锁已经被其他线程持有,直接返回false
分析释放锁的方法tryRelease
- 由于当前是可重入锁,所以
state代表重入次数,每次释放锁,先判断是不是当前持有锁的线程释放的,如果不是就抛异常,如果是的话,重入次数减一,如果减到了0,就说明完全释放了,于是free就是true了,并且把state设置为0
加锁的方法
- 判断当前
state是不是等于0,判断当前线程是不是持有锁的线程,如果都不是就代表当前线程拿不到锁,就会放进线程等待
利用AQS实现一个自己的Latch门闩
如下代码就是用AQS自己实现的门闩功能代码,我们要实现的功能很简单,就是当线程调用await方法的时候,将线程挂起,然后等待执行了signal方法,将所有线程唤醒,因为是可重入的,直接调用AQS的releaseShared方法和acquireShared方法来实现,查看AQS的源码可以看到,acquireShared方法会判断tryAcquireShared(arg) < 0的结果,如果为true就会将线程挂起,所以我们重写AQS的tryAcquireShared方法,判断state的值,如果为1,就返回1,1大于0,直接跳过判断,否则就返回-1,-1小于0,会进入判断,将线程挂起,而state默认值为0,肯定不等于1。
而唤醒线程的signal方法直接调用了AQS的releaseShared方法,这个方法会判断tryReleaseShared(arg)方法的返回值,如果是true,就调用判断中的方法唤醒被挂起的线程,否则就不处理,所以我们实现tryReleaseShared方法,如果只是实现唤醒的话,直接返回true就可以,但是我们要实现门闩只能使用一次的功能,门闩被打开后,再执行await方法不会将线程挂起,所以将state的值设置为1,这样state的时候就是等于1,然后就直接返回1,1大于0,就不会进入判断,也就不会将线程挂起
1 | public class OneShotLatch { |