当前位置 博文首页 > JavaEdge全是干货的技术号:AQS-AbstractQueuedSynchronizer源码

    JavaEdge全是干货的技术号:AQS-AbstractQueuedSynchronizer源码

    作者:[db:作者] 时间:2021-07-07 12:42

    6 锁的获取

    获取锁显式的方法就是 Lock.lock () ,最终目的其实是想让线程获得对资源的访问权。而 Lock 又是 AQS 的子类,lock 方法根据情况一般会选择调用 AQS 的 acquire 或 tryAcquire 方法。

    acquire 方法 AQS 已经实现了,tryAcquire 方法是等待子类去实现,acquire 方法制定了获取锁的框架,先尝试使用 tryAcquire 方法获取锁,获取不到时,再入同步队列中等待锁。tryAcquire 方法 AQS 中直接抛出一个异常,表明需要子类去实现,子类可以根据同步器的 state 状态来决定是否能够获得锁,接下来我们详细看下 acquire 的源码解析。

    acquire 也分两种,一种是独占锁,一种是共享锁

    6.1 acquire 独占锁

    • 独占模式下,尝试获得锁

    在独占模式下获取,忽略中断。 通过至少调用一次 tryAcquire(int) 来实现,并在成功后返回。 否则,将线程排队,并可能反复阻塞和解除阻塞,并调用 tryAcquire(int) 直到成功。 该方法可用于实现方法 Lock.lock()。
    对于 arg 参数,该值会传送给 tryAcquire,但不会被解释,可以实现你喜欢的任何内容。

    • 看一下 tryAcquire 方法
      AQS 对其只是简单的实现,具体获取锁的实现方法还是由各自的公平锁和非公平锁单独实现,实现思路一般都是 CAS 赋值 state 来决定是否能获得锁(阅读后文的 ReentrantLock 核心源码解析即可)。

    执行流程

    1. 尝试执行一次 tryAcquire
    • 成功直接返回
    • 失败走 2
    1. 线程尝试进入同步队列,首先调用 addWaiter 方法,把当前线程放到同步队列的队尾

    2. 接着调用 acquireQueued 方法

    • 阻塞当前节点
    • 节点被唤醒时,使其能够获得锁
    1. 如果 2、3 失败了,中断线程

    6.1.1 addWaiter

    将当前线程放入等待队列

    private Node addWaiter(Node mode) {
    	// 创建一个等待节点代表当前线程
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 添加到等待队列
        enq(node);
        return node;
    }
    

    执行流程

    1. 通过当前的线程和锁模式新建一个节点
    2. pred 指针指向尾节点tail
    3. 将Node 的 prev 指针指向 pred
    4. 通过compareAndSetTail方法,完成尾节点的设置。该方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值。
    • 如果 pred 指针为 null(说明等待队列中没有元素),或者当前 pred 指针和 tail 指向的位置不同(说明被别的线程已经修改),就需要 enq
    private Node enq(final Node node) {
    	// juc 中看到死循环,肯定有多个分支
        for (;;) {
        	// 初始值为 null
            Node t = tail;
            if (t == null) { // 那就初始化
            	// 由于是多线程操作,为保证只有一个
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            	// 将当前线程 node 的 prev 设为t
            	// 注意这里先更新的是 prev 指针
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                	// 有 next延后更新的,所以通过 next 不一定找得到后续结点,所以释放锁时是从 tail 节点开始找 prev 指针
                    t.next = node;
                    return t;
                }
                // 因为prev 指针是 volatile 的,所以这里的 node.prev = t 线程是可见的。所以只要 compareAndSetTail,那么必然其他线程可以通过 c 节点的 prev 指针访问前一个节点且可见。
            }
        }
    }
    

    if 分支

    else 分支

    把新的节点添加到同步队列的队尾。

    如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。

    线程获取锁的时候,过程大体如下:

    1. 当没有线程获取到锁时,线程1获取锁成功
    2. 线程2申请锁,但是锁被线程1占有

      如果再有线程要获取锁,依次在队列中往后排队即可。

    在 addWaiter 方法中,并没有进入方法后立马就自旋,而是先尝试一次追加到队尾,如果失败才自旋,因为大部分操作可能一次就会成功,这种思路在自己写自旋的时候可以多多参考哦。

    6.1.2 acquireQueued

    此时线程节点 node已经通过 addwaiter 放入了等待队列,考虑是否让线程去等待。

    阻塞当前线程。

    • 自旋使前驱结点的 waitStatus 变成 signal,然后阻塞自身
    • 获得锁的线程执行完成后,释放锁时,会唤醒阻塞的节点,之后再自旋尝试获得锁
    final boolean acquireQueued(final Node node, int arg) {
    	// 标识是否成功取得资源
        boolean failed = true;
        try {
            // 标识是否在等待过程被中断过
            boolean interrupted = false;
            // 自旋,结果要么获取锁或者中断
            for (;;) {
            	// 获取等待队列中的当前节点的前驱节点
                final Node p = node.predecessor();
                // 代码优化点:若 p 是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(此前的头结点还只是虚节点)
                if (p == head && tryAcquire(arg)) {
                	// 获取锁成功,将头指针移动到当前的 node
                    setHead(node);
                    p.next = null; // 辅助GC
                    failed = false;
                    return interrupted;
                }
                // 获取锁失败了,走到这里
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    看其中的具体方法:

    setHead

    方法的核心:

    shouldParkAfterFailedAcquire

    依据前驱节点的等待状态判断当前线程是否应该被阻塞

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    	// 获取头结点的节点状态
        int ws = pred.waitStatus;
        // 说明头结点处于唤醒状态
        if (ws == Node.SIGNAL)
            /*
             * 该节点已经设置了状态,要求 release 以 signal,以便可以安全park
             */
            return true;
        // 前文说过 waitStatus>0 是取消状态    
        if (ws > 0) {
            /*
             * 跳过已被取消的前驱结点并重试
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus 必须为 0 或 PROPAGATE。 表示我们需要一个 signal,但不要 park。 调用者将需要重试以确保在 park 之前还无法获取。
             */
            // 设置前驱节点等待状态为 SIGNAL 
            // 给头结点放一个信物,告诉此时
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    为避免自旋导致过度消费 CPU 资源,以判断前驱节点的状态来决定是否挂起当前线程

    • 挂起流程图

    如下处理 prev 指针的代码。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 prev 指针较安全。

    parkAndCheckInterrupt

    • 将当前线程挂起,阻塞调用栈并返回当前线程的中断状态
    private final boolean parkAndCheckInterrupt() {
    	// 进入休息区 unpark就是从休息区唤醒
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    • 一图小结该方法流程

      从上图可以看出,跳出当前循环的条件是当“前驱节点是头结点,且当前线程获取锁成功”。

    6.1.3 cancelAcquire

    shouldParkAfterFailedAcquire中取消节点是怎么生成的呢?什么时候会把一个节点的waitStatus设置为-1?又是在什么时间释放节点通知到被挂起的线程呢?

        private void cancelAcquire(Node node) {
            // 如果节点不存在,无视该方法
            if (node == null)
                return;
    		// 设置该节点不关联任何线程,即虚节点
            node.thread = null;
    
            // 跳过被取消的前驱结点们
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
    
            // predNext 是要取消拼接的明显节点。如果没有,以下情况 CAS 将失败,在这种情况下,我们输掉了和另一个cancel或signal的竞争,因此无需采取进一步措施。
            // 通过前驱节点,跳过取消状态的node
            Node predNext = pred.next;
    
            // 这里可以使用无条件写代替CAS,把当前node的状态设置为CANCELLED
            // 在这个原子步骤之后,其他节点可以跳过我们。
            // 在此之前,我们不受其他线程的干扰。
            node.waitStatus = Node.CANCELLED;
    
            // 如果是 tail 节点, 移除自身
            // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
      // 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
            if (node == tail && compareAndSetTail(node, pred)) {
                compareAndSetNext(pred, predNext, null);
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
                    unparkSuccessor(node);
                }
    
                node.next = node; // 辅助 GC
            }
        }
    

    当前的流程:

    • 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED
    • 根据当前节点的位置,考虑以下三种情况:
      (1) 当前节点是尾节点。
      (2) 当前节点是Head的后继节点。
      (3) 当前节点不是Head的后继节点,也不是尾节点。

    根据(2),来分析每一种情况的流程。

    • 当前节点是尾节点
    • 当前节点是Head的后继节点
    • 当前节点不是Head的后继节点,也不是尾节点

      通过上面的流程,我们对于CANCELLED节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对Next指针进行了操作,而没有对Prev指针进行操作呢?什么情况下会对Prev指针进行操作?

    执行 cancelAcquire 时,当前节点的前驱节点可能已经出队(已经执行过try代码块中的shouldParkAfterFailedAcquire),如果此时修改 prev 指针,有可能会导致 prev 指向另一个已经出队的 Node,因此这块变化 prev 指针不安全。

    6.2 tryAcquireNanos

    尝试以独占模式获取,如果中断将中止,如果超过给定超时将直接失败。首先检查中断状态,然后至少调用一次#tryAcquire,成功后返回。否则,线程将排队,可能会反复地阻塞和取消阻塞,调用#tryAcquire,直到成功或线程中断或超时结束。此方法可用于实现方法 Lock#tryLock(long, TimeUnit)。

    尝试性的获取锁, 获取锁不成功, 直接加入到同步队列,加入操作即在doAcquireNanos

    doAcquireNanos

    以独占限时模式获取。

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 截止时间    
        final long deadline = System.nanoTime() + nanosTimeout;
        // 将当前的线程封装成 Node 加入到同步对列里面
        final Node node = addWaiter(Node.EXCLUSIVE)