Java并发之AQS源码分析

栏目: 编程工具 · 发布时间: 6年前

内容简介:AQS 全称是 AbstractQueuedSynchronizer,顾名思义,是一个用来构建锁和同步器的框架,它底层用了 CAS 技术来保证操作的原子性,同时运用了 CLH 同步队列作同步器,这也是 ReentrantLock、CountDownLatch 等同步工具实现同步的底层实现机制。它能够成为实现大部分同步需求的基础,也是 J.U.C 并发包同步的核心基础组件。说是框架,其实就是一个普通的类,它是 Doug Lea 大神的杰作,下面让我们一起学习 Doug Lea 大神是如何用一个普通类就玩转 C

AQS 全称是 AbstractQueuedSynchronizer,顾名思义,是一个用来构建锁和同步器的框架,它底层用了 CAS 技术来保证操作的原子性,同时运用了 CLH 同步队列作同步器,这也是 ReentrantLock、CountDownLatch 等同步 工具 实现同步的底层实现机制。它能够成为实现大部分同步需求的基础,也是 J.U.C 并发包同步的核心基础组件。

说是框架,其实就是一个普通的类,它是 Doug Lea 大神的杰作,下面让我们一起学习 Doug Lea 大神是如何用一个普通类就玩转 CAS 解决并发安全问题。

AQS 简介

之前我有介绍过 CAS 原理,AQS 就是建立在它的基础之上,增加了大量的实现细节,例如获取同步状态、FIFO同步队列,独占式锁和共享式锁的获取和释放等等,这些都是 AQS 类提供出来的一些方法。

现在我们来直接定位到类 java.util.concurrent.locks.AbstractQueuedSynchronizer ,看到类里面有很多注释,我主要将 AQS 类的几个重要字段与方法列出来:

public abstract class AbstractQueuedSynchronizer
  extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private transient volatile Node head;

    private transient volatile Node tail;

    private volatile int state;

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // ...

 }
  1. head 字段为等待队列的头节点;
  2. tail 字段为等待队列的尾节点;
  3. state 字段为同步状态,其中 state > 0 为有锁状态,每次加锁就在原有state基础上加1,即代表当前持有锁的线程加了 state 次锁,反之解锁时每次减一,当 statte = 0 为无锁状态;
  4. 通过 compareAndSetState 方法操作 CAS 更改 state 状态,保证 state 的原子性。

有没有发现,这几个字段都用 volatile 关键字进行修饰,以确保多线程间保证字段的可见性。

同步队列

在分析锁的源码之前,我们先来看看 FIFO 队列的结构:

Java并发之AQS源码分析

用大神的注释来形象地描述一下队列的模型:

/**
  * <pre>
  *      +------+  prev +-----+       +-----+
  * head |      | <---- |     | <---- |     |  tail
  *      +------+       +-----+       +-----+
  * </pre>
  */

这是一个普通双向链表的节点结构,多了 thread 字段用于存储当前线程对象,同时每个节点都有一个 waitStatus 等待状态,一共有四种状态:

  1. CANCELLED(1):取消状态,如果当前线程的前置节点状态为 CANCELLED,则表明前置节点已经等待超时或者已经被中断了,这时需要将其从等待队列中删除。
  2. SIGNAL(-1):等待触发状态,如果当前线程的前置节点状态为 SIGNAL,则表明当前线程需要阻塞。
  3. CONDITION(-2):等待条件状态,表示当前节点在等待condition,即在condition队列中。
  4. PROPAGATE(-3):状态需要向后传播,表示 releaseShared 需要被传播给后续节点,仅在共享锁模式下使用。

以上解释说明参考官方文档。

可以这么理解:head 节点可以表示成当前持有锁的线程的节点,其余线程竞争锁失败后,会加入到队尾,tail始终指向队列的最后一个节点。

AQS 提供独占锁和共享锁两种模式,独占锁指的是如果有线程获取到锁,那么其它线程只能是获取锁失败,然后进入等待队列中等待被唤醒,而共享锁当有一个线程获取到锁之后,那么它就会依次唤醒等待队列中可以跟它共享的节点,当然这些节点也是共享锁类型。

独占式锁

获取锁

获取独占锁方法:

public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

解读:

tryAcquire(arg)
addWaiter(Node.EXCLUSIVE)

基于上面源码的步骤分析后,我们继续往下看源码具体实现:

