AQS及其应用ReentrantLock源码分析

AQS原理

概述

全称是 AbstractQueuedSynchronizer(抽象同步队列),是阻塞式锁和相关的同步器工具的框架。

特点:
+ 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - cas 机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
+ 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
+ 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

获取锁:

// 如果获取锁失败
if (!tryAcquire(arg)) {
    // 入队, 可以选择阻塞当前线程 park unpark
}

释放锁

// 如果释放锁成功
if (tryRelease(arg)) {
 // 让阻塞线程恢复运行
}

实现不可重入锁

自定义同步器

同步器实现AQS

final class MySync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        if (acquires == 1){
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int acquires) {
        if(acquires == 1) {
            if(getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        return false;
    }

    protected Condition newCondition() {
        return new ConditionObject();
    }

    @Override //是否持有独占锁
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }
}

自定义锁

class MyLock implements Lock {
    static MySync sync = new MySync();

    @Override
    // 尝试,不成功,进入等待队列
    public void lock() {
        sync.acquire(1);
    }

    @Override
    // 尝试,不成功,进入等待队列,可打断
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    // 尝试一次,不成功返回,不进入队列
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    // 尝试,不成功,进入等待队列,有时限
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    // 释放锁
    public void unlock() {
        sync.release(1);
    }

    @Override
    // 生成条件变量
    public Condition newCondition() {
        return sync.newCondition();
    }
}

心得

起源

早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不够优雅,于是在 JSR166(java 规范提案)中创建了 AQS,提供了这种通用的同步器机制。

目标

AQS 要实现的功能目标
+ 阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁tryAcquire
+ 获取锁超时机制
+ 通过打断取消机制
+ 独占机制及共享机制
+ 条件不满足时的等待机制

设计

AQS 的基本思想其实很简单
获取锁的逻辑

while(state 状态不允许获取) {
    if(队列中还没有此线程) {
        入队并阻塞
    }
}
当前线程出队

释放锁的逻辑

if(state 状态允许了) {
    恢复阻塞的线程(s)
}

要点
+ 原子维护 state 状态
+ 阻塞及恢复线程
+ 维护队列

state设计

  • state 使用 volatile 配合 cas 保证其修改时的原子性
  • state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想

阻塞恢复设计

  • 早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume ,那么 suspend 将感知不到
  • 解决方法是使用 park & unpark 来实现线程的暂停和恢复,先 unpark 再 park 也没问题
  • park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
  • park 线程还可以通过 interrupt 打断

队列设计

  • 使用了 FIFO 先入先出队列,并不支持优先级队列
  • 设计时借鉴了 CLH 队列,它是一种单向无锁队列

Node的waitStatus状态:

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。

  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。

  • 0:新结点入队时的默认状态。

AQS 在一些方面改进了 CLH

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 队列中还没有元素 tail 为 null
        if (t == null) {
            // 将 head 从 null -> dummy
            if (compareAndSetHead(new Node()))
            tail = head;
        } else {
            // 将 node 的 prev 设置为原来的 tail
            node.prev = t;
            // 将 tail 从原来的 tail 设置为 node
            if (compareAndSetTail(t, node)) {
                // 原来 tail 的 next 设置为 node
                t.next = node;
                return t;
            }
        }
    }
}

ReentrantLock原理

AQS及其应用ReentrantLock源码分析

ReentrantLock使用了继承自AQS的内部类Sync,有两个实现:NonefairSync(非公平锁)和FairSync(公平锁)

由于默认构造非公平锁,这里来看非公平锁的原理:

加锁

final void lock() {
    //没有竞争,独占状态
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);//出现竞争,下一步
}
public final void acquire(int arg) {
    // 先再次尝试能不能获得锁(下一步),否则进入addWaitor(Node.EXCLUSIVE),跳过下一步
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //如果阻塞过程中被打断,不响应打断,在获得锁之后才能再响应打断。
        selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //直接设置独占
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //可以锁重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //不能获得锁,下一步
    return false;
}
//addWaiter将结点附加到链表尾部
//mode代表一个nextWaiter,在条件等待的时候使用
//Node.EXCLUSIVE代表独占模式。
//添加到链表末尾后调用acquireQueued,下一步
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            //如果node是第二个节点再次尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //否则尝试阻塞,下一步
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
//检查是否可以park(前一个节点的waitStatus应该标记成-1)
//可以的话调用parkAndCheckInterrupt,下一步
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
            * This node has already set status asking a release
            * to signal it, so it can safely park.
            */
        return true;
    if (ws > 0) {
        /*
            * Predecessor was cancelled. Skip over predecessors and
            * indicate retry.
            */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
            * waitStatus must be 0 or PROPAGATE.  Indicate that we
            * need a signal, but don't park yet.  Caller will need to
            * retry to make sure it cannot acquire before parking.
            */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

使用LockSupport.park阻塞

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

解锁

public final boolean release(int arg) {
    //调用tryRelease释放锁,因为此时是独占式的,很容易就能释放锁。
    //如果重入过了需要把计数减为0了才算释放成功。
    if (tryRelease(arg)) {
        Node h = head;
        //唤醒等待队列里的下一个线程,下一步
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//唤醒后继结点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    //如果下一个节点被cancel了从后往前找到一个正常的节点替换它
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //unpark唤醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

可打断原理

核心方法:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            //如果等待过程中被打断parkAndCheckInterrupt返回false,抛出异常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

公平锁原理

// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
        if (!hasQueuedPredecessors() &&
        compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
        throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject

ConditionObject里面有一个firstWaiter、lastWaiter,然而它是一个单向队列。

await过程

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将线程添加到等待队列中,下一步
    Node node = addConditionWaiter();
    // 释放占有的锁,跳过下一步
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    //是否在阻塞队列中(即还没有获得到锁)
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 自旋等待获取到同步状态(即获取到lock)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 处理被中断的情况
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //连接到等待队列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
final long fullyRelease(Node node) {
    boolean failed = true;
    try {
        long savedState = getState();
        //重复调用tryRelease直到锁全部被释放,然后唤醒后继结点
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

signal过程

public final void signal() {
    //判断当前线程是不是锁的持有者
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    //发出信号,下一步
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } //转移节点到阻塞队列
    while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    //设置回状态0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //插入阻塞队列末尾
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/aqs%e5%8f%8a%e5%85%b6%e5%ba%94%e7%94%a8reentrantlock%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90/

发表评论

电子邮件地址不会被公开。