深度解析 Java 8:JDK1.8 AbstractQueuedSynchronizer 的实现分析(上)

阅读数:27008 2014 年 7 月 31 日 00:38

前言

Java 中的 FutureTask 作为可异步执行任务并可获取执行结果而被大家所熟知。通常可以使用 future.get() 来获取线程的执行结果,在线程执行结束之前,get 方法会一直阻塞状态,直到 call() 返回,其优点是使用线程异步执行任务的情况下还可以获取到线程的执行结果,但是 FutureTask 的以上功能却是依靠通过一个叫 AbstractQueuedSynchronizer 的类来实现,至少在 JDK 1.5、JDK1.6 版本是这样的(从 1.7 开始 FutureTask 已经被其作者 Doug Lea 修改为不再依赖 AbstractQueuedSynchronizer 实现了,这是 JDK1.7 的变化之一)。但是 AbstractQueuedSynchronizer 在 JDK1.8 中还有如下图所示的众多子类:

这些 JDK 中的工具类或多或少都被大家用过不止一次,比如 ReentrantLock,我们知道 ReentrantLock 的功能是实现代码段的并发访问控制,也就是通常意义上所说的锁,在没有看到 AbstractQueuedSynchronizer 前,可能会以为它的实现是通过类似于 synchronized,通过对对象加锁来实现的。但事实上它仅仅是一个工具类!没有使用更“高级”的机器指令,不是关键字,也不依靠 JDK 编译时的特殊处理,仅仅作为一个普普通通的类就完成了代码块的并发访问控制,这就更让人疑问它怎么实现的代码块的并发访问控制的了。那就让我们一起来仔细看下 Doug Lea 怎么去实现的这个锁。为了方便,本文中使用 AQS 代替 AbstractQueuedSynchronizer。

细说 AQS

在深入分析 AQS 之前,我想先从 AQS 的功能上说明下 AQS,站在使用者的角度,AQS 的功能可以分为两类:独占功能和共享功能,它的所有子类中,要么实现并使用了它独占功能的 API,要么使用了共享锁的功能,而不会同时使用两套 API,即便是它最有名的子类 ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套 API 来实现的,为什么这么做,后面我们再分析,到目前为止,我们只需要明白 AQS 在功能上有独占控制和共享控制两种功能即可。

独占锁

在真正对解读 AQS 之前,我想先从使用了它独占控制功能的子类 ReentrantLock 说起,分析 ReentrantLock 的同时看一看 AQS 的实现,再推理出 AQS 独特的设计思路和实现方式。最后,再看其共享控制功能的实现。

对于 ReentrantLock,使用过的同学应该都知道,通常是这么用它的:

reentrantLock.lock()
        //do something
        reentrantLock.unlock()

ReentrantLock 会保证 do something 在同一时间只有一个线程在执行这段代码,或者说,同一时刻只有一个线程的 lock 方法会返回。其余线程会被挂起,直到获取锁。从这里可以看出,其实 ReentrantLock 实现的就是一个独占锁的功能:有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。没错,ReentrantLock 使用的就是 AQS 的独占 API 实现的。

那现在我们就从 ReentrantLock 的实现开始一起看看重入锁是怎么实现的。

首先看 lock 方法:

如 FutureTask(JDK1.6)一样,ReentrantLock 内部有代理类完成具体操作,ReentrantLock 只是封装了统一的一套 API 而已。值得注意的是,使用过 ReentrantLock 的同学应该知道,ReentrantLock 又分为公平锁和非公平锁,所以,ReentrantLock 内部只有两个 sync 的实现:

公平锁:每个线程抢占锁的顺序为先后调用 lock 方法的顺序依次获取锁,类似于排队吃饭。

非公平锁:每个线程抢占锁的顺序不定,谁运气好,谁就获取到锁,和调用 lock 方法的先后顺序无关,类似于堵车时,加塞的那些 XXXX。

到这里,通过 ReentrantLock 的功能和锁的所谓排不排队的方式,我们是否可以这么猜测 ReentrantLock 或者 AQS 的实现(现在不清楚谁去实现这些功能):有那么一个被 volatile 修饰的标志位叫做 key,用来表示有没有线程拿走了锁,或者说,锁还存不存在,还需要一个线程安全的队列,维护一堆被挂起的线程,以至于当锁被归还时,能通知到这些被挂起的线程,可以来竞争获取锁了。

至于公平锁和非公平锁,唯一的区别是在获取锁的时候是直接去获取锁,还是进入队列排队的问题了。为了验证我们的猜想,我们继续看一下 ReentrantLock 中公平锁的实现:

