ReentrantReadWriteLock源码分析

栏目: 编程工具 · 发布时间: 2年前

来源: juejin.im

内容简介:ReentrantLock是排他锁,下一篇会进行介绍,排他锁在同一时刻仅有一个线程可以进行访问, 实际上独占锁是一种相对比较保守的锁策略,在这种情况下任何读-读、读-写、写-写操作都不能同时进行, 在一定程度上降低了吞吐量。然而并发读之间不存在数据竞争问题,如果在读多写少场景下,允许并发读同时进行,会进一步提升性能。因此引入了ReentrantReadWriteLock, 顾名思义,ReentrantReadWriteLock是Reentrant(可重入)Read(读)Write(写)Lock(锁),我们下

本文转载自:https://juejin.im/post/5d131af4f265da1ba9158d6d,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有。

一、简介

ReentrantLock是排他锁,下一篇会进行介绍,排他锁在同一时刻仅有一个线程可以进行访问, 实际上独占锁是一种相对比较保守的锁策略,在这种情况下任何读-读、读-写、写-写操作都不能同时进行, 在一定程度上降低了吞吐量。然而并发读之间不存在数据竞争问题,如果在读多写少场景下,允许并发读同时进行,会进一步提升性能。因此引入了ReentrantReadWriteLock, 顾名思义,ReentrantReadWriteLock是Reentrant(可重入)Read(读)Write(写)Lock(锁),我们下面称它为读写锁。 读写锁内部又分为读锁和写锁,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。读锁和写锁分离从而提升程序性能, 读写锁主要应用于读多写少的场景。先知道ReentrantReadWriteLock的这几点注意事项①支持公平和非公平(如果使用无参的构造函数)的获取锁②读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占 ③条件变量(Condition)只能在写锁使用,读锁不允许获取条件变量,否则将得到④如果一个线程UnsupportedOperationException异常⑤支持可重入,如已获取读锁的线程,能够再次获取读锁⑥如已获取写锁的线程,之后能够再次获取写锁,同时也可以再获取读锁。⑦ReentrantReadWriteLock可能会导致获取写锁的线程饥饿,可以看下我的另一篇StampedLock的分析。StampedLock可以解决获取写锁的线程饥饿。ReentrantReadWriteLock内部类有读锁(ReadLock)和写锁(WriteLock),内部都是调用Sync类进行读写锁的获取和释放,Sync有两个子类FairSync和NonFairSync,下面会进行详细的介绍。

二、类关系

//读写锁的抽象类
public interface ReadWriteLock {
    //获取读锁抽象方法
    Lock readLock();
    //获取写锁抽象方法
    Lock writeLock();
}复制代码

三、属性

//读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
//写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
//AQS的实现类,在下面内部类中会进行介绍,AQS不清楚的,可以看我的AQS源代码分析https://juejin.im/post/5d0b3b55f265da1bc23f7dca
final Sync sync;
//UnSafe不清楚的可以看下我的分享https://juejin.im/user/5bd8718051882528382d8728/shares
private static final sun.misc.Unsafe UNSAFE;
//线程Thread的属性tid的相对偏移量,定义成静态是因为tid相对每个Thread实例的相对偏移量都是一样
private static final long TID_OFFSET;
static {
    try {
        //获取UnSafe实例,如果也想在自己的代码中使用UnSafe,也可以看下我的分享,里面有介绍到如何获取到UnSafe实例
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        //获取Thread类的Class
        Class<?> tk = Thread.class;
        //使用UnSafe获取Thread类的属性tid相对偏移量 
        TID_OFFSET = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("tid"));
    } catch (Exception e) {
        //抛出错误
        throw new Error(e);
    }
}复制代码

四、构造函数

//无参构造函数,创建非公平锁
public ReentrantReadWriteLock() {
    //调用有参的构造函数 
    this(false);
}

/**
 * 传入fair参数创建ReentrantReadWriteLock实例
 *
 * @param fair true创建公平锁,false创建非公平锁
 */
public ReentrantReadWriteLock(boolean fair) {
    //true创建公平锁,false创建非公平锁
    sync = fair ? new FairSync() : new NonfairSync();
    //创建读锁,ReadLock会在下面内部类中进行介绍 
    readerLock = new ReadLock(this);
    //创建写锁,WriteLock会在下面内部类中进行介绍
    writerLock = new WriteLock(this);
}复制代码

