Java并发基础:了解无锁CAS就从源码分析

栏目: Java · 发布时间: 6年前

什么是 CAS

CAS的全称为Compare And Swap,直译就是比较交换。是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值,其实现方式是基于硬件平台的汇编指令,在intel的CPU中,使用的是 cmpxchg 指令,就是说CAS是靠硬件实现的,从而在硬件层面提升效率。

CSA 原理

利用CPU的CAS指令,同时借助JNI来完成 Java 的非阻塞算法,其它原子操作都是利用类似的特性完成的。 在 java.util.concurrent 下面的源码中, Atomic , ReentrantLock 都使用了Unsafe类中的方法来保证并发的安全性。

CAS操作是原子性的,所以多线程并发使用CAS更新数据时,可以不使用锁,JDK中大量使用了CAS来更新数据而防止加锁来保持原子更新。

CAS 操作包含三个操作数 :内存偏移量位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。

源码分析

下面来看一下 java.util.concurrent.atomic.AtomicInteger.javagetAndIncrement()getAndDecrement() 是如何利用CAS实现原子性操作的。

AtomicInteger 源码解析

// 使用 unsafe 类的原子操作方式
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
    try {
        //计算变量 value 在类对象中的偏移量
        valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}

valueOffset 字段表示 "value" 内存位置,在 compareAndSwap 方法 ,第二个参数会用到.

关于偏移量

Java并发基础:了解无锁CAS就从源码分析

Unsafe 调用C 语言可以通过偏移量对变量进行操作

//volatile变量value
private volatile int value;

 /**
 * 创建具有给定初始值的新 AtomicInteger
 *
 * @param initialValue 初始值
 */
public AtomicInteger(int initialValue) {
    value = initialValue;
}

//返回当前的值
public final int get() {
    return value;
}
//原子更新为新值并返回旧值
public final int getAndSet(int newValue) {
    return unsafe.getAndSetInt(this, valueOffset, newValue);
}
//最终会设置成新值
public final void lazySet(int newValue) {
    unsafe.putOrderedInt(this, valueOffset, newValue);
}
//如果输入的值等于预期值,则以原子方式更新为新值
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
//方法相当于原子性的 ++i
public final int getAndIncrement() {
    //三个参数,1、当前的实例 2、value实例变量的偏移量 3、递增的值。
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
//方法相当于原子性的 --i
public final int getAndDecrement() {
    //三个参数,1、当前的实例 2、value实例变量的偏移量 3、递减的值。
    return unsafe.getAndAddInt(this, valueOffset, -1);
}

实现逻辑封装在 Unsafe 中 getAndAddInt 方法,继续往下看, Unsafe 源码解析

Unsafe 源码解析

在JDK8中追踪可见 sun.misc.Unsafe 这个类是无法看见源码的,打开 openjdk8 源码看

文件: openjdk-8-src-b132-03_mar_2014.zip

目录: openjdk\jdk\src\share\classes\sun\misc\Unsafe.java

通常我们最好也不要使用 Unsafe 类,除非有明确的目的,并且也要对它有深入的了解才行。要想使用 Unsafe 类需要用一些比较 tricky 的办法。Unsafe类使用了单例模式,需要通过一个静态方法 getUnsafe() 来获取。但Unsafe类做了限制,如果是普通的调用的话,它会抛出一个 SecurityException 异常;只有由主类加载器加载的类才能调用这个方法。

下面是 sun.misc.Unsafe.java 类源码

//获取Unsafe实例静态方法
@CallerSensitive
public static Unsafe getUnsafe() {
    Class<?> caller = Reflection.getCallerClass();
    if (!VM.isSystemDomainLoader(caller.getClassLoader()))
        throw new SecurityException("Unsafe");
    return theUnsafe;
}

网上也有一些办法来用主类加载器加载用户代码,最简单方法是利用Java反射,方法如下:

private static Unsafe unsafe;

static {
    try {
        //通过反射获取rt.jar下的Unsafe类
        Field field = Unsafe.class.getDeclaredField("theUnsafe");
        field.setAccessible(true);
        unsafe = (Unsafe) field.get(null);
    } catch (Exception e) {
        System.out.println("Get Unsafe instance occur error" + e);
    }
}

获取到Unsafe实例之后,我们就可以为所欲为了。Unsafe类提供了以下这些功能:

https://www.cnblogs.com/pkufork/p/java_unsafe.html

//native硬件级别的原子操作
    //类似的有compareAndSwapInt,compareAndSwapLong,compareAndSwapBoolean,compareAndSwapChar等等。
    public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);

    //内部使用自旋的方式进行CAS更新(while循环进行CAS更新,如果更新失败,则循环再次重试)
    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            //获取对象内存地址偏移量上的数值v
            v = getIntVolatile(o, offset);
            //如果现在还是v,设置为 v + delta,否则返回false,继续循环再次重试.
        } while (!compareAndSwapInt(o, offset, v, v + delta));
        return v;
    }