调用到了 AQS 的 acquire 方法:

从方法名字上看语义是,尝试获取锁,获取不到则创建一个 waiter(当前线程)后放到队列中,这和我们猜测的好像很类似。 [G1]

先看下 tryAcquire 方法:

留空了,Doug Lea 是想留给子类去实现(既然要给子类实现,应该用抽象方法,但是 Doug Lea 没有这么做,原因是 AQS 有两种功能,面向两种使用场景,需要给子类定义的方法都是抽象方法了,会导致子类无论如何都需要实现另外一种场景的抽象方法,显然,这对子类来说是不友好的。)

看下 FairSync 的 tryAcquire 方法:

getState 方法是 AQS 的方法,因为在 AQS 里面有个叫 statede 的标志位 :

事实上,这个 state 就是前面我们猜想的那个“key”!

回到 tryAcquire 方法:

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();// 获取当前线程 
            int c = getState();  // 获取父类 AQS 中的标志位 
            if (c == 0) {
                if (!hasQueuedPredecessors() && 
                    // 如果队列中没有其他线程  说明没有线程正在占有锁!
                    compareAndSetState(0, acquires)) { 
                    // 修改一下状态位,注意:这里的 acquires 是在 lock 的时候传递来的,从上面的图中可以知道,这个值是写死的 1
                    setExclusiveOwnerThread(current);
                    // 如果通过 CAS 操作将状态为更新成功则代表当前线程获取锁,因此,将当前线程设置到 AQS 的一个变量中,说明这个线程拿走了锁。
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
             // 如果不为 0 意味着,锁已经被拿走了,但是,因为 ReentrantLock 是重入锁,
             // 是可以重复 lock,unlock 的,只要成对出现行。一次。这里还要再判断一次 获取锁的线程是不是当前请求锁的线程。
                int nextc = c + acquires;// 如果是的,累加在 state 字段上就可以了。
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

到此,如果如果获取锁,tryAcquire 返回 true,反之,返回 false,回到 AQS 的 acquire 方法。

如果没有获取到锁,按照我们的描述,应该将当前线程放到队列中去,只不过,在放之前,需要做些包装。

先看 addWaiter 方法:

用当前线程去构造一个 Node 对象,mode 是一个表示 Node 类型的字段,仅仅表示这个节点是独占的,还是共享的,或者说,AQS 的这个队列中,哪些节点是独占的,哪些是共享的。

这里 lock 调用的是 AQS 独占的 API,当然,可以写死是独占状态的节点。

创建好节点后,将节点加入到队列尾部,此处,在队列不为空的时候,先尝试通过 cas 方式修改尾节点为最新的节点,如果修改失败,意味着有并发,这个时候才会进入 enq 中死循环,“自旋”方式修改。

将线程的节点接入到队里中后,当然还需要做一件事: 将当前线程挂起!这个事,由 acquireQueued 来做。

在解释 acquireQueued 之前,我们需要先看下 AQS 中队列的内存结构,我们知道,队列由 Node 类型的节点组成,其中至少有两个变量,一个封装线程,一个封装节点类型。

而实际上,它的内存结构是这样的(第一次节点插入时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点):

黄色节点为队列默认的头节点,每次有线程竞争失败,进入队列后其实都是插入到队列的尾节点(tail 后面)后面。这个从 enq 方法可以看出来,上文中有提到 enq 方法为将节点插入队列的方法:

再回来看看

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
             // 如果当前的节点是 head 说明他是队列中第一个“有效的”节点,因此尝试获取,上文中有提到这个类是交给子类去扩展的。
                    setHead(node);// 成功后,将上图中的黄色节点移除,Node1 变成头节点。
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && 
                // 否则,检查前一个节点的状态为,看当前获取锁失败的线程是否需要挂起。
                    parkAndCheckInterrupt()) 
               // 如果需要,借助 JUC 包下的 LockSopport 类的静态方法 Park 挂起当前线程。知道被唤醒。
                    interrupted = true;
            }
        } finally {
            if (failed) // 如果有异常 
                cancelAcquire(node);// 取消请求,对应到队列操作,就是将当前节点从队列中移除。
        }
    }

这块代码有几点需要说明:

1. Node 节点中,除了存储当前线程,节点类型,队列中前后元素的变量,还有一个叫 waitStatus 的变量,改变量用于描述节点的状态,为什么需要这个状态呢?

