前两篇文章中分析了AQS的独占功能和共享功能,AQS中还实现了Condition的功能。它可以替代传统的Object中的wait()、notify()、notifyAll()方法来实现线程间的通信,使线程间协作更加安全和高效。
使用synchronized关键字时,所有没有获取锁的线程都会等待,这时相当于只有1个等待队列;而在实际应用中可能有时需要多个等待队列,比如ReadLock和WriteLock。Lock中的等待队列和Condition中的等待队列是分开的,例如在独占模式下,Lock的独占保证了在同一时刻只会有一个线程访问临界区,也就是lock()方法返回后,Condition中的等待队列保存着被阻塞的线程,也就是调用await()方法后阻塞的线程。所以使用lock比使用synchronized关键字更加灵活。
Condition必须被绑定到一个独占锁上使用,在ReentrantLock中,有一个newCondition方法,该方法调用了Sync中的newCondition方法:
1 | final ConditionObject newCondition() { |
ConditionObject是在AQS中定义的,它实现了Condition接口。该类有两个重要的变量:
1 | /** First node of condition queue. */ |
对于Condition来说,它是不与独占模式或共享模式使用相同队列的,它有自己的队列,所以这两个变量表示了队列的头结点和尾节点。
await
1 | public final void await() throws InterruptedException { |
addConditionWaiter
首先调用addConditionWaiter在Condition队列中增加一个等待节点:
1 | private Node addConditionWaiter() { |
根据当前线程创建一个Node并添加到Condition队列中。如果尾节点不是CONDITION状态(被取消),调用unlinkCancelledWaiters
方法删除Condition队列中被cancel的节点。然后将lastWaiter的nextWaiter设置为node,并将node设置为lastWaiter,即将其添加到链表的末尾。
unlinkCancelledWaiters
1 | private void unlinkCancelledWaiters() { |
unlinkCancelledWaiters
方法从头节点开始遍历。
如果节点的状态不是CONDITION
(被取消了),从链表中删除该节点:如果该节点是头节点,则将firstWaiter指向该节点的后节点;否则该节点的前节点指向该节点的后节点。如果节点的后节点为null,表示它是尾节点,lastWaiter指向它的前节点。
如果节点的状态是CONDITION
,跳过该节点。
fullyRelease
回到await
方法,调用addConditionWaiter
在队列中增加一个等待节点之后,接着调用fullyRelease
:
1 | final int fullyRelease(Node node) { |
fullyRelease
方法的功能是将当前AQS中持有的锁全部释放。savedState表示当前线程已经加锁的次数(ReentrantLock为重入锁)。因为是可重入的原因,只有在state为0的时候才会真的释放锁,所以在fullRelease方法中,需要将之前加入的锁的次数全部释放,目的是将该线程从Sync队列中移出。
isOnSyncQueue
接着循环调用isOnSyncQueue
判断当前线程是否又被添加到了Sync队列中,如果已经在Sync队列中,则退出循环。什么时候会把当前线程又加入到Sync队列中呢?当然是调用signal方法的时候,因为这里需要唤醒之前调用await方法的线程。
1 | final boolean isOnSyncQueue(Node node) { |
isOnSyncQueue
方法判断当前线程的node是否在Sync队列中,即被唤醒加入到Sync队列中:
- 如果当前线程node的状态为CONDITION,或者
node.prev
为null时说明已经在Condition队列中了,所以返回false - 如果node.next不为null,说明在Sync队列中,返回true
- 如果两个if都未返回时,可以断定node的prev一定不为null,next一定为null,这个时候可以认为node正处于放入Sync队列的CAS操作的执行过程中。而这个CAS操作有可能失败,所以通过
findNodeFromTail
再尝试一次判断。
1 | private boolean findNodeFromTail(Node node) { |
findNodeFromTail
从Sync队列尾部开始判断,因为在isOnSyncQueue
方法调用该方法时,node.prev一定不为null。但这时的node可能还没有完全添加到Sync队列中(因为node.next是null),这时可能是在自旋中。
1 | private Node enq(final Node node) { |
如果此时CAS还未成功,那只好返回false了。
checkInterruptWhileWaiting
回到await
方法,如果节点不在Sync队列中,则挂起线程。线程唤醒之后调用checkInterruptWhileWaiting
方法检查是否被中断,如果中断再调用transferAfterCancelledWait
方法判断后续的处理应该是抛出InterruptedException
还是重新中断。
1 | private int checkInterruptWhileWaiting(Node node) { |
1 | final boolean transferAfterCancelledWait(Node node) { |
该方法判断,在线程中断的时候,是否有signal方法的调用。
- 如果
compareAndSetWaitStatus(node, Node.CONDITION, 0)
执行成功,则说明中断发生时,没有signal的调用,因为signal方法会将状态设置为0 - 如果第1步执行成功,则将node添加到Sync队列中,并返回true,表示中断再signal之前
- 如果第1步失败,则检查当前线程的node是否已经在Sync队列中了,如果不在Sync队列中,则让步给其他线程执行,直到当前的node已经被signal方法添加到Sync队列中。
- 返回false
这里需要注意的地方是,如果第一次CAS失败了,则不能判断当前线程是先进行了中断还是先进行了signal方法的调用,可能是先执行了signal然后中断,也可能是先执行了中断,后执行了signal。当然,这两个操作肯定是发生在CAS之前。这时需要做的就是等待当前线程的node被添加到Sync队列后,也就是enq方法返回后,返回false告诉checkInterruptWhileWaiting
方法返回REINTERRUPT
,后续进行重新中断。
简单来说,该方法的返回值代表当前线程是否在park的时候被中断唤醒,如果为true表示中断在signal调用之前,signal还未执行,否则表示signal已经执行过了。
回到await
函数,checkInterruptWhileWaiting
返回不为0(THROW_IE或者REINTERRUPT)说明遭遇中断,跳出循环。
接着获取独占锁,如果node还有后节点则调用unlinkCancelledWaiters
从头到尾遍历Condition队列,移除状态为非CONDITION的节点。
因为在执行该方法时已经获取了独占锁,所以不需要考虑多线程问题。
reportInterruptAfterWait
如果当前线程被中断,则在await
方法中调用reportInterruptAfterWait
方法:
1 | private void reportInterruptAfterWait(int interruptMode) |
该方法根据interruptMode来确定是应该抛出InterruptedException还是继续中断。
signal
1 | public final void signal() { |
首先判断当前线程是否占有独占锁,然后通过firstWaiter依次唤醒Condition队列中的node,并把node添加到Sync队列中。
在await方法中可以知道添加到Condition队列中的node每次都是添加到队列的尾部,在signal方法中是从头开始唤醒的,所以Condition是公平的,signal是按顺序来进行唤醒的。
1 | private void doSignal(Node first) { |
doSignal方法先将队列前面节点依次从队列中取出,然后调用transferForSignal方法去唤醒节点,这个方法有可能失败,因为等待线程可能已经到时或者被中断,因此while循环这个操作直到成功唤醒或队列为空。
1 | final boolean transferForSignal(Node node) { |
该方法首先尝试设置node的状态为0:
- 如果设置失败,说明已经被取消,没必要再进入Sync队列了,doSignal中的循环会找到下一个node再次执行
- 如果设置成功,但之后又被取消了呢?无所谓,虽然会进入到Sync队列,但在获取锁的时候会调用
shouldParkAfterFailedAcquire
方法,该方法会移除此节点。
如果执行成功,则将node加入到Sync队列中,enq会返回node的前继节点p。这里的if判断只有在p节点是取消状态或者设置p节点的状态为SIGNAL失败的时候才会执行unpark。
什么时候compareAndSetWaitStatus(p, ws, Node.SIGNAL)
会执行失败呢?如果p节点的线程在这时执行了unlock方法,就会调用unparkSuccessor
方法,unparkSuccessor
方法可能就将p的状态改为了0,那么执行就会失败。
到这里,signal方法已经完成了所有的工作,唤醒的线程已经成功加入Sync队列并已经参与锁的竞争了,返回true。
signalAll
1 | public final void signalAll() { |
判断当前线程是否占有独占锁,然后执行doSignalAll方法。
1 | private void doSignalAll(Node first) { |
与doSignal方法比较一下,doSignal方法只是唤醒了一个node并加入到Sync队列中,而doSignalAll方法方法唤醒了所有的Condition节点,并加入到Sync队列中。
Condition必须与一个独占锁绑定使用,在await或signal之前必须先持有独占锁。Condition队列时一个单向链表,它是公平的,按照先进先出的顺序从队列中被唤醒并添加到Sync队列中,这时便恢复了参与竞争锁的资格。