利用 Unsafe 类的 JNI compareAndSwapInt 方法实现,使用CAS实现一个原子操作更新,

compareAndSwapInt 四个参数:

1、当前的实例 2、实例变量的内存地址偏移量 3、预期的旧值 4、要更新的值

unsafe.cpp 深层次解析

// unsafe.cpp
/*
 * 这个看起来好像不像一个函数,不过不用担心,不是重点。UNSAFE_ENTRY 和 UNSAFE_END 都是宏,
 * 在预编译期间会被替换成真正的代码。下面的 jboolean、jlong 和 jint 等是一些类型定义(typedef):
 *
 * 省略部分内容
 */
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  // 根据偏移量,计算 value 的地址。这里的 offset 就是 AtomaicInteger 中的 valueOffset
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  // 调用 Atomic 中的函数 cmpxchg,该函数声明于 Atomic.hpp 中
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

// atomic.cpp
unsigned Atomic::cmpxchg(unsigned int exchange_value, volatile unsigned int* dest, unsigned int compare_value) {
  assert(sizeof(unsigned int) == sizeof(jint), "more work to do");
  /*
   * 根据操作系统类型调用不同平台下的重载函数,这个在预编译期间编译器会决定调用哪个平台下的重载
   * 函数。相关的预编译逻辑如下:
   *
   * atomic.inline.hpp:
   *    #include "runtime/atomic.hpp"
   *  
   *    // Linux
   *    #ifdef TARGET_OS_ARCH_linux_x86
   *    # include "atomic_linux_x86.inline.hpp"
   *    #endif
   * 
   *    // 省略部分代码
   *  
   *    // Windows
   *    #ifdef TARGET_OS_ARCH_windows_x86
   *    # include "atomic_windows_x86.inline.hpp"
   *    #endif
   *  
   *    // BSD
   *    #ifdef TARGET_OS_ARCH_bsd_x86
   *    # include "atomic_bsd_x86.inline.hpp"
   *    #endif
   *
   * 接下来分析 atomic_windows_x86.inline.hpp 中的 cmpxchg 函数实现
   */
  return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest,
                                       (jint)compare_value);
}

上面的分析看起来比较多,不过主流程并不复杂。如果不纠结于代码细节,还是比较容易看懂的。接下来,我会分析 Windows 平台下的 Atomic::cmpxchg 函数。继续往下看吧。

// atomic_windows_x86.inline.hpp
#define LOCK_IF_MP(mp) __asm cmp mp, 0  \
                       __asm je L0      \
                       __asm _emit 0xF0 \
                       __asm L0:
            
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}