五、内部类

  1. sync内部类
    //ReadLock和WriteLock的获取和释放都是使用Sync进行获取和释放,AQS的属性state即表示锁的状态 
    abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 6317671515068378041L;
            //分别存放读锁和写锁被获取的位数,即AQS的属性state(类型int)高16位存放读锁被获取个数,低16位存放写锁被获取个数
            static final int SHARED_SHIFT   = 16;
            //由于AQS属性state的高16位存放读锁被获取的个数,为此每次加读锁,增加的锁个数最小单元
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            //读锁和写锁的锁被获取个数都不能大于的最大值
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            //低16位存放写锁个数的标记位,在下面exclusiveCount方法使用到,和锁状态,即AQS中的属性state进行&操作,获取写锁的个数
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
            
            //获取读锁被获取的个数,传入AQS的属性state值,即ReentrantReadWriteLock的锁状态,获取读锁被获取的个数,由于AQS属性state,即锁的状态,高16位才是存放读锁被获取的个数,为此要获取到读锁被获取的个数需将其state做右移16位,才获取到读锁被获取的个数  
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            
            //获取写锁被获取的个数,传入AQS的属性state值,即ReentrantReadWriteLock的锁状态,由于AQS属性state,即锁的状态,低16位才是存放写锁被获取的个数,为此要获取到写锁被获取的个数需将其&上EXCLUSIVE_MASK即低16位的标识值,才获取到写锁被获取的个数  
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
            
            //记录线程获取读锁的次数count,和获取读锁的当前线程tid
            static final class HoldCounter {
                //获取读锁的次数 
                int count = 0;
                //存放获取读锁的当前线程tid
                final long tid = getThreadId(Thread.currentThread());
            }
             
            //ThreadLocal实现的类,只是重写了ThreadLocal的initialValue方法,在调用get方法时,返回默认值,
            static final class ThreadLocalHoldCounter
                extends ThreadLocal<HoldCounter> {
                //重写ThreadLocal的initialValue方法
                public HoldCounter initialValue() {
                    //返回当前线程对应的HoldCounter实例
                    return new HoldCounter();
                }
            }
            
             
            private transient ThreadLocalHoldCounter readHolds;
            
            //用来缓存最近一次获取读锁的线程的HoldCounter对象
            private transient HoldCounter cachedHoldCounter;
            
            //表示第一个获取读锁的线程
            private transient Thread firstReader = null;
            //表示第一个获取读锁的线程获取
            private transient int firstReaderHoldCount;
            
            //Sync构造函数,在子类FairSync或者NonFairSync被初始化时,会调用父类Sync的构造函数
            Sync() {
                readHolds = new ThreadLocalHoldCounter();
                setState(getState()); 
            }
             
            //在获取读锁时,当前线程是否需要被阻塞,模板方法,NonFairSync和FairSync分别进行实现,下面会讲到
            abstract boolean readerShouldBlock();
            
            //在获取写锁时,当前线程是否需要被阻塞,模板方法,NonFairSync和FairSync分别进行实现,下面会讲到
            abstract boolean writerShouldBlock();
            
            //根据传入进来的要释放写锁被获取的个数尝试释放写锁
            protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int nextc = getState() - releases;
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
                setState(nextc);
                return free;
            }
    
            protected final boolean tryAcquire(int acquires) {
                
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    
            protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    
            private IllegalMonitorStateException unmatchedUnlockException() {
                return new IllegalMonitorStateException(
                    "attempt to unlock read lock, not locked by current thread");
            }
    
            protected final int tryAcquireShared(int unused) {
                
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }
    
            
            final int fullTryAcquireShared(Thread current) {
               
                HoldCounter rh = null;
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0) {
                        if (getExclusiveOwnerThread() != current)
                            return -1;
                        
                    } else if (readerShouldBlock()) {
                        if (firstReader == current) {
                            
                        } else {
                            if (rh == null) {
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0)
                                return -1;
                        }
                    }
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
            }
    
            final boolean tryWriteLock() {
                Thread current = Thread.currentThread();
                int c = getState();
                if (c != 0) {
                    int w = exclusiveCount(c);
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                }
                if (!compareAndSetState(c, c + 1))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    
            final boolean tryReadLock() {
                Thread current = Thread.currentThread();
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0 &&
                        getExclusiveOwnerThread() != current)
                        return false;
                    int r = sharedCount(c);
                    if (r == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        if (r == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            HoldCounter rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                cachedHoldCounter = rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                        }
                        return true;
                    }
                }
            }
    
            protected final boolean isHeldExclusively() {
               
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    
            
    
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
    
            final Thread getOwner() {
               
                return ((exclusiveCount(getState()) == 0) ?
                        null :
                        getExclusiveOwnerThread());
            }
    
            final int getReadLockCount() {
                return sharedCount(getState());
            }
    
            final boolean isWriteLocked() {
                return exclusiveCount(getState()) != 0;
            }
    
            final int getWriteHoldCount() {
                return isHeldExclusively() ? exclusiveCount(getState()) : 0;
            }
    
            final int getReadHoldCount() {
                if (getReadLockCount() == 0)
                    return 0;
    
                Thread current = Thread.currentThread();
                if (firstReader == current)
                    return firstReaderHoldCount;
    
                HoldCounter rh = cachedHoldCounter;
                if (rh != null && rh.tid == getThreadId(current))
                    return rh.count;
    
                int count = readHolds.get().count;
                if (count == 0) readHolds.remove();
                return count;
            }
    
            private void readObject(java.io.ObjectInputStream s)
                throws java.io.IOException, ClassNotFoundException {
                s.defaultReadObject();
                readHolds = new ThreadLocalHoldCounter();
                setState(0); // reset to unlocked state
            }
    
            final int getCount() { return getState(); }
        }复制代码
  2. NonFairSync内部类
    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -8159625535654395037L;
            final boolean writerShouldBlock() {
                return false; 
            }
            final boolean readerShouldBlock() {
                
                return apparentlyFirstQueuedIsExclusive();
            }
    }复制代码
  3. FairSync内部类
    static final class FairSync extends Sync {
            private static final long serialVersionUID = -2274990926593161451L;
            final boolean writerShouldBlock() {
                return hasQueuedPredecessors();
            }
            final boolean readerShouldBlock() {
                return hasQueuedPredecessors();
            }
    }复制代码
  4. ReadLock内部类
    public static class ReadLock implements Lock, java.io.Serializable {
            private static final long serialVersionUID = -5992448646407690164L;
            private final Sync sync;
    
            protected ReadLock(ReentrantReadWriteLock lock) {
                sync = lock.sync;
            }
    
            public void lock() {
                sync.acquireShared(1);
            }
    
            public void lockInterruptibly() throws InterruptedException {
                sync.acquireSharedInterruptibly(1);
            }
    
            public boolean tryLock() {
                return sync.tryReadLock();
            }
    
            public boolean tryLock(long timeout, TimeUnit unit)
                    throws InterruptedException {
                return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
            }
    
            public void unlock() {
                sync.releaseShared(1);
            }
    
            public Condition newCondition() {
                throw new UnsupportedOperationException();
            }
    
            public String toString() {
                int r = sync.getReadLockCount();
                return super.toString() +
                    "[Read locks = " + r + "]";
            }
    }复制代码
  5. WriteLock内部类
    public static class WriteLock implements Lock, java.io.Serializable {
            private static final long serialVersionUID = -4992448646407690164L;
            private final Sync sync;
    
            protected WriteLock(ReentrantReadWriteLock lock) {
                sync = lock.sync;
            }
    
            public void lock() {
                sync.acquire(1);
            }
    
            public void lockInterruptibly() throws InterruptedException {
                sync.acquireInterruptibly(1);
            }
    
            public boolean tryLock( ) {
                return sync.tryWriteLock();
            }
    
            public boolean tryLock(long timeout, TimeUnit unit)
                    throws InterruptedException {
                return sync.tryAcquireNanos(1, unit.toNanos(timeout));
            }
    
            public void unlock() {
                sync.release(1);
            }
    
            public Condition newCondition() {
                return sync.newCondition();
            }
    
            public String toString() {
                Thread o = sync.getOwner();
                return super.toString() + ((o == null) ?
                                           "[Unlocked]" :
                                           "[Locked by thread " + o.getName() + "]");
            }
    
            public boolean isHeldByCurrentThread() {
                return sync.isHeldExclusively();
            }
    
            public int getHoldCount() {
                return sync.getWriteHoldCount();
            }
    }复制代码

六、独占锁

  1. 独占锁的获取
    复制代码
  2. 独占锁的释放
    复制代码

七、共享锁

  1. 共享锁的获取
    复制代码
  2. 共享锁的释放
    复制代码

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

关注码农网公众号

关注我们,获取更多IT资讯^_^


查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

金融计算与建模

金融计算与建模

朱世武 / 清华大学 / 2007-8 / 40.00元

《金融计算与建模:理论、算法与SAS程序》全书分为4大模块:1-9章为金融学基础指标计算模块;10-12章为股票定价模块;13-18章为风险度量模块;19-23章为固定收益定价模块。每一模块的内容一般由三部分组成:金融理论与模型、算法实现及计算程序。其中,算法实现与计算程序全部以中国金融市场的实际问题为应用背景而设计。《金融计算与建模:理论、算法与SAS程序》不仅展现了应用SAS软件的技术,同时也......一起来看看 《金融计算与建模》 这本书的介绍吧!

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具