Java 多线程编程之五 AQS 底层源码深度剖析

大纲

AQS 的概述

AQS (AbstractQueuedSynchronizer) 的字面意思是 “抽象队列同步器”。在一般情况下,AQS 指的是 java.util.concurrent.locks 包下的 AbstractQueuedSynchronizer 类,但其实还有另外两种抽象队列同步器:AbstractOwnableSynchronizerAbstractQueuedLongSynchronizer。AQS 是用来构建锁或者其他同步器组件的重量级基础框架及整个 JUC 体系的基石,通过内置的 FIFO 队列来完成资源获取线程的排队工作(如下图所示),并通过单个原子 int 类型变量表示持有锁的状态。简而言之,AQS = state 变量 + CLH 队列。

AQS 的作用

加锁就会导致线程阻塞,有阻塞就需要排队,实现排队必然需要有某种形式的队列来管理。简而言之,抢到资源的线程直接执行业务逻辑,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等候(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证分配。这个机制主要用的是 CLH 队列的变体实现,将暂时获取不到锁的线程加入到队列中,这个队列就是 AQS 的抽象表现。AQS 使用一个 volatileint 类型的成员变量 state 来表示同步状态,通过内置的 FIFO 队列来完成资源获取的排队工作,将每个要去抢占资源的线程封装成队列的结点(Node)来实现锁的分配,通过 CAS、自旋以及 LockSupport.park() 的方式维护 state 变量的状态,使并发达到同步的控制效果。AQS 的整体工作流程如下图所示:

AQS 的核心类

  • AQS 的核心类

    • java.util.concurrent.locks.AbstractOwnableSynchronizer
    • java.util.concurrent.locks.AbstractQueuedLongSynchronizer
    • java.util.concurrent.locks.AbstractQueuedSynchronizer,简称 AQS
  • 跟 AQS 相关的锁

    • Semaphore (信号量锁)
    • CountDownLatch(闭锁)
    • CyclicBarrier(循环屏障锁)
    • ReentrantLock(可重入锁)
    • ReentrantReadWriteLock(读写锁)
    • ……
  • 锁和同步器的关系

    • 锁:面向锁的使用者,定义了程序员和锁交互的调用层 API,隐藏了实现细节,直接调用即可。
    • 同步器:面向锁的实现者(设计者),比如 Java 并发大神 Douglee,提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知 / 唤醒机制等。

更多跟 AQS 相关的内容

  • 更多关于 AQS 与锁相关的内容请点击 这里 查看。

AQS 的源码浅析

AbstractQueuedSynchronizer 类的源码注释

AbstractQueuedSynchronizer 类的源码剖析

AQS 使用一个 volatileint 类型的成员变量 state 来表示同步状态,通过内置的 FIFO 队列来完成资源获取的排队工作,将每个要去抢占资源的线程封装成队列的结点(Node)来实现锁的分配,通过 CAS、自旋以及 LockSupport.park() 的方式维护 state 变量的状态,使并发达到同步的控制效果。AQS 中的 Node 节点是什么呢?它类似 HashMap 的 Node 节点,JDK 用 static class Node<K,V> implements Map.Entry<K,V> {} 来封装传入的键值对。这里也是一样的道理,JDK 使用 Node 来封装(管理)Thread,可以将 Node 和 Thread 类比为候客区的椅子和等待办理业务的顾客。

AQS 的内部体系架构图

AQS 的 CLH 队列

CLH 队列是一种自旋锁队列,由 John Mellor-Crummey、Michael L. Scott 和 Thomas E. Anderson 三人在 1991 年提出。CLH 队列使用了一种无锁队列算法,被设计用来在多线程环境下管理共享资源的访问。与传统的自旋锁相比,CLH 队列提供了更好的性能和可伸缩性。它的实现原理是使用单向链表结构管理等待访问共享资源的线程,这些线程按照先来后到的顺序排队,等待释放锁。AQS 中的 CLH 队列是原 CLH 队列的一个变体,也就是 CLH 变体的虚拟双向队列 FIFO。简单来说,AQS 中的 CLH 队列由以下几个要素组成:

  • 节点(Node):每个等待锁的线程都会创建一个节点,用于表示其在队列中的位置。节点通常包含了线程状态等信息,以及指向前一个节点的引用。
  • 虚拟前驱(Dummy Head):CLH 队列会使用一个虚拟的头节点来简化代码逻辑,使得每个节点都有一个前驱节点,避免了在队列为空时的特殊处理。
  • 自旋(Spin):等待锁的线程会通过自旋的方式尝试获取锁,而不是进入睡眠状态,这样可以减少线程切换的开销。
  • CAS 操作:CLH 队列通常使用 CAS(Compare and Swap)操作来实现节点的状态更新,以确保并发安全性。

AQS 的源码深度剖析

从 ReentrantLock 剖析 AQS

Lock 接口的实现类,基本都是通过聚合了一个抽象队列同步器的子类来完成线程访问控制的。比如,ReentrantLock 实现了 Lock 接口,在 ReentrantLock 的内部聚合了一个 AbstractQueuedSynchronizer 的子类 Sync。

ReentrantLock 在内部定义了多个静态内部类,其中有 NoFairSync(非公平锁)和 FairSync(公平锁)。

ReentrantLock 的构造方法不传参数时,表示默认创建非公平锁;当参数为 true,表示创建公平锁;当参数为 false 表示创建非公平锁。

在 ReentrantLock 内部,NoFairSync 和 FairSync 中 tryAcquire() 方法的区别如下,可以明显看出两者的唯一区别就在于公平锁在获取同步状态时多了一个限制条件 !hasQueuedPredecessors()

ReentrantLock 的 hasQueuedPredecessors() 方法,用于在公平锁获取锁时,判断等待队列中是否存在有效节点,也就是判断是否需要排队。

由于公平锁在获取锁时比非公平锁多了一个判断 !hasQueuedPredecessors(),即判断是否需要排队,这导致公平锁和非公平锁的差异如下。

  • 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有其他线程在等待,那么当前线程就会进入等待队列中。
  • 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻抢占锁对象。也就是说,队列的第一个排队线程在 unpark(),之后还是需要竞争锁 (存在线程竞争的情况下)。

在创建完公平锁或者非公平锁后,执行 lock() 方法会进行加锁,最终都会执行到 acquire() 方法。

最后 acquire() 方法会执行 tryAcquire() 方法,而在 NonfairSync 和 FairSync 中均重写了其父 AbstractQueuedSynchronizer 中的 tryAcquire() 方法。

深度剖析 AQS 的底层源码

这里举个例子,假设 A、B、C 三个人都要去银行窗口办理业务,但是银行窗口只有一个,下面使用 ReentrantLock.lock() 方法模拟这种业务场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 通过一个银行办理业务的案例来模拟 AQS 是如何进行线程管理和实现通知唤醒机制的
*/
public class AQSDemo {

/**
* 3 个线程模拟 3 个顾客来银行网点的受理窗口办理业务
*/
public static void main(String[] args) {

ReentrantLock lock = new ReentrantLock();

// A 顾客就是第一个顾客,此时受理窗口没有任何人,A 可以直接去办理业务
new Thread(() -> {
ReentrantLock.lock();
try {
System.out.println("-----A thread come in");
try {
TimeUnit.MINUTES.sleep(20);
} catch (Exception e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}, "A").start();

// 第二个顾客,第二个线程 ---> 由于受理业务的窗口只有一个 (只能有一个线程持有锁),此时 B 只能进入候客区等待
new Thread(() -> {
ReentrantLock.lock();
try {
System.out.println("-----B thread come in");
} finally {
lock.unlock();
}
}, "B").start();

// 第三个顾客,第三个线程 ---> 由于受理业务的窗口只有一个 (只能有一个线程持有锁),此时 C 只能进入候客区等待
new Thread(() -> {
ReentrantLock.lock();
try {
System.out.println("-----C thread come in");
} finally {
lock.unlock();
}
}, "C").start();
}

}

lock()

在执行 ReentrantLock 的构造方法时,没有传递参数,因此默认创建的是非公平锁,而且执行 ReentrantLock.lock() 方法最终都会执行 NonfairSync 重写后的 lock() 方法。第一次执行 lock() 方法时,state 变量的值等于 0,表示锁没有被占用。

然后 lock() 方法会执行 compareAndSetState(0, 1) 方法进行 CAS 判断,可得 state == expected == 0,因此 CAS 操作执行成功,将 state 变量的值修改为 1。

整个 CAS 修改过程会通过 Unsafe 提供的 compareAndSwapInt() 方法来保证修改操作的原子性。如果变量的值等于期望值,则修改变量的值为新值,并返回 true;若不相等,则直接返回 false

再来看看 setExclusiveOwnerThread() 方法,它负责将拥有锁的线程设置为当前线程(线程 A)。

第二次执行 lock() 方法时,state 变量的值等于 1,表示锁已经被占用,此时执行 compareAndSetState(0, 1) 方法进行 CAS 判断,可得 state != expected,因此 CAS 操作执行失败,进入 acquire() 方法。

acquire()

acquire() 方法中,主要包含以下几个方法,下面会详细介绍每一个方法

tryAcquire()

tryAcquire() 方法中,为什么只抛了一个异常呢?这是 AbstractQueuedSynchronizer 抽象队列同步器中定义的方法,既然抛出了异常,就说明父类强制要求子类实现该方法。

找到 tryAcquire() 方法在 AbstractQueuedSynchronizer 子类中的实现。

这里以子类 NonfairSync 非公平锁为例,在 tryAcquire() 方法中执行了 nonfairTryAcquire() 方法,传入的参数是 1。

nonfairTryAcquire() 方法中,大多数情况都是这样的执行流程:线程 B 执行 int c = getState() 时,获取到 state 变量的值为 1,表示锁正在被占用,于是执行 c == 0 发现条件不成立;接着执行下一个判断条件 current == getExclusiveOwnerThread(),current 线程为线程 B,而 getExclusiveOwnerThread() 方法返回的是正在占用锁的线程 A,因此 tryAcquire() 方法最后会返回 false,表示并没有抢占到锁。

上面的 getExclusiveOwnerThread() 方法,返回的是正在占用锁的线程(Exclusive - 排他锁)。

nonfairTryAcquire() 方法有两种比较特殊的执行流程。第一种情况是,执行 int c = getState() 语句时,此时线程 A 恰好执行完成,释放出了锁,那么 state 变量的值为 0,当然发生这种情况的概率很小。那么线程 B 执行 CAS 操作成功后,将占用锁的线程修改为自己,然后返回 true,表示抢占锁成功。其实这里还有一种情况,需要留到 unlock() 方法执行时才能解释清楚。第二种情况为可重入锁的表现,假设 A 线程又再次抢占到锁(当然上述案例代码里面并没有体现出来),这时 current == getExclusiveOwnerThread() 条件成立,将 state 变量的值加上 acquire,这种情况也应该返回 true,表示线程 A 正在占用锁。因此,state 变量的值是可以大于 1 的。

tryAcquire() 方法返回 false 之后,执行 ! 操作后为 true,那么就会继续执行 addWaiter() 方法。

addWaiter()

这里的 addWaiter() 方法会先判断 tail 尾指针是否为空,如果为空,则执行 enq(node) 方法,将封装了线程 B 的 Node 节点加入到队列中。

上面的 end(node) 方法,用于构建双端同步队列。在双端同步队列中,第一个节点为虚节点(也叫哨兵节点),它其实并不存储任何信息,只是占个位置。真正的第一个有数据的节点,是从第二个节点开始的。

end(node) 方法中,第一次执行 for 循环:当线程 B 进来时,双端同步队列为空。由于 tail == null,先通过 new Node() 创建一个哨兵节点,然后将头指针指向哨兵节点。此时队列中只有一个节点,尾节点即是头节点,因此尾指针也指向该哨兵节点。

end(node) 方法中,第二次执行 for 循环:将封装着线程 B 的节点放入双端同步队列中,此时 tail 尾指针指向了哨兵节点并不等于 null,以尾插法的方式,先将 node(装着线程 B 的节点)的 prev 指向之前的 tail,再通过 compareAndSetTail(t, node) 方法将 node 设置为尾节点,最后将 t.next 指向 node,最后执行 return t 结束 for 循环。注意,最后哨兵节点和 NodeB 节点的 waitStatus 都为 0,表示在等待队列中。

最后看看线程 C(顾客 C)的执行流程,线程 C 和线程 B 的执行流程很类似,都是执行 acquire() 中的方法。

但是在 addWaiter() 方法中,线程 C 的执行流程有些区别。由于 tail != null,因此在 addWaiter() 方法中就已经将 C 节点添加至队尾了。

在执行 addWaiter() 方法时,会将节点 C 添加到双端同步队列的队尾,而且不需要再执行 enq(node) 方法。

acquireQueued()

当线程 B 执行完 addWaiter() 方法后,就会进入 acquireQueued() 方法中,此时传入的参数为封装了线程 B 的 NodeB 节点。

acquireQueued() 方法中,两个 if 判断中的代码都是放在 for( ; ; ) 中执行,这样可以实现自旋的操作。NodeB 节点的前驱结点为哨兵节点,因此 final Node p = node.predecessor() 执行完后,p 将指向哨兵节点。哨兵节点满足 p == head 条件,如果此时线程 B 执行 tryAcquire(arg) 方法尝试抢占锁还是失败了,那么就会执行后面 if 判断中的 shouldParkAfterFailedAcquire() 方法。

shouldParkAfterFailedAcquire() 方法中,由于哨兵节点的 waitStatus 为 0,所以会通过 compareAndSetWaitStatus() 方法执行 CAS 操作,将哨兵节点的 waitStatus 修改为 Node.SIGNAL,即使将 waitStatus 修改为 -1。

注意,compareAndSetWaitStatus() 方法会调用 UnSafe 类的 compareAndSwapInt() 来保证修改操作的原子性,虽然 compareAndSwapInt() 方法内无自旋操作,但是在 acquireQueued() 方法中的 for( ; ; ) 能保证实现自旋的操作。

执行完上述操作后,哨兵节点的 waitStatus 会被修改为 -1。

执行完上述操作后,将退出 if 判断,又会重新进入 for( ; ; ) 循环,此时第二次执行 shouldParkAfterFailedAcquire() 方法。

由于哨兵节点的 waitStatus 已经被修改为 -1,因此在线程 B 第二次执行 shouldParkAfterFailedAcquire() 方法时会返回 true,导致会接着执行 parkAndCheckInterrupt() 方法;然后线程 B 调用 park() 方法后被阻塞,也就是不会继续往下执行,一直排队等待。

线程 C 的执行流程:因为线程 C 最终也会执行 LockSupport.park() 方法,所以线程 C 最后也会被阻塞,进入等候区排队等待。

总结

如果前驱节点的 waitstatus 是 SIGNAL (-1) 状态,那么 shouldParkAfterFailedAcquire() 方法会返回 true,当前线程会继续往下执行 parkAndCheckInterrupt() 方法,然后再执行 park() 方法将当前线程挂起。根据 park() 方法 API 描述,当前线程在两种情况下会恢复执行,第一种情况是被 unpark,第二种情况是被中断(interrupt)。如果是发生第二种情况(被中断),那么 parkAndCheckInterrupt() 方法会返回当前线程的中断状态。

unlock()

线程 A 调用 ReentrantLock.unlock() 方法,释放手上持有的锁。

ReentrantLock.unlock() 方法中,调用了 sync.release() 方法。

sync.release() 方法中,由于线程 A 即将释放锁,因此执行 tryRelease() 方法后会返回 true,表示锁释放成功。head 指针指向哨兵节点,并且满足 if 条件,可以继续执行 unparkSuccessor() 方法。

tryRelease() 方法中,又是抛出了一个异常,继续查看 AbstractQueuedSynchronizer 子类的 tryRelease() 方法实现。

由于线程 A 只加过一次锁,因此 state 的值为 1,传进来的参数 release 的值也为 1;满足 c == 0 条件,将 free 标志位设置为 true,表示当前锁已被释放;再将排他锁关联的线程设置为 null,表示当前没有任何线程占用锁。

由于 release() 方法中获取到的头结点 h 为哨兵节点,而且哨兵节点的 waitStatus 为 -1,因此在 unparkSuccessor() 方法中可以通过 compareAndSetWaitStatus() 方法执行 CAS 操作,将哨兵节点的 waitStatus 设置为 0,并将哨兵节点的下一个节点 NodeB 获取出来,然后唤醒 NodeB 节点中封装的线程 B。

执行完上述操作后,当前没有线程占用锁,哨兵节点的 waitStatus 被设置为 0,而且 state 的值也为 0(表示当前没有任何线程占用锁)。


杀个回马枪,继续看看线程 B 被唤醒之后的执行流程。首先返回到 lock() 方法的执行流程中来,线程 B 被 unpark() 之后将不再阻塞,继续往下执行。由于 线程 B 是被正常唤醒的,因此 Thread.interrupted() 方法的返回值为 false,表示线程 B 未被中断。

然后返回到 parkAndCheckInterrupt() 方法的上一层 acquireQueued() 方法中来,此时锁未被占用,线程 B 再次执行 tryAcquire() 方法能够抢占到锁,并且将 state 变量的值设置为 1,表示该锁已经被占用。

接着继续看看 acquireQueued() 方法中的 setHead(node) 方法。传入的节点为 NodeB,将头指针指向 NodeB 节点;将 NodeB 节点中封装的线程置为 null(因为线程 B 已经获取到锁);NodeB 不再指向其前驱节点(哨兵节点)。这一切操作都是为了将 NodeB 节点作为新的哨兵节点。

执行完 setHead() 方法后,整体状态如下图所示

最后将原来的哨兵节点的后驱节点设置为 null,之后原来的哨兵节点就会变成一个完全孤立的节点(会被 GC 回收),此时 NodeB 节点可以正式作为新的哨兵节点了。

执行完上述步骤后,整体状态如下图所示

线程 C 被唤醒之后,执行的也是类似的唤醒流程。

AQS 高频面试题

高频面试题一

我相信你应该看过源码了,那么 AQS 里面有个变量叫 state,它的值有几种状态?

  • state 变量有 3 种状态
    • 0:表示锁没被占用
    • 1:表示锁被占用了
    • 大于 1:表示是可重入锁,比如 ReentrantLock 才有这状态

高频面试题二

如果锁正在被占用,A、B 两个线程进来了以后,请问队列中总共有多少个 Node 节点?

3 个节点,分别是哨兵节点、NodeA、NodeB。

参考资料

AQS 源码解读流程图