上面的代码由 LOCK_IF_MP 预编译标识符和 cmpxchg 函数组成。为了看到更清楚一些,我们将 cmpxchg 函数中的 LOCK_IF_MP 替换为实际内容。如下:

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
  // 判断是否是多核 CPU
  int mp = os::is_MP();
  __asm {
    // 将参数值放入寄存器中
    mov edx, dest    // 注意: dest 是指针类型,这里是把内存地址存入 edx 寄存器中
    mov ecx, exchange_value
    mov eax, compare_value
  
    // LOCK_IF_MP
    cmp mp, 0
    /*
     * 如果 mp = 0,表明是线程运行在单核 CPU 环境下。此时 je 会跳转到 L0 标记处,
     * 也就是越过 _emit 0xF0 指令,直接执行 cmpxchg 指令。也就是不在下面的 cmpxchg 指令
     * 前加 lock 前缀。
     */
    je L0
    /*
     * 0xF0 是 lock 前缀的机器码,这里没有使用 lock,而是直接使用了机器码的形式。至于这样做的
     * 原因可以参考知乎的一个回答:
     *     https://www.zhihu.com/question/50878124/answer/123099923
     */
    _emit 0xF0
L0:
    /*
     * 比较并交换。简单解释一下下面这条指令,熟悉汇编的朋友可以略过下面的解释:
     *   cmpxchg: 即“比较并交换”指令
     *   dword: 全称是 double word,在 x86/x64 体系中,一个
     *          word = 2 byte,dword = 4 byte = 32 bit
     *   ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元
     *   [edx]: [...] 表示一个内存单元,edx 是寄存器,dest 指针值存放在 edx 中。
     *          那么 [edx] 表示内存地址为 dest 的内存单元
     *        
     * 这一条指令的意思就是,将 eax 寄存器中的值(compare_value)与 [edx] 双字内存单元中的值
     * 进行对比,如果相同,则将 ecx 寄存器中的值(exchange_value)存入 [edx] 内存单元中。
     */
    cmpxchg dword ptr [edx], ecx
  }
}

到这里 CAS 的实现过程就讲了,CAS 的实现离不开处理器的支持。以上这么多代码,其实核心代码就是一条带 lock 前缀的 cmpxchg 指令,即 lock cmpxchg dword ptr [edx], ecx

通过上述的分析,可以发现 AtomicInteger 原子类的内部几乎是基于前面分析过 Unsafe 类中的 CAS 相关操作的方法实现的,这也同时证明 AtomicInteger getAndIncrement 自增操作实现过程,是基于无锁实现的。

CAS的ABA问题及其解决方案

假设这样一种场景,当第一个线程执行CAS(V,E,U)操作。在获取到当前变量V,准备修改为新值U前,另外两个线程已连续修改了两次变量V的值,使得该值又恢复为旧值,这样的话,我们就无法正确判断这个变量是否已被修改过,如下图:

Java并发基础:了解无锁CAS就从源码分析

这就是典型的CAS的ABA问题,一般情况这种情况发现的概率比较小,可能发生了也不会造成什么问题,比如说我们对某个做加减法,不关心数字的过程,那么发生ABA问题也没啥关系。但是在某些情况下还是需要防止的,那么该如何解决呢?在Java中解决ABA问题,我们可以使用以下原子类

AtomicStampedReference类

AtomicStampedReference原子类是一个带有时间戳的对象引用,在每次修改后,AtomicStampedReference不仅会设置新值而且还会记录更改的时间。当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值才能写入成功,这也就解决了反复读写时,无法预知值是否已被修改的窘境

底层实现为: 通过Pair私有内部类存储数据和时间戳, 并构造volatile修饰的私有实例

接着看 java.util.concurrent.atomic.AtomicStampedReference 类的compareAndSet()方法的实现:

private static class Pair<T> {
    final T reference;
    final int stamp;
  
    //最好不要重复的一个数据,决定数据是否能设置成功,时间戳会重复
    private Pair(T reference, int stamp) {
        this.reference = reference;
        this.stamp = stamp;
    }
    static <T> Pair<T> of(T reference, int stamp) {
        return new Pair<T>(reference, stamp);
    }
}

