AQS

AbstractQueuedSynchronizer,简称AQS(同步器)

  • 是用来构建锁或者其他同步组件的基础框架
  • 使用一个int成员变量表示同步状态(state)
  • 通过内置的FIFO队列来完成资源获取线程的排队工作
  • 子类通过继承并实现它的抽象方法来管理同步器

队列同步器的接口

同步器的设计基于模板方法模式

  • getState() 获取当前同步状态
  • setState() 设置当前同步状态
  • compareAndSetState() 使用CAS设置当前状态,该方法能够保证状态设置的原子性

以上三种方法用来访问和修改同步状态

同步器可重写方法

  • tryAcquire 独占式获取同步状态
  • tryRelease 独占式释放同步状态
  • tryAcquireShared 共享式获取同步状态
  • tryReleaseShared 共享式释放同步状态
  • isHeldExclusively 当前同步器是否在独占模式下被线程占用

队列同步器实现分析

同步队列

同步器依赖内部的同步队列(FIFO)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会把当前线程以及等待状态等信息构造成一个Node节点,并将其加入到同步队列,同时阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态

同步队列中的Node用来保存同步状态失败的线程引用,等待状态以及前驱后继节点

同步器及同步队列结构如图所示

image-20210115172135618

  • 同步器提供一个基于CAS的设置尾节点方法:compareAndSetTail(Node expect,Node update),可以保证加入队列过程的线程安全
  • 同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点

独占式同步状态获取与释放

通过调用同步器的acquire方法可以获取同步状态

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述代码作用:同步状态获取,节点构造,加入同步队列,在同步队列中自旋

主要逻辑:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取到同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued方法,使得该节点以死循环的方式获取同步状态,如果获取不到则阻塞该节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断实现

确保节点可以被线程安全地,正确地添加

addWaiter使用CAS确保节点能够被线程安全的添加

enq方法中,同步器通过死循环的方式保证节点正确添加,在死循环中通过CAS将节点设置为尾节点后,线程才能从方法中返回

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

线程在死循环中尝试获取同步节点

只有前驱节点是头结点才可以尝试获取同步状态

  • 头结点是成功获取到同步状态的节点,而头节点的线程释放了同步线程后,将会唤醒后继节点,后继节点的线程被唤醒后也需要检查自己的前驱节点是否是头结点
  • 维护同步队列的FIFO原则,节点自旋获取同步状态
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

独占式同步状态获取流程

image-20210115174609200

前驱节点为头结点并且能够成功获取同步状态后,当前线程从acquire(int arg)返回,代表当前线程获取了锁

当前线程获取同步状态并执行了相应逻辑后,就需要释放同步状态,使得后续节点能够继续获取同步状态,通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态后,会唤醒其后继节点(使得后继节点重新尝试获取同步状态)

总结:

​ 获取同步状态时,同步器维护一个同步队列,获取同步状态失败的线程都会封装成节点加入到队列中并在队列中自旋的获取同步状态,移出队列的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头结点的后继节点

Condition

condition是同步器AQS的内部类,因为Condition的操作需要获取相关联的锁,所以作为AQS的内部类也较为合理

condition接口

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁,Condition对象是由Lock对象创建出来的(Lock.newCondition())

调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才会从await()返回,并且在返回前已经获取了锁。

condition实现分析

等待队列

同AQS,condition的等待队列也是一个FIFO的队列,每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了await方法,那么该线程将会释放锁,构造成节点加入等待队列(从尾部加入等待队列),进入等待状态,Conditon也有首尾节点的引用,新增节点只需要将原有的尾节点指向他,然后更新尾节点即可(节点引用更新不需要用CAS保证,因为调用await方法的线程必定是获取了锁的线程)

等待

调用Condition的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态,当从await返回时,线程一定获取了Condition相关联的锁,从队列的角度看,当调用await()时,相当于同步队列的首节点移到了等待队列的尾节点

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //当前线程加入等待队列
    Node node = addConditionWaiter();
    //释放同步状态,也就是释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

通知

通过调用Condition中的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,将节点移到同步队列中

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

通过调用同步器的enq方法,等待队列的头节点线程安全地移动到同步队列,当节点移动到同步队列后,当前线程使用LockSupport唤醒该节点的线程

被唤醒的线程,从await方法的while循环中退出,进而调用同步器的acquireQueued方法加入到获取同步状态的竞争中,成功获取到同步状态后,被唤醒的线程从之前调用的await方法中返回,此时该线程已经成功获取了锁

SignalAll()方法,相当于对等待队列中的每一个线程均执行一次signal方法,效果就是将等待队列中的所有节点全部移动到同步队列中,并唤醒每个节点的线程


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

ConcurrentHashMap Previous
Top K Next