概述
AbstractQueuedSynchronizer代码之前读过,没有写个总结,这次在看线程池源码的时候又遇到了,发现忘记了很多关键性的内容,因此,特地翻出来,再看看
组件
Node
AQS的内部类,其核心变量有:
- waitStatus
- prev
- next
- thread
- nextWaiter
acquire
先来看源码吧
1 | public final void acquire(int arg) { |
tryAcquire方法由子类实现,这里先不做过多的解释。先来看addWaiter方法。
addWaiter方法
1 | private Node addWaiter(Node mode) { |
addWaiter的逻辑如下:
- 新建一个Node实例。
- 尝试将node实例加入到node队列中去
- 使用cas方法,希望快速地把node加入到队列中去,若成功,直接返回node。
- cas失败,使用enq方法将其入队。之后返回enq方法。成功后返回node实例。
enq方法
1 | private Node enq(final Node node) { |
enq方法的主要目的就是将其加入到队列中去,那么我们来看看其主要逻辑:
- 在循环中,获取到当前的tail节点,如果t==null,说明当前的没有等待队列,那么我们先使用cas初始化一个空的node同时作为头结点和尾节点
- 否则,另当前的node的prev指针指向tail节点,并使用使用cas将当前节点设置为为节点,成功,则将之前为节点的next指针指向当前节点,并返回之前的tail节点。
- 否则,进入下一次循环继续重试。
acquireQueued方法
1 | final boolean acquireQueued(final Node node, int arg) { |
这里写一下上面这段代码的逻辑,首先,我们先要指出,一般情况下,执行到该方法时,是一个线程正在执行lock.lock()方法,那么,函数中的循环也是在该线程中运行的,这里要明白这个大前提,否则就会云里雾里。
在循环中,执行逻辑如下:
- 获取当前节点的上一个节点。
- 如果前一个结点时节点队列的头节点,并且使用tryAcquire方法获取到了锁,那么我们可以保证其他的线程得不到锁了,我们把当前节点设置未新的头节点,然后返回在我们等待锁的过程中,是否被中断过,如果中断过,返回中断未true,否则未false
- 假设在2中没有获取到锁,那么我们就要遇到两个函数shouldParkAfterFailedAcquire,parkAndCheckInterrupt,其内容马上会详细分析,总的来说,这两个方法的目的时,判断当前线程是否可以被暂停,可以暂停的话,将其暂停,并检查在暂停期间是否被interrupt过,如果时的,那么interrupted在当前线程再次被运行时,会设置为ture。
下面我们分别分析shouldParkAfterFailedAcquire方法和parkAndCheckInterrupt方法
shouldParkAfterFailedAcquire方法
这里特意从Node中提取出来了几个waitStatus可能值,可以看到,
- 如果前一个节点的ws时signal的,那么直接返回true说明,当前进程可以被暂停。
- 如果ws大于0,也就是CANCELLED状态,那么我们要通过一个循环,向前找到第一个状态部位CANCELLED的node,并使其指向当前node。
- 如果ws是其他的值,那么使用cas尝试更改其前一个节点waitstatus
- 返回false。
1 | /** waitStatus value to indicate thread has cancelled */ |
parkAndCheckInterrupt方法
逻辑比较简单,当当前线程可以被暂停的时候,那么
- 使用LockSupport.park方法,将当前线程暂停
- 在暂停回来的时候,返回当前线程在暂停时,是否接收到了中断请求。
1 | private final boolean parkAndCheckInterrupt() { |
selfInterrupt方法
如果当前线程在执行过程中被中断过,那么我们在获取到锁之后,自行再次中断一次。
1 | static void selfInterrupt() { |
小结
以上为acquire方法的执行逻辑。简单总结:
- tryAcquire方法调用返回true,说明直接获取到了锁
- 否则,尝试将将当前线程加入队列并在acquireQueued方法中一直尝试获取锁
- 判断是否需要执行中断操作。
release方法
在acquire方法中,其实我们对waitStatus的意义并不是很明了的,那么我们试试在release方法中是否可以找到一些答案?
1 | public final boolean release(int arg) { |
release方法的调用逻辑还是比较简单的:
- 调用子类的tryRelease方法,一般在ReentrantLock等类中实现。
- 如果该方法实现返回为true,即当前线程已经完全释放了了锁,进入if判断,调用unparkSuccessor来唤醒等待线程。该方法下一节有详细介绍。
- tryRelease方法返回为false,说明锁未完全释放,release方法返回false。
unparkSuccessor方法
对于该方法,需要和前面的parkAndCheckInterrupt方法对应起来,我们现在要做的事情就是将之前被park的线程unpark,使其重新进入线程的调度队列中。具体逻辑
- 更新当前线程对应的node的waitStatus状态
- 找到head后的第一个未被cancele的节点
- 使用LockSupport.unpark将2中找到的node重新放入线程调度队列中
1 | private void unparkSuccessor(Node node) { |
condition相关
在使用可重入锁的时候,一直很好奇condition是怎么实现的,这其实也是在AQS中实现的,AQS中的ConditionObject实现了Condition。
组件
在ConditionObject类中,有两个node类型的组件
- firstWaiter
- lastWaiter
那么不言自明,ConditionObject内部也是有一个队列的,即由上两个组件组成的等待队列
await方法
unlinkCancelledWaiters方法
该方法的目的是,从以firstWaiter为头,lastWaiter为尾的等待队列上,从firstWaiter开始,
对waitStatus不为CONDITION状态的节点,进行剔除动作。具体逻辑
- 从头结点开始,将当前节点引用赋值给变量t,将next设置为t.nextWatier引用。
- 如果t不为CONDITION状态,那么,我们就要考虑剔除t了。具体过程
- 如果trail还未设置,那么直接领firstWaiter=next
- 否则,trail设置了的话,将t.nextWaiter引用赋值给trail.next即可。
- 如果next为null,那么说明没有后续等待节点了,直接将trail引用赋值给lastWaiter即可,整个方法就要结束了。
- 如果是的话,就把t的引用在给trail(trail是用来记录上一个有效node的引用的)。
- 将next赋值给t,循环继续,知道赋值给t的next为null的时候结束
1 | private void unlinkCancelledWaiters() { |
addConditionWaiter方法
逻辑:
- 找到lastWaiter,并向前查找到第一个未被取消的condition为止。
- 新建一个waitStatus为Node.CONDITION,thread为当前线程的节点。
- 如果此时的t为null,那么firstWaiter设置为当前节点
- 否则,将lastWaiter的nextWaiter设置为当前节点
- 将lastWaiter设置为当前node
注意:这里之所以在变量修改的时候不需要加锁,是因为我们当前在锁内部,因此天然安全。
1 | private Node addConditionWaiter() { |
fullyRelease方法
方法的目的很简单,那就是释放锁,由于同一个线程可能多次获取锁,所以AQS的state是可能比1更大的,该方法的逻辑就是彻底的释放锁,
将AQS的state变为0,将锁释放
1 | final int fullyRelease(Node node) { |
isOnSyncQueue方法
该方法检查入参node是否在队列中,逻辑如下:
- 前两个if判断很容易理解,快速确定node在或者不在一个队列中
- findNodeFromTail方法的逻辑是从AQS等待队列的尾部开始,向前查找,
如果查找到了一个节点正好等于当前节点,那么返回true,否则一直查找,
直到查找将队列遍历完毕还未找到的话,就返回false。
1 | final boolean isOnSyncQueue(Node node) { |
####### 小结
isOnSyncQueue如果一直返回false,wait方法就会在一个循环中持续执行,并且有可能被park掉。
总之,如果该方法一直返回为true,说明当前线程一直在ConditonObject的等待队列里,而未能进入到
AQS的锁队列中去,需要一直等待。
这就是condition的wait方法的核心逻辑所在
acquireQueued方法
该方法在前文中有介绍,再去复习一下即可。这里简单说一下其目的。
在当前节点重新进入到AQS的等待队列中去的时候,使用该方法重新获取到锁,并设置为之前的的重入次数。
总结
结合前面小结的内容,我们可以很好理解await方法如何实现的了,后面再来看看signal方法吧。
1 | public final void await() throws InterruptedException { |
signal方法
首先声明,在这些方法执行的时候,是不存在竞争的可能的,这是在锁内部执行的。
####@ doSignal方法
在doSignal内部,其执行逻辑是
- 首先将当前节点first的nextWaiter节点引用记录到firstWaiter记录下来。
- 判断记录的nextWaiter是否为null,是的话,说明到了队尾,需要把lastWaier设置为null了。
- 当前节点的nextWaiter设置为null(gc)。
- 如果transferForSignal方法执行返回为true,说明将当前节点加入到了AQS的等待队列。循环结束。
- 如果返回为false,即node的waitStatus在设置的时候不符合预期(见下文),我们要直接跳过该节点了。
- 将当前节点记录为之前存下来的nextWaiter节点,如果其部位null,说明还有线程在等待,那么循环继续,否则
循环结束。
我们看看transferForSignal方法的逻辑吧:
- 如果试图将当前node的waitStatus通过cas设置为0失败,那么就返回false,上述的循环继续。
- 如果设置成功,那就需要将当前节点加入到AQS的锁队列中去,以为这当前node对应线程马上就可以
有资格获取到锁了。 - 在enq入队后,会将node的前一个node返回来,如果
1 | private void doSignal(Node first) { |
小节
通过上文,我们可以很容易理解signal函数的逻辑:将等待队列中的第一个节点加入到
AQS的等待队列中去。
1 | public final void signal() { |
总结
本文大的分为两个部分:
- acquire和release。对应了lock和unlock方法。
- condition.await和condition.signal方法。
有一个遗留问题,就是waitStatus的状态转换全面梳理未进行,这个后续再来看看吧。