原因是:AQS 的队列中,在有并发时,肯定会存取一定数量的节点,每个节点 [G4] 代表了一个线程的状态,有的线程可能“等不及”获取锁了,需要放弃竞争,退出队列,有的线程在等待一些条件满足,满足后才恢复执行(这里的描述很像某个 J.U.C 包下的工具类,ReentrankLock 的 Condition,事实上,Condition 同样也是 AQS 的子类)等等,总之,各个线程有各个线程的状态,但总需要一个变量来描述它,这个变量就叫 waitStatus, 它有四种状态:

分别表示:

  1. 节点取消
  2. 节点等待触发
  3. 节点等待条件
  4. 节点状态需要向后传播。

只有当前节点的前一个节点为 SIGNAL 时,才能当前节点才能被挂起。

2.  对线程的挂起及唤醒操作是通过使用 UNSAFE 类调用 JNI 方法实现的。当然,还提供了挂起指定时间后唤醒的 API,在后面我们会讲到。

到此为止,一个线程对于锁的一次竞争才告于段落,结果有两种,要么成功获取到锁(不用进入到 AQS 队列中),要么,获取失败,被挂起,等待下次唤醒后继续循环尝试获取锁,值得注意的是,AQS 的队列为 FIFO 队列,所以,每次被 CPU 假唤醒,且当前线程不是出在头节点的位置,也是会被挂起的。AQS 通过这样的方式,实现了竞争的排队策略。

看完了获取锁,在看看释放锁,具体看代码之前,我们可以先继续猜下,释放操作需要做哪些事情:

  1. 因为获取锁的线程的节点,此时在 AQS 的头节点位置,所以,可能需要将头节点移除。
  2. 而应该是直接释放锁,然后找到 AQS 的头节点,通知它可以来竞争锁了。

是不是这样呢? 我们继续来看下,同样我们用 ReentrantLock 的 FairSync 来说明:

unlock 方法调用了 AQS 的 release 方法,同样传入了参数 1,和获取锁的相应对应,获取一个锁,标示为 +1,释放一个锁,标志位 -1。

同样,release 为空方法,子类自己实现逻辑:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases; 
            if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常。
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {// 因为是重入的关系,不是每次释放锁 c 都等于 0,直到最后一次释放锁时,才通知 AQS 不需要再记录哪个线程正在获取锁。
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

释放锁,成功后,找到 AQS 的头节点,并唤醒它即可:

值得注意的是,寻找的顺序是从队列尾部开始往前去找的最前面的一个 waitStatus 小于 0 的节点。

到此,ReentrantLock 的 lock 和 unlock 方法已经基本解析完毕了,唯独还剩下一个非公平锁 NonfairSync 没说,其实,它和公平锁的唯一区别就是获取锁的方式不同,一个是按前后顺序一次获取锁,一个是抢占式的获取锁,那 ReentrantLock 是怎么实现的呢?再看两段代码:

非公平锁的 lock 方法的处理方式是: 在 lock 的时候先直接 cas 修改一次 state 变量(尝试获取锁),成功就返回,不成功再排队,从而达到不排队直接抢占的目的。

而对于公平锁:则是老老实实的开始就走 AQS 的流程排队获取锁。如果前面有人调用过其 lock 方法,则排在队列中前面,也就更有机会更早的获取锁,从而达到“公平”的目的。

总结

这篇文章,我们从 ReentrantLock 出发,完整的分析了 AQS 独占功能的 API 及内部实现,总的来说,思路其实并不复杂,还是使用的标志位 + 队列的方式,记录获取锁、竞争锁、释放锁等一系列锁的状态,或许用更准确一点的描述的话,应该是使用的标志位 + 队列的方式,记录锁、竞争、释放等一系列独占的状态,因为站在 AQS 的层面 state 可以表示锁,也可以表示其他状态,它并不关心它的子类把它变成一个什么工具类,而只是提供了一套维护一个独占状态。甚至,最准确的是 AQS 只是维护了一个状态,因为,别忘了,它还有一套共享状态的 API,所以,AQS 只是维护一个状态,一个控制各个线程何时可以访问的状态,它只对状态负责,而这个状态表示什么含义,由子类自己去定义。


感谢郭蕾对本文的审校。

给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。

收藏

评论

微博

用户头像
发表评论

注册/登录 InfoQ 发表评论

最新评论

用户头像
Geek_52cb0b 2019 年 09 月 07 日 12:12 0 回复
见过分析最清晰的 感谢分享
没有更多了