06 锁:如何理解锁的同步阻塞队列与条件队列?
你好,我是丁威。
这节课,我们重点介绍并发编程中的基石:锁。
锁的基本存储结构
我们先通过一个简单的场景来感受一下锁的使用场景。一家三口在一起生活,家里只有一个卫生间,大家早上起床之后都要去厕所。这时候,一个人在卫生间,其他人就必须排队等待。
这个场景用IT术语可以表述为下面两点。
- 洗手间作为一个资源在同一时间只能被一个人使用,它具备排他性。
- 一个人用完洗手间(资源)之后会归还锁,然后排队者重新开始竞争洗手间的使用权。
我们可以对这个场景进行建模。
- 资源:更准确地说是公共资源或共享资源需要被不同的操作者使用,但它不能同时被使用。
- 资源使用者:共享资源的使用者。
- 锁:用来保护资源的访问权。锁对象的归属权为共享资源,但当资源使用者向资源申请操作时,可以将锁授予资源使用者。这时候,资源使用者叫做锁的占有者,在此期间它有权操作资源。操作者不再需要操作资源之后,主动将锁归还。
- 排队队列:我们可以更专业地称之为阻塞队列,它可以存储需要访问资源但还没获取锁的资源使用者,其归属权通常为锁对象。
这里我之所以强调归属权,主要是因为它可以帮助我们理解锁的基本结构和资源的关系。
那锁的结构是什么呢?我们通过上节课的课后题来理解这个问题。
上节课的第二道课后题是问你怎么用多线程实现面包厂的生产和销售。我在这里也给你写了一段示例代码:
面包仓库的职能是为面包厂存储面包,它需要提供两个基本的方法:存储面包和获取面包。面包仓库内部使用ArrayList来存储面包,但是因为ArrayList是一个线程不安全的存储容器,它不允许多个使用者同时存储数据,所以我们需要对资源进行保护。体现在代码上,我们可以通过 synchronized(资源对象)来创建一把锁,保护多线程对资源对象的串行访问。
我们结合put方法的流程来看一下锁的基本存储结构。
假设 t1、t2两个生产者(线程)同时调用Bakery的put方法。那么synchronized(breads)在编译的时候,就会在资源breads对象上创建锁相关的结构,即锁对象。
t1,t2在执行synchronized(breads)时,只有一个线程可以获取锁,另外一个线程需要等待,所以这里需要引入一个存储结构(通常为队列)来存储这些排队的线程,我们通常会使用阻塞队列。
首先获取到锁的线程t1在向仓库中存放面包之前需要先进行判断,如果存储空间足够,执行上图中的代码step2。但是如果仓库没有足够的空间存储面包,就要执行代码step3,调用锁对象的wait方法,让获得锁的线程t1阻塞,并且释放锁。
但是,被阻塞的t1和t2还是有所不同。因为t1被阻塞的原因是条件不满足,当面包仓库有额外的存储空间时,t1就会被唤醒。所以我们还要引入一个条件队列,用来存放因条件不满足而被阻塞的线程。
t1线程如果因为条件不满足而存储在条件等待队列,当存在剩余空间后,就能被其他线程唤醒继续执行后续的代码了。在这里是将面包存储到ArrayList,那此时面包工厂中存储了面包,需要通知那些因为仓库中没有面包而阻塞的线程,调用锁的notifyAll方法唤醒在等待的线程。
线程t1因为存储空间不足在step3被阻塞,进入到条件等待队列。等到面包被卖出,仓库有足够的容量之后,t1线程将被唤醒。
这里我想给你提个问题,t1线程可以立马继续执行step3之后的代码step4吗?
答案是不能,它需要先去尝试竞争锁,成功获得锁之后才能开始执行step4,否则就会进入到阻塞队列。
从上面这个过程中,我们可以归纳出锁的基本存储结构,它包括锁的持有者线程、锁的重入次数、阻塞队列和条件等待队列四个部分。
锁的底层实现机制-AQS实现原理剖析
在Java中使用锁通常有两种编程方式。一种是使用JVM虚拟机(Java规范)层面提供的synchronized关键字;另一种是使用JUC类库,也就是大名鼎鼎的AbstractQueuedSynchronizer,简称 AQS。
其中,synchronized是在JVM虚拟机层面实现的,涉及很多底层知识,直接研读源码难度太大。相比较而言,JUC并发编程遵从JSR-166规范,提供了锁的另外一种实现方式,也就是大家所熟知的AQS类库,更加常用和易学。
接下来,我会基于JUC框架,带你从代码层面近距离观摩锁的实现原理,掌握锁的本质。
在JUC框架中,ReentrantLock对标synchronized,它实现了可重入互斥锁的全部语义。语义主要包括两个方面:一个是lock(申请锁)和unlock(释放锁);另一个是条件等待,对标Object的wait/notify。
我们先来看下ReentrantLock和AQS的类图:
简单介绍一下类图中各个类的含义。
- AbstractQueuedSynchronizer
它是AQS体系的核心基类,使用的是类模版设计模式。这个类实现了锁的基本存储结构,定义了锁的基本行为。AQS的内部数据结构为链表,持有链表的头尾节点,每一个节点用Node表示,可以实现阻塞队列和条件等待队列。其中,Node prev、next用于构建阻塞队列,而Node nextWatier用于构建条件等待队列。
AQS方法的修饰符也很有规律,其中,使用protected修饰的方法为抽象方法,通常需要子类去实现,从而实现不同特性的锁(例如互斥、共享锁、读写锁等);而用public修饰的方法基本可以认为是模板方法,不建议子类直接覆盖。
AQS还额外提供了很多有用的方法,我给你列了个表格,方便你在有需要的时候随时查看。
- AbstractOwnableSynchronizer
它是AQS核心基类的父类,用于记录锁当前的持有者线程。
- ReentrantLock
可重入互斥锁的具体实现。由于Java不支持多继承,所以由ReentrantLock继承抽象类Lock,用内部类的方式继承AQS。所以说,ReentrantLock在具体实现锁时基本都是委托内部类Sync,而Sync又继承自AQS。Sync内部有两个子类,分别是FairSync(公平锁)与NonfairSync(非公平锁)。
锁的申请
接下来我们结合ReentrantLock的部分关键源码来看看怎么实现锁的申请与释放。先看锁的申请。
ReentrantLock支持带超时时间的锁申请,具体实现方法是tryLock,时序图如下:
AQS的tryAcquireNanos代码如下图所示,该方法是在AQS中定义的。
解读一下核心要点。
如果线程的中断位标记为true,表示应用方主动放弃锁的申请,可以直接抛出中断异常,结束锁的申请。
否则,调用Sync的tryAcquire尝试获取锁。如果返回true,表示成功获取锁,可以直接返回;不然就调用doAcquireNanos,进入锁等待队列。
Sync的tryAcquire方法,代码如下:
尝试实现锁有几个要点。
首先我们要确保获取当前申请锁的线程。
我们还要获取锁的当前状态,也就是state值(state字段的含义是当前锁的重入次数,如果state为0,表示当前锁并没有被占用)。这又包括两种情况。
情况一:如果state的值为0,表示当前锁并没有被占用。根据申请锁的公平与否,会有不同的处理逻辑。
- 如果是公平锁,那么我们需要判断阻塞队列中有没有其他线程在排队。如果有,公平锁此时无法竞争锁,返回false,尝试获取锁失败。这个线程最终会调用doAcquireNanos,进入到同步阻塞队列。
- 但是如果是非公平锁,则会首先和阻塞队列中的线程竞争,如果竞争成功,可以直接获取锁,如果竞争失败,则同样进入到阻塞队列。
竞争锁的代码使用的是CAS机制,尝试更新state的值为acquires,如果更新成功,则占有锁。成功占有锁之后,需要设置锁的拥有者为当前线程。
情况二:如果state的值不为0,表示锁已经被占用。我们需要判断当前线程是不是锁的持有者。如果是,则只需要更新state的值(ReentrantLock支持可重入);否则就进入阻塞队列,排队获取锁。
为什么在竞争锁时需要使用CAS呢?什么是CAS呢?
我们知道,申请锁时要先查询state的值,然后更新state的值。但这两步在多线程环境中并不是一个安全的操作。如下图所示:
这很容易导致t1,t2都获取到了锁。根本原因是这个步骤包括两个CPU指令,无法做到原子更新。
为了解决这个问题,操作系统提供了一个新的CPU指令(CAS),用它来实现“比较-和-更新”。具体的原理是在更新一个值之前,首先比较这个值是否发生了变化,如果确实发生了变化,那么就会更新失败,否则更新成功。
如果没有成功获取锁,当前申请锁的线程还会继续调用AQS的doAcquireNanos:
这是AQS机制中非常重要的一个方法,它的实现比较复杂。我们先来看一张流程图:
我们可以把这个流程归结为五步。
第一步:判断获取锁是不是已经超时,如果是,返回false(ReentrantLock支持锁获取超时)。
第二步:调用addWaiter方法把当前节点加入到阻塞队列中。
第三步:获取节点的前驱节点。
第四步:如果节点的前驱节点为头节点,再次调用tryAcquire方法尝试获取锁。如果成功获取锁,则将当前节点设置为Head,表示当前它是锁的持有者。
第五步:如果当前节点不是头节点,或者没有成功获取锁,调用shouldParkAfterFailedAcquire判断当前线程是否需要阻塞,如果需要阻塞,则调用LockSupport.parkNanos阻塞线程。
接下来,我们对上面流程中的关键代码进行详细的解读。
先看第二步里addWaiter的具体实现:
因为AQS内部不管是阻塞队列还是条件等待队列都是基于链表实现的,所以入队列的实现比较容易理解,这里主要关注三点。
- 需要创建一个Node节点,将线程对象存储在Node节点中,方便后续对线程进行阻塞或唤醒。
- 链表在多线程环境中操作并不安全,所以在更新链表相关指针时要引入CAS机制。首先将 if和CAS组合进行一次测试,如果更新成功,直接结束操作;不然就要使用 for和CAS的组合进行多次重试,一直到更新成功为止。这背后的原理是,多线程在更新Head或者Tail时,只有一个能更新成功,如果更新失败,则重新获取Head或者Tail再进行更新,直到节点安全地加入链表为止。
- 在入队的过程中,如果队列为空时,会创建一个空的Node节点,但是不持有任何线程信息。
等到节点成功加入到阻塞队列后,需要判断节点的前驱节点是否为头节点,如果是,表示成功获取锁。成功获得锁的线程对应的节点将成为头节点,设置头节点的代码如下:
头节点持有的线程对象为什么为空呢?
这是因为锁的持有者被记录在了AbstractOwnableSynchronizer的Thread exclusiveOwnerThread属性中。这样做的好处是,我们可以认为头节点是锁的持有者,但头节点却并不维护线程对象。在实现非公平锁时,如果锁被新线程抢占,不需要更新头节点。
相反,如果节点的前驱节点不是头节点,则需要判断申请锁的线程是否需要阻塞。我们可以通过shouldParkAfterFailedAcquire方法来实现它:
解读一下,如果前驱节点的状态是Node.SIGNAL,则当前线程直接进入到阻塞队列,排队获取锁。
这里再对Node.SIGNAL补充说明一下:Node.SIGNAL的含义是节点需要一个信号来唤醒自己,如果前驱节点的状态为Node.SIGNAL,说明前驱节点在等待被唤醒,那作为前驱节点的后继节点,自然而然也需要等待被唤醒。
如果前驱节点的状态大于0,需要删除当前节点之前连续的节点。因为当前节点的状态只有Node.CANCELLED大于0,所以如果前驱节点的状态大于0说明是已取消的节点,需要被删除。示例图如下:
这里以当前节点为基准(状态为-1)向前删除。注意,只删除连续的1,也就是说遇到非取消节点立即停止删除。基于分段思想,我们不会删除前面所有的已取消节点,因为删除节点的方向是从后向前的,而且shouldParkAfterFailedAcquire这个方法会在多个线程获取锁之后被多个线程调用,但后续的节点在执行删除时,遇到当前线程,会被切割成段,段与段之间并不会有多线程执行,从而可以安全地操作各自的段。
如果前驱节点的状态为0或Propagate,需要尝试把前驱节点的状态变更为Node.SIGNAL。也就是说,不阻塞线程,而是再次试图获取锁相关的逻辑。
如果需要阻塞线程,先判断本次获取锁的剩余时间是否大于等于spinForTimeoutThreshold,如果是,则通过自旋方式进行循环,否则将使线程阻塞。其中spinForTimeoutThreshold默认为1s,这样做的目的主要是如果本次锁申请距超时还剩不到1s,就没有必要再阻塞线程了,避免线程切换带来的额外开销。
如果需要阻塞线程,我们可以调用LockSupport.parkNanos方法使线程阻塞,这个方法同样支持设置超时时间。
锁的释放
申请完锁之后,我们还会面临锁的释放。我们可以通过ReentrantLock的unlock方法释放锁,并最终调用AQS的模版方法:release方法,代码如图所示:
在详细地介绍具体的方法之前,我们先来看一张整体的时序图,理解一下释放锁的实现机制。
锁的释放主要包含如下几个步骤:
第一步:释放锁,必须先判断当前线程是否是锁的持有者,如果不是,抛出IllegalMonitorStateException异常。
第二步:判断锁的剩余占有次数,如果为0,表示锁已释放,需要唤醒阻塞队列中的其他排队线程。
我们看一下释放锁的关键代码。具体定义在ReentrantLock$Sync的tryRelease中:
这段代码有两个要点。
- 如果当前锁的占有者不是申请释放锁的线程,那就不能释放锁,只有持有者线程才能释放锁。这个时候需要抛出监视器错误。
- 如果一个锁被同一个线程重入n次,那对应也要释放n次。当持有次数为0时,表示可以释放锁。
尝试释放锁后,返回“成功”,接下来要做的是唤醒阻塞队列中的下一个线程。当然,如果你使用的是非公平锁,新来的线程在这个时候是可以直接获取锁的,这样唤醒的线程如果没能获取锁,就又会进入到阻塞队列中。
从阻塞队列中查找下一个待唤醒的线程需要使用AQS的unparkSuccessor方法,代码如下图所示:
这个过程主要包括四个要点,分别对应上图的step1、step2、step3和step4。
step1:因为这个方法的参数是头节点,头节点是当前锁的持有者,所以在释放锁时,我们要找头节点的下一个未取消的节点。
step2:确认头节点的状态。如果头节点的状态不为0,则更新为0。
step3:从链表的尾部开始寻找,找到头节点后面的第一个非取消节点。这里说明一下,因为我们在维护节点的前驱节点时使用了CAS,通过前驱节点遍历是可靠的,不会遗漏节点。
step4:找到对应的节点,调用LockSupport.unpark方法唤醒线程。线程被唤醒后会继续去竞争锁。这里唤醒的是申请锁时用LockSupport.park阻塞的线程,因为这样可以让锁的申请和释放形成闭环通道。
锁的条件等待队列
理解了锁的申请和释放,接下来我们再来看看ReentrantLock是怎么实现Object.wait和Object.notify语义的,这是线程之间协作的基石。
线程调用锁对象的wait方法时会进入到条件等待队列,而线程调用锁对象的notify方法,会唤醒条件队列中的一个线程,具有下面三个特征。
- Object 的 wait与notify必须在临界区中调用。
- Object 的wait和notify的使用场景为条件等待。例如,一个线程获取锁后,需要等待某一个条件满足后才能继续执行。这时,为了节省CPU资源,线程可以调用锁的wait方法使自己阻塞,等待条件满足后被别的线程唤醒。
- Object的wait方法会释放当前锁。
在AQS中,实现Object的notify和wait功能的主要类为Condition,类图如下:
Condition的接口对标Object的wait与notify方法,底层的存储结构为一个链表(条件阻塞队列)。链表中的节点为Node,条件阻塞队列为单链表,链表通过Node nextWaiter指针维护链表。
因为前面在介绍lock语义的时候我们用的是带超时时间的方法,所以为了覆盖更多的AQS方法,这里我们就变一变,用不带超时时间的方法来解读await语言。不过这两者在本质上并没有差别。
为了帮助你更快掌控await的整体实现思路,可以先看一下时序图:
Condition的wait()方法对标Object.wait(),我们来看一下它的具体实现逻辑:
我们结合Object.wait的语义来体会一下await方法中最关键的六个步骤。
step1:如果当前线程被中断,要直接抛出中断异常。
step2:将节点加入条件等待队列中。
step3:释放锁,并保存释放之前锁的状态,等到条件满足线程被唤醒后,需要重新申请指定数量的锁。
step4:如果节点存在于条件队列而不在阻塞队列中,说明未收到signal信号,线程会被阻塞;如果线程被中断,就结束条件队列的等待。
step5:再次尝试申请锁,并检查唤醒的原因,看看是因为收到signal信号而被唤醒,还是因为收到了中断信号。
step6:如果先收到signal信号,再收到中断信号,那就要重新设置线程中断位,等待下一次中断检查点;如果是先收到中断信号,后收到signal信号,就直接抛出中断异常;如果正常收到signal信号,await方法结束阻塞,则继续执行后续逻辑。
其中,第二步中的加入条件队列,具体的代码实现是将节点接入到链表的尾部,如果有取消的节点就把它删除。这里线程是安全的,因为执行await方法的前提条件是要获取锁。
第四步是用await方法阻塞和唤醒线程的关键。核心代码如下图所示:
我们来看一下怎么判断线程是否在同步队列中(用isOnSyncQueue方法实现)。
- 如果节点的状态为Node.CONDITION,或者node.prev为空,表示线程在等待条件被触发。为什么节点的前驱节点不为空就可以认为线程在同步阻塞队列中呢?这是因为进入同步队列时是用CAS机制来更新前驱节点的。
- 如果Node的next指针不为空,表示线程在同步阻塞队列中,返回true。
- 如果不满足上述条件,则从尾部节点再查找一遍,如果能找到,返回true,否则返回false。
因为节点如果在条件等待队列中,说明条件不满足,线程需要阻塞并等待条件触发。线程可以通过下面几种方式被唤醒:
- 由于正常收到signal信号被唤醒;
- 先收到signal信号,然后收到中断信号;
- 先收到中断信号,再收到signal信号。
那怎么判断唤醒方式呢?
我们可以通过checkInterruptWhileWaiting来实现它。也就是检测线程的中断标志位,如果线程并没有设置中断位,则返回0。如果检测到了中断位,则用transferAfterCancelledWait方法来判断中断信号和signal的先后顺序。
transferAfterCancelledWait的核心实现逻辑是,如果成功将节点的状态从Node.CONDITION更新为0,就表示先收到了中断标记,否则就是先收到了signal信号。因为如果是先收到signal信号,节点的状态应该是NODE.SIGNAL,而且节点会进入同步阻塞队列。这样做可以有效避免signal信号丢失。
线程被唤醒后需要重新申请锁,调用acquireQueued方法来实现,这一步和前面我们提到的申请流程类似,这里就不再重复了。
当条件满足后,线程被唤醒,这时候我们就要用到Condition的signal()方法了。signal方法的时序图如下:
这部分就是从条件队列中找到第一个没有取消的节点,然后唤醒它。实现transferForSignal方法的具体代码如下:
这个方法有三个要点。
第一,要使用CAS尝试将节点状态从CONDITION转化为0。如果更新失败,说明节点已取消,需要返回false,继续通知下一个等待线程。
第二,将线程从条件阻塞队列放入到同步阻塞队列,这一步非常关键,可以防止signal信号丢失。
第三,如果线程加入同步队列后,其前置节点的状态为已取消,或者将其设置为signal失败,则直接唤醒线程。
signal方法内部的实现机制就是确保线程要么在同步队列中,要么在条件等待队列中。这样可以有效防止通知信号丢失,避免线程一直被阻塞。
到这里,Condition的await和signal方法就都介绍完了。
总结
这节课,我们首先通过一个简单的生活场景,并结合生产者-消费者模型引出了锁的基本结构,它包括:锁要保护的资源、锁的拥有者、同步阻塞队列和条件等待队列。
紧接着,我们以源码分析为主要手段,辅助流程图、时序图,一步一步地实现了锁的申请和释放。
同步阻塞队列存放的都是竞争锁失败的线程,主要表征的是线程之间的竞争、互斥,而条件等待队列中存储的是因为某一个条件不满足而需要阻塞的线程,通常需要被其他线程主动唤醒,主要表征的是线程协作。
我们可以使用LockSupport.parkNanos来阻塞线程,并通过LockSupport.unpark方法来唤醒线程。
如果你对中间件感兴趣,对锁的语义的理解必不可少。它虽然有一定难度,但是只要攻下了源码,读懂AQS,对锁的理解与认知能力会有一个质的提升,对多线程协作开发大有裨益。
JUC的体系非常庞大,这节课不能全面覆盖,但是只要掌握了AQS,后面再去学习CountDownLatch、信号量、CAS等知识会变得非常容易。如果你有兴趣,也可以读一读我写过的和锁相关的文章:《锁的优化思路》和《disruptor无锁化设计实践》,应该可以给你更多的启发。
课后题
最后,还是给你留一道课后题。
请你尝试写一篇文章,分析JUC读写锁的源码,重点剖析读锁的申请与释放还有写锁的申请与释放。我也给你提供一篇文章供你参考:《Java并发锁ReentrantReadWriteLock读写锁源码分析》。
关于这节课,如果你还有不理解的问题,也欢迎你在留言区留言。我们下节课再见!
- 末日,成欢 👍(8) 💬(1)
对于AQS框架的理解, 我的感觉它更像是synchronized的JAVA实现。 在进入synchronized代码内部的时候, 是通过monitorenter指令, 修改了对象头的Markword. AQS在进入的时候, 通过CAS修改了state变量。 释放的时候同理。 当多个线程阻塞排队的时候, synchonized同样存在入口的等待队列EntryList,和AQS类似。 当使用wait/notifyAll的时候, synchronized同样存在条件队列waiSet, 也和AQS的条件队列类似。 持有线程ownerThread,也是类似的。 AQS又是对synchronized的功能的增强。 比如增加了非阻塞式锁,支持打断式获取锁、支持超时式获取锁。 并且大师们更对AQS实现了公平锁、非公平锁,还有一些更好用的工具类。 ReentrantReadWriteLock也是使用AQS实现的。 它对state的高低16位有更多的功能。 读用的是高16位,写用的是低16位 并且读锁使用的是共享模式, 也就是当写锁释放时, 会将读线程一个一个唤醒。
2022-07-23 - 码小呆 👍(1) 💬(1)
感觉一遍还不够我大脑理解,需要多看看!
2022-06-26 - :) 👍(1) 💬(2)
老师,你好, 看了下源码ReentrantLock 是 implements Lock 而不是 extends ,为什么还用内部类Sync,而不是直接extends AbstractQueuedSynchronizer?
2022-06-25 - 小豹哥 👍(1) 💬(1)
这也太仔细认真了吧,这细节抠的! 给老师点个大大的赞
2022-06-24 - TableBear 👍(0) 💬(2)
老师,请问你的源码是JDK什么版本的?
2022-07-06 - 雨落~紫竹 👍(0) 💬(1)
我又来了
2022-07-02 - smilekaka 👍(0) 💬(0)
太干了,有点噎住....
2023-07-26 - 小麦 👍(0) 💬(0)
“如果节点的状态为 Node.CONDITION,或者 node.prev 为空,表示线程在等待条件被触发。为什么节点的前驱节点不为空就可以认为线程在同步阻塞队列中呢?这是因为进入同步队列时是用 CAS 机制来更新前驱节点的。” AQS 貌似没有通过 CAS 更新前驱节点的操作。 private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; 没看到有前驱节点的偏移量。
2022-08-04 - Zx 👍(0) 💬(1)
有一个小问题请教老师,为什么唤醒节点要从队尾开始找最靠前的非取消节点,而不是从队头开始找呢
2022-06-27