同时对当前数据和当前时间进行比较,只有两者都相等是才会执行casPair()方法,

单从该方法的名称就可知是一个CAS方法,最终调用的还是 Unsafe 类中的 compareAndSwapObject 方法

到这我们就很清晰 AtomicStampedReference 的内部实现思想了,

通过一个键值对 Pair 存储数据和时间戳,在更新时对数据和时间戳进行比较,

只有两者都符合预期才会调用 UnsafecompareAndSwapObject 方法执行数值和时间戳替换,也就避免了ABA的问题。

/**
 * 原子更新带有版本号的引用类型。
 * 该类将整数值与引用关联起来,可用于原子的更数据和数据的版本号。
 * 可以解决使用CAS进行原子更新时,可能出现的ABA问题。
 */
public class AtomicStampedReference<V> {
    //静态内部类Pair将对应的引用类型和版本号stamp作为它的成员
    private static class Pair<T> {
      
        //最好不要重复的一个数据,决定数据是否能设置成功,建议时间戳
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
      
        //根据reference和stamp来生成一个Pair的实例
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
  
    //作为一个整体的pair变量被volatile修饰
    private volatile Pair<V> pair;
 
    //构造方法,参数分别是初始引用变量的值和初始版本号
    public AtomicStampedReference(V initialRef, int initialStamp) {
        pair = Pair.of(initialRef, initialStamp);
    }
  
    ....
  
    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
  
    private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
 
    //获取pair成员的偏移地址
    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
                                  String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}
/**
 * @param 期望(老的)引用
 * @param       (新的)引用数据
 * @param 期望(老的)标志stamp(时间戳)值
 * @param       (新的)标志stamp(时间戳)值
 * @return 是否成功
 */
public boolean compareAndSet(V expectedReference,V   newReference,int expectedStamp,int newStamp) {
       
    Pair<V> current = pair;
    return
        // 期望(老的)引用 == 当前引用
        expectedReference == current.reference &&
        // 期望(老的)标志stamp(时间戳)值 == 当前标志stamp(时间戳)值
        expectedStamp == current.stamp &&
      
        // (新的)引用数据 == 当前引用数据 并且 (新的)标志stamp(时间戳)值 ==当前标志stamp(时间戳)值
        ((newReference == current.reference && newStamp == current.stamp) ||
          #原子更新值
         casPair(current, Pair.of(newReference, newStamp)));
       
}
 
 //当引用类型的值与期望的一致的时候,原子的更改版本号为新的值。该方法只修改版本号,不修改引用变量的值,成功返回true
public boolean attemptStamp(V expectedReference, int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        (newStamp == current.stamp ||
         casPair(current, Pair.of(expectedReference, newStamp)));
}

/**
 * CAS真正实现方法
 */
private boolean casPair(Pair<V> cmp, Pair<V> val) {
        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

期望 Pair cmp(A) == 当前内存存偏移量位置 Pair(V),就更新值 Pair val(B)成功返回true 否则 false

public static void main(String[] args) {
    AtomicStampedReference<Integer> num = new AtomicStampedReference<Integer>(1, 0);

    Integer i = num.getReference();
    int stamped = num.getStamp();

    if (num.compareAndSet(i, i + 1, stamped, stamped + 1)) {
        System.out.println("测试成功");
    }
}

通过以上原子更新方法,可见 AtomicStampedReference就是利用了Unsafe的CAS方法+Volatile关键字对存储实际的引用变量和int的版本号的Pair实例进行更新。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Beautiful Code

Beautiful Code

Greg Wilson、Andy Oram / O'Reilly Media / 2007-7-6 / GBP 35.99

In this unique work, leading computer scientists discuss how they found unusual, carefully designed solutions to difficult problems. This book lets the reader look over the shoulder of major coding an......一起来看看 《Beautiful Code》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

MD5 加密
MD5 加密

MD5 加密工具