可重入读写锁ReentrantReadWriteLock源码解析
简介
在之前的synchronized、自定义的Lock、ReentrantLock都是一种排他锁,而读写锁既是排他锁也是共享锁,对于读锁是共享锁,对于写锁则是排他锁
排他锁:就是多个线程在同一个时刻只允许一个线程访问;其他线程处于自旋状态,并不能访问锁内部代码块
共享锁:在同一个时刻可以允许多个线程访问,共享锁与共享锁之间是可以同时访问的;
读读不互斥、读写互斥(等所有读锁完成写锁才会进行)、写写互斥
ReadWriteLock维护了一对相关的锁,一个用于只读操作,另一个用于写入操作.只要没有writer,读取锁可以由多个reader线程同时保持.写入锁是独占的;ReadWriteLock读取操作通常不会改变共享资源,但执行写入操作时,必须独占方式来获取锁.对于读取操作占多数的情况.ReadWriteLock就能够提供比独占锁更高的并发性.
ReentrantReadWriteLock支持以下功能:
1.支持公平和非公平获取锁方式
2.支持可重入.读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁
3.还允许从写入锁降级为读取锁,但是从读取锁升级到写入锁是不允许的
4.读取锁和写入锁都支持锁获取期间的中断
5.Condition支持.仅写入锁提供了一个Condition实现;读取锁不支持Condiiton,readLock().newCondition()会抛出UnSupportedOperationException
父接口ReadWriteLock
ReadWriteLock也是一个接口,在它里面只定义了两个方法:
1 2 3 4 5 6 7 8 9 10 11
| public interface ReadWriteLock {
Lock readLock();
Lock writeLock(); }
|
一个用来获取读锁,一个用来获取写锁.也就是说将文件的读写操作分开,分成2个锁来分配给线程,从而使得多个线程可以同时进行读操作.ReentrantReadWriteLock实现了ReadWriteLock接口
内部类
内部类总体结构示意图:
Sync
Sync的内部类
Sync类内部有两个内部类,分别是HoldCounter和ThreadLocalHoldCounter,HoldCounter主要是与读锁配套使用,其源码如下:
1 2 3 4 5 6 7 8 9
|
static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); }
|
说明:HoldCounter内有两个属性:count表示读线程重入的次数,tid则表示当前线程唯一标志
ThreadLocalHoldCounter源码如下:
1 2 3 4 5 6 7 8 9 10
|
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
|
说明:ThreadLocalHoldCounter重写了ThreadLocl的initialValue方法,初始化值是HoldCounter
Sync类属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
|
构造函数
1 2 3 4 5 6 7
|
Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); }
|
主要方法
1.读写锁线程占有的数量
1 2 3 4 5
| static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
|
2.写锁的获取和释放
2.1写锁获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
|
写锁获取流程图
2.2写锁释放
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
|
写锁释放流程图
3.读锁的获取和释放
3.1读锁的获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c);
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; }
return fullTryAcquireShared(current); }
|
读锁获取流程图
3.2读取的释放
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread();、 if (firstReader == current) { if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
|
读锁释放流程图
FairSync
1 2 3 4 5 6 7 8 9
| static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
|
说明:公平锁与非公平锁的区别就是在获取时会进行是否为队列前节点的判断,此处就是公平锁的所需要的是否还有前驱节点判断,有则false,否则返回true
NonfairSync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } }
final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
|
ReadLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
|
public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; }
public void lock() { sync.acquireShared(1); }
public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public boolean tryLock() { return sync.tryReadLock(); }
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
public void unlock() { sync.releaseShared(1); }
public Condition newCondition() { throw new UnsupportedOperationException(); }
public String toString() { int r = sync.getReadLockCount(); return super.toString() + "[Read locks = " + r + "]"; } }
|
WriteLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; }
public void lock() { sync.acquire(1); }
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public boolean tryLock( ) { return sync.tryWriteLock(); }
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); }
public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }
public int getHoldCount() { return sync.getWriteHoldCount(); } }
|
属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| private static final long serialVersionUID = -6992448646407690164L;
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
private static final sun.misc.Unsafe UNSAFE;
private static final long TID_OFFSET; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> tk = Thread.class; TID_OFFSET = UNSAFE.objectFieldOffset (tk.getDeclaredField("tid")); } catch (Exception e) { throw new Error(e); } }
|
构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
public ReentrantReadWriteLock() { this(false); }
public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
|
核心方法
ReentrantReadWriteLock的核心方法(只要是Public)的全部都转化为了Sync的操作,而Sync的方法已经在上面内部类全部介绍了
示例分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| public class ReadWrite { private Map<String, Object> map = new HashMap<>(); private ReadWriteLock rwl = new ReentrantReadWriteLock();
private Lock r = rwl.readLock(); private Lock w = rwl.writeLock();
public Object get(String key) { r.lock(); System.out.println(Thread.currentThread().getName()+" 读操作正在执行..."); try { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return map.get(key); } finally { r.unlock(); System.out.println(Thread.currentThread().getName()+" 读操作执行完毕..."); } }
public void put(String key, Object value) { w.lock(); try { System.out.println(Thread.currentThread().getName()+" 写操作正在执行..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } map.put(key, value); } finally { w.unlock(); System.out.println(Thread.currentThread().getName()+" 写操作执行完毕..."); } } }
public class ReadWriteTest { public static void main(String[] args) { final ReadWrite readWrite = new ReadWrite();
new Thread(new Runnable() { @Override public void run() { readWrite.put("key1", "value1"); } }, "t0").start();
new Thread(new Runnable() { @Override public void run() { System.out.println(readWrite.get("key1")); } }, "t1").start();
new Thread(new Runnable() { @Override public void run() { readWrite.put("key3", "value3"); } }, "t2").start();
} }
|
说明:此处以非公平锁为例,新建了三个线程,分别写、读、写的方式对读写锁进行操作,可能出现的时序:t0去执行写操作,此时t1进来读,会失败进行等待,当t0执行完毕后,t1读线程执行,此时t2进来也会等待t1执行完毕
1.t0线程执行w.lock时,流程如下:
2.t1线程执行r.lock时,由于w写线程在执行,会被阻塞,流程如下:
3.t0线程执行w.unlock时,会在后面进行唤醒t1线程,流程如下:
4.t2线程在执行w.lock时,因读写互斥,会被阻塞,流程如下:
5.t1线程在执行r.unlock时,会唤醒上述的写线程,流程如下:
6.t2线程unlock,其流程类似于上述的写线程unlock,不再赘述