ReentrantReadWriterLock源码(state设计、读写锁、共享锁、独占锁及锁降级)

ReentrantReadWriterLock

读写锁类图(截图来源https://blog.csdn.net/wangbo199308/article/details/108688148)
ReentrantReadWriterLock源码(state设计、读写锁、共享锁、独占锁及锁降级)

state的设计

读写锁将变量state切分成两个部分,高16位表示读,低16位表示写

ReentrantReadWriterLock源码(state设计、读写锁、共享锁、独占锁及锁降级)

源码中将4字节(32位)的int数据类型state,通过SHARED_SHIFT(16)划分读和写;

每次读锁增加的单元,SHARED_UNIT = (1 << SHARED_SHIFT) 也即0x00010000,即每次读锁增加从17位开始加1

读写锁最大数量:MAX_COUNT = (1 << SHARED_SHIFT) - 1,16位最大值

写锁的掩码:EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1, 即求写锁数量,将state和此掩码做与运算,将高16位抹去

计算读锁数量逻辑:c >>> SHARED_SHIFT,取高16位

计算写锁数量逻辑:c & EXCLUSIVE_MASK,将state和此掩码做与运算,将高16位抹去

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //16位划分读和写
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    }
}
读锁

读锁上锁的调用链:ReentrantReadWriteLock$ReadLock#lock() -->AbstractQueuedSynchronizer#acquireShared() -->ReentrantReadWriteLock$Sync#tryAcquireShared()

当前写锁数量为0或独占锁持有者就是当前线程才进行读锁逻辑

读锁数量通过CAS加1

之后逻辑是将读锁线程放入ThreadLocal中,记录各自锁数量

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    public static class ReadLock implements Lock, java.io.Serializable {
        public void lock() {
            sync.acquireShared(1);
        }
    }
}
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
}
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    abstract static class Sync extends AbstractQueuedSynchronizer {
        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            // 同时满足写锁数量不为0,且独占锁不是当前线程,走doAcquireShared逻辑
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 取高16位读锁数量
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                // ThreadLocal存放锁信息
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }
    }
}

在读锁获取锁过程,写锁不为0且占有写锁的不是当前线程,返回-1,走同步器doAcquireShared方法,等待写锁释放;

前置节点是head节点时,尝试获取共享锁

private void doAcquireShared(int arg) {
    // 队列加入的node是共享模式
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                //前置节点是head节点时,尝试获取共享锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
写锁
  1. 读锁不为0,但写锁为0,获取锁失败;读锁不为0,写锁也不为0,但独占锁不是当前线程,获取锁失败
  2. 如果锁数量已到最大,获取失败
  3. 否则获取写锁,更新state
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    abstract static class Sync extends AbstractQueuedSynchronizer {
        protected final boolean tryAcquire(int acquires) {

            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }
    }
}
共享锁和独占锁

读锁是共享锁,当线程1获得读锁时,并不会排斥线程2去获取读锁,而是在ThreadLocal中保存每个锁数量

    abstract static class Sync extends AbstractQueuedSynchronizer {
        static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }
        
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
    }

写锁是独占锁,会调用同步器AbstractQueuedSynchronizer#acquire()方法,默认加入队列的node模式是独占模式

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
锁降级

锁降级就是从写锁降级成为读锁。在当前线程拥有写锁的情况下,再次获取到读锁,随后释放写锁的过程就是锁降级

锁降级示例:

public void processData() {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    readLock.lock();
    if(!update) {
        //必须先释放读锁
        readLock.unlock();
        // 锁降级从写锁获取到开始
        writeLock.lock();
        try{
            if(!update) {
                update = true;
            }
            // 可以获取到读锁,getExclusiveOwnerThread() == current
            readLock.lock();
        } finally {
            writeLock.unlock();
        }
        //锁降级完成,写锁降级为读锁
    }
    try{
        // 使用数据的流程
    } finally {
        readLock.unlock();
    }
}

可降级的源码仍是在读锁tryAcquireShared方法中,getExclusiveOwnerThread() == current,也即当前独占锁owner就是当前线程,可进行读锁逻辑。

protected final int tryAcquireShared(int unused) {
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
}

参考:《Java并发编程的艺术》

发表评论

相关文章