AQS源码解析

AQS简介

java.util.concurrent(J.U.C)大大提高了并发性能,AQS 被认为是 J.U.C 的核心。

AQS就是AbstractQueuedSynchronizer,它是为实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量,事件等)提供的一个框架。

源码分析

从ReentrantLock进入

可以看到ReentrantLock默认构建了NonfairSync,即非公平锁

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

而Sync继承自AbstractQueuedSynchronizer

abstract static class Sync extends AbstractQueuedSynchronizer {}

其实现有NonfairSync和FairSync两种,正好对应ReentrantLock的构造方法

Lock加锁方法

public void lock() {
    sync.lock();
}

首先看到其使用了sync.lock(),进入发现根据NonfairSync和FairSync具体有不同实现

// 非公平
final void lock() {
    if (compareAndSetState(0, 1))
        //如果获取到了,就记录一下当前的线程,以便后面的重入
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

// 公平
final void lock() {
    acquire(1);
}

这两者区别在于NonfairSync多了一个条件判断compareAndSetState(0, 1),这个方法是判断是否可以直接获取锁(即第一个进入的线程能直接获取锁,也就是非公平的概念),

// 利用cas操作
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// 把当前线程设置成独占线程
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

acquire竞争锁

当非公平锁无法独占时,需要和公平锁一样通过调用acquire方法来竞争锁

看位于AQS里面的acquire方法:

  • 如果获取到了,返回true,就会退出if语句
  • 如果没有获取到,那么则将当前线程添加到队列中,并循环获取锁,直到获取到为止。
public final void acquire(int arg) {
    //tryAcquire方法仍然尝试获取锁
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire在AQS中默认是抛出异常,其是需要其子类实现,

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

非公平锁实现:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    // 当前线程
    final Thread current = Thread.currentThread();
    // 获取到锁的标志状态
    int c = getState();
    // 0代表没有被获取到,则尝试获取
    if (c == 0) {
        //这里和上面一样尝试获取锁成功,直接返回true
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 这里判断当前线程是否就是已经获取锁的线程,即可重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 都不满足返回false
    return false;
}

addWaiter添加到队列

当acquire获取锁失败,这利用addWaiter方法把当前线程添加到一个队列中:

/**
  * Creates and enqueues node for current thread and given mode.
  *
  * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
  * @return the new node
 */
private Node addWaiter(Node mode) {
    // 使用当前线程创建一个Node节点,mode分为共享和排他
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        // 如果队列中已经存在节点了,那么直接将该节点添加到后面
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果是第一次构建队列,则调用该方法
    enq(node);
    return node;
}


/**
  * Inserts node into queue, initializing if necessary. See picture above.
  * @param node the node to insert
  * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 如果尾节点还是空的,那么构建一个空节点做为头节点
	    	// 然后在下一次循环的时候进入到else
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 和上面一样,将当前线程构建的节点添加到队列的尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued争夺锁

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)这个代码里执行完addWaiter创建队列后,利用acquireQueued争夺锁

/**
  * Acquires in exclusive uninterruptible mode for thread already in
  * queue. Used by condition wait methods as well as acquire.
  *
  * @param node the node
  * @param arg the acquire argument
  * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    // 标识获取锁的过程中是否出现了异常
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 无限循环
        for (;;) {
            // 获取node节点的前一个节点
            final Node p = node.predecessor();
            // 如果当前线程为第一个节点(即前一个节点是head节点),则尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 判断节点是否需要park,如果需要,则进入park
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 如果出现异常,取消获取
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        // 如果前节点是SIGNAL状体,则代表需要park
        return true;
    if (ws > 0) {
        // 如果waitStatus的值大于0,代表已取消,需要将无效的节点删除
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 否则将前节点设置为SIGNAL状态
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

Node的waitStatus的几种状态的含义(即AQS内部类Node的几个属性):

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
  * waitStatus value to indicate the next acquireShared should
  * unconditionally propagate
  */
static final int PROPAGATE = -3;
  • CANCELLED

    值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。

  • SIGNAL

    值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。

  • CONDITION

    值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

  • PROPAGATE

    值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。

    0状态:值为0,代表初始化状态。

private final boolean parkAndCheckInterrupt() {
	//通过LockSupport类来park该线程
    LockSupport.park(this);
    //将park的线程唤醒可能是调用unpark方法,也可能是被打断了
    return Thread.interrupted();
}

到这里,非公平锁的整个获取流程就结束了。

ReentrantLock获取非公平锁流程

图片地址

ReentrantLock获取非公平锁流程

unlock释放锁

同样,锁的释放同样由AQS实现

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    // 尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 当前线程非拥有者
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        // 把锁拥有者设置为null
        setExclusiveOwnerThread(null);
    }
    // 更新线程状态
    setState(c);
    return free;
}

未完待续。。