Java并发成神之路-精通JUC并发工具十八般武艺——AQS(八)
为什么需要AQS
ReentrantLock
、Semaphore
、CountDownLatch
、ReentrantReadWriteLock
- 上面那些协作类,它们有很多工作都是类似的,所以如果能提取出一个工具类,那么就可以直接用,对于
ReentrantLock
和Semaphore
而言就可以屏蔽很多细节,只关注它们自己“业务逻辑”就可以了
Semaphore
和AQS的关系
Semaphore
内部有一个Sync
类,Sync
类继承了AQSCountDownLatch
和ReentrantLock
也是一样的,内部有一个Sync
类,Sync
类继承了AQS
AQS的作用
- AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS以后,更多的协作工具类都可以很方便得被写出来
- 一句话总结:有了AQS,构建线程协作类就容易多了
AQS内部原理解析
- AQS最核心的就是三大部分
state状态
- 这里的
state
的具体含义,会根据具体实现类的不同而不同,比如啊Semaphore
里,它表示“剩余的许可证的数量”,而在CountDownLatch
里,它表示“还需要倒数的数量” state
是volatile
修饰的,会被并发地修改,所以多有修改state
的方法都需要保证线程安全,比如getState
、setState
以及compareAndSetState
操作来读取和更新这个状态,这些方法都依赖于j.u.c.atomic
包的支持- 在
ReentrantLock
中,state
用来表示“锁”的占有情况,包括可重入计数,当state
的值为0的时候,表示该Lock
不被任何线程占有
控制线程抢锁和配合的FIFO队列
- 这个队列用来存放“等待的线程”,AQS就是“排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁
- AQS会维护一个等待的线程队列,把线程都放到这个队列里
期望协作工具类去实现的获取/释放等重要方法
- 这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义各不相同
- 获取方法
- 获取操作会依赖
state
变量,经常会阻塞(比如获取不到锁) - 在
Semaphore
中,获取就是acquire
方法,作用是获取一个许可证 - 而在
CountDownLatch
里面,获取就是await
方法,作用是“等待,直到倒数结束”
- 获取操作会依赖
- 释放方法
- 释放操作不会阻塞
- 在
Semaphore
中,释放方法就是release
方法,作用是释放一个许可证 CountDownLatch
里面,释放就是countDown
方法,作用是“倒数一个数”
AQS源码分析
AQS在CountDownLatch
中的应用
- 构造函数
CountDownLatch
的构造函数如下图所示,首先需要传进去一个数字,这个数字表示倒数几次,而构造函数中直接调用了Sync
类的构造函数,从第二个图中可以看出,Sync
的构造函数是调用了AQS的setState
方法,直接给state
赋值
getCount
如下图所示,CountDownLatch.getCount()
方法直接调用Sync
的getCount()
方法,而从上面的Sync类源码图可以看出,Sync.getCount()
方法直接调用了AQS的getState
方法,获取state
的值countDown
如下图所示,CountDownLatch
的countDown
方法直接调用了继承自AQS的releaseShared
方法,而releaseShared
方法中会判断tryReleaseShared
返回值进行判断,如果为true进入判断调用doReleaseShared
方法,唤醒被阻塞的线程,tryReleaseShared
的具体代码可以看上面的Sync类源码图,可以看出是使用for循环做的一个CAS自旋锁,将state
减一,然后去更新,之后判断是否已经等于0了,如果减一后等于0了那么说明这是最后一次计数,就会返回true,然后调用doReleaseShared
唤醒线程
await
如下图所示,CountDownLatch
的await
方法直接调用了继承自AQS的acquireSharedInterruptibly
方法,而acquireSharedInterruptibly
方法中会判断tryAcquireShared
方法的返回值是否小于0,如果小于0则进入判断,将当前线程挂起,如果不小于0,则直接跳过,可以从上面的Sync
类源码图中看出,其实现的tryAcquireShared
方法是直接判断state值是否等于0,如果等于0,就返回1,否则返回-1,也就是说state等于0,直接跳过acquireSharedInterruptibly
中的判断,不等于0,就将线程挂起
AQS在CountDownLatch
的总结
- 调用
CountDownLatch
的await
方法时,便会尝试获取“共享锁”,不过一开始是获取不到该锁的,于是线程被阻塞 - 而“共享锁”可获取到的条件,就是“锁计数器”的值为0
- 而“锁计数器”的初值为
count
,每当一个线程调用该CountDownLatch
对象的countDown()
方法时,才将“锁计数器”-1 count
个线程调用countDown()
之后,“锁计数器”才为0,而前面提到的等待获取共享锁的线程才能继续运行
AQS在Semaphore
中的应用
- 在
Semaphore
中,state
表示许可证的剩余数量 - 看
tryAcquire
方法,判断nonfairTryAcquireShared
大于等于0的话,代表成功 - 这里会先检查剩余许可证数量够不够这次需要的,用减法来计算,如果直接不够,那就返回负数,表示失败,如果够了,就用自旋加
compareAndSetState
来改变state
的状态,直到改变成功就返回整数,或者是期间如果被其他人修改导致剩余数量不够了,那也返回负数代表获取失败
AQS在ReentrantLock
中的应用
unlock()
方法具体情况如下图所示,unlock
方法直接调用了Sync
类继承于AQS的release
方法,方法中首先会判断tryRelease
方法的返回值,如果是true,就会执行下面的unparkSuccessor
方法,唤醒等待的线程,tryRelease
方法被ReentrantLock.Sync
类重写,首先会将state
减1,因为ReentrantLock
是可重入锁,解锁一次就是将state
减1,然后判断当前线程是不是正在执行的线程,如果是当前线程,判断计算后的state
是否等于0,也就是判断这次解锁是不是最后一层锁,如果不是,直接修改state
的值,然后返回false,如果是就会调用setExclusiveOwnerThread
方法,将当前运行的线程设置为null,并返回true,唤醒其他的线程lock()
(以非公平锁为例)
如下图可以看出,ReentrantLock
的lock
方法中直接调用了内部类Sync
的lock
方法,而第二个图中也可以看出,Sync.lock()
方法是一个abstract
方法,由它的两个子类实现(NonfairSync
和FairSync
,这里仅以NonfairSync
为例讲述)
如下图可以看出,NonfairSync
中的lock
方法首先会调用CAS方法更新state
的值,如果更新成功了,就直接将当前线程设置为执行线程,如果失败了,会去执行acquire
方法,acquire
方法是AQS的一个方法
首先方法还会去调用tryAcquire
方法,tryAcquire
在ReentrantLock
中会有公平和非公平两种实现方式,如果tryAcquire
方法返回的是true,因为前面的非,则会变成false,直接跳过判断,如果tryAcquire
返回的false,加上前面的非,则是true,就会去执行后的语句,addWaiter
方法是将当前线程进行封装,acquireQueued
方法则是将封装后的Node节点对象,放到等待队列中,等待别的线程释放锁之后将他唤醒
非公平类中tryAcquire
方法会去调用nonfairTryAcquire
方法,这个方法是由Sync
类实现的,方法中首先会判断state
是否等于0,如果等于0表示当前锁没有被任何线程持有,就会直接通过CAS修改state
的值,然后将持锁线程设置为当前线程,如果当前锁已经被持有,就会判断是否为当前线程持有,如果是,就会将state
加入参,也就是1,然后重新设置值,如果不是表示当前锁已经被其他线程持有,直接返回false
分析释放锁的方法tryRelease
- 由于当前是可重入锁,所以
state
代表重入次数,每次释放锁,先判断是不是当前持有锁的线程释放的,如果不是就抛异常,如果是的话,重入次数减一,如果减到了0,就说明完全释放了,于是free就是true了,并且把state
设置为0
加锁的方法
- 判断当前
state
是不是等于0,判断当前线程是不是持有锁的线程,如果都不是就代表当前线程拿不到锁,就会放进线程等待
利用AQS实现一个自己的Latch门闩
如下代码就是用AQS自己实现的门闩功能代码,我们要实现的功能很简单,就是当线程调用await
方法的时候,将线程挂起,然后等待执行了signal
方法,将所有线程唤醒,因为是可重入的,直接调用AQS的releaseShared
方法和acquireShared
方法来实现,查看AQS的源码可以看到,acquireShared
方法会判断tryAcquireShared(arg) < 0
的结果,如果为true就会将线程挂起,所以我们重写AQS的tryAcquireShared
方法,判断state
的值,如果为1,就返回1,1大于0,直接跳过判断,否则就返回-1,-1小于0,会进入判断,将线程挂起,而state
默认值为0,肯定不等于1。
而唤醒线程的signal
方法直接调用了AQS的releaseShared
方法,这个方法会判断tryReleaseShared(arg)
方法的返回值,如果是true,就调用判断中的方法唤醒被挂起的线程,否则就不处理,所以我们实现tryReleaseShared
方法,如果只是实现唤醒的话,直接返回true就可以,但是我们要实现门闩只能使用一次的功能,门闩被打开后,再执行await
方法不会将线程挂起,所以将state
的值设置为1,这样state
的时候就是等于1,然后就直接返回1,1大于0,就不会进入判断,也就不会将线程挂起
1 | public class OneShotLatch { |