private Node addWaiter(Node mode) {
  // 创建一个基于当前线程的节点,该节点是 Node.EXCLUSIVE 独占式类型
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  Node pred = tail;
  // 这里先判断队尾是否为空,如果不为空则直接将节点加入队尾
  if (pred != null) {
    node.prev = pred;
    // 采取 CAS 操作,将当前节点设置为队尾节点,由于采用了 CAS 原子操作,无论并发怎么修改,都有且只有一条线程可以修改成功,其余都将执行后面的enq方法
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}

简单来说 addWaiter(Node mode) 方法做了以下事情:

  1. 创建基于当前线程的独占式类型的节点;
  2. 将节点加入队尾。

我们继续看 enq(Node node) 方法:

private Node enq(final Node node) {
  // 自旋操作
  for (;;) {
    Node t = tail;
    // 如果队尾节点为空,那么进行CAS操作初始化队列
    if (t == null) { // Must initialize
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      node.prev = t;
      // 这一步也是采取CAS操作,将当前节点加入队尾,如果失败的话,自旋继续修改直到成功为止
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

从源码可看出, enq(final Node node) 方法主要做了以下事情:

  1. 采用自旋机制,这是 aqs 里面很重要的一个机制;
  2. 如果队尾节点为空,则初始化队列,将当前节点设置为头节点;
  3. 如果队尾节点不为空,则继续采取CAS操作,将当前节点加入队尾,不成功则继续自旋,知道成功为止;

对比了上面两段代码,不难看出,首先是判断队尾是否为空,先进行一次CAS入队操作,如果失败则进入enq(final Node node)方法执行完整的入队操作。

经过上面CAS不断尝试,这时当前节点已经成功加入到队尾了,接下来就到了挂起当前线程的操作了,我们继续往下撕扯源码:

在分析 acquireQueued(final Node node, int arg) 方法之前,我们需要明确一点,我们上面也说过,head 节点代表当前持有锁的线程,那么如果当前节点的 pred 节点是 head 节点,很可能此时 head 节点已经释放锁了,所以此时需要再次尝试获取锁。

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    // 线程中断标记字段
    boolean interrupted = false;
    for (;;) {
      // 获取当前节点的 pred 节点
      final Node p = node.predecessor();
      // 如果 pred 节点为 head 节点,那么再次尝试获取锁
      if (p == head && tryAcquire(arg)) {
        // 获取锁之后,那么当前节点也就成为了 head 节点
        setHead(node);
        p.next = null; // help GC
        failed = false;
        // 不需要挂起,返回 false
        return interrupted;
      }
      // 获取锁失败,则进入挂起逻辑
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

这一步 acquireQueued(final Node node, int arg) 方法主要做了以下事情:

  1. 判断当前节点的 pred 节点是否为 head 节点,如果是,则尝试获取锁;
  2. 获取锁失败后,进入挂起逻辑。

接下来继续看挂起逻辑源码:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  int ws = pred.waitStatus;
  if (ws == Node.SIGNAL)
    // 如果 pred 节点为 SIGNAL 状态,返回true,说明当前节点需要挂起
    return true;
  // 如果ws > 0,说明节点状态为CANCELLED,需要从队列中删除
  if (ws > 0) {
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    // 如果是其它状态,则操作CAS统一改成SIGNAL状态
    // 由于这里waitStatus的值只能是0或者PROPAGATE,所以我们将节点设置为SIGNAL,从新循环一次判断
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  }
  return false;
}

这一步 shouldParkAfterFailedAcquire(Node pred, Node node) 方法主要做了以下事情:

  1. 判断 pred 节点状态,如果为 SIGNAL 状态,则直接返回 true 执行挂起;
  2. 删除状态为 CANCELLED 的节点。

通俗来说就是:根据 pred 节点状态来判断当前节点是否可以挂起,如果该方法返回 false,那么挂起条件还没准备好,就会重新进入 acquireQueued(final Node node, int arg) 的自旋体,重新进行判断。如果返回 true,那就说明当前线程可以进行挂起操作了,那么就会继续执行挂起,所以我们继续往下看源码:

private final boolean parkAndCheckInterrupt() {
  LockSupport.park(this);
  return Thread.interrupted();
}

这步骤就是执行中断最核心的操作了,这里调用了 LockSupport.park(this) 方法,查看官方解析该类的作用是:

LockSupport是用来创建锁和其他同步类的基本 线程阻塞 原语。LockSupport 提供park()和unpark()方法实现阻塞线程和解除线程阻塞,LockSupport和每个使用它的线程都与一个许可(permit)关联。permit相当于1,0的开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit, 也就是将1变成0,同时park立即返回。再次调用park会变成block(因为permit为0了,会阻塞在这里,直到permit变为1), 这时调用unpark会把permit置为1。每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累。

(下面的会抽时间补上去)

释放锁

public final boolean release(int arg) {
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}
private void unparkSuccessor(Node node) {

  int ws = node.waitStatus;
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);


  Node s = node.next;
  if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null)
    LockSupport.unpark(s.thread);
}

共享式锁


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Text Processing in Python

Text Processing in Python

David Mertz / Addison-Wesley Professional / 2003-6-12 / USD 54.99

Text Processing in Python describes techniques for manipulation of text using the Python programming language. At the broadest level, text processing is simply taking textual information and doing som......一起来看看 《Text Processing in Python》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试