AbstractQueuedSynchronizer源码分析 简介 AbstractQueuedSynchronizer(下文均以AQS代替)可以用于构建锁或者其他相关同步装置的基础框架.JDK1.5之前一般使用synchronized关键字来实现线程对共享变量的互斥访问,而JDK1.5之后开发了AQS组件,使用原生java代码实现了synchronized语义. AQS实现了两种模式:公平模式和非公平模式,在我们实现自定义锁的时候(后续会自己实现锁)只需按照要求即可实现对应模式或两种模式都实现,ReentrantLock就是实现了上述两种模式的锁 在使用公平模式时,AQS会生成一个FIFO队列,严格按照线程挂起顺序获得锁使用权.或者说AQS提供了FIFO队列 AQS内部包含了两个内部类,可以先从这个简单的内部类开始解析
Node类解析 Node是用来构成FIFO队列的基本单元,当前线程获取同步状态失败后,AQS会将当前线程以及等待状态等信息构造成节点并加入到同步队列中,同时会阻塞当前线程,当同步状态释放后,会把首节点的线程唤醒,使其再次尝试获取同步状态 它是用来保存获取同步状态失败的线程引用和线程状态、等待状态以及前驱和后继节点的容器,用来实现线程阻塞. 具体代码解析如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
ConditionObject类解析 已在Condiiton源码解析中阐述,如有需要请看ConditionObject类解析连接 AQS的内部类Node和ConditionObject已经解析完毕,接下来解析AQS的成员变量,基础部分就解析完毕,就可以各种AQS相关的操作流程
AQS成员变量 共享资源与同步队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private transient volatile Node head;private transient volatile Node tail;private volatile int state;
由上概述的成员变量可知:AQS维护了volatile int state(代表共享资源,同步器的状态值,如:1-已占有,0-未占有,>1-表示重入的数量)和一个FIFO线程等待队列;AQS使用了state状态位+FIFO排列方式记录了锁的获取和释放 state的访问方式主要是以下三种:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 protected final int getState () { return state; } protected final void setState (int newState) { state = newState; } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); }
CAS支持变量 1 2 3 4 5 6 7 8 9 10 11 12 private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;
内部类和成员变量等基础解析完毕,接下来就是AQS操作:
AQS操作解析 由上图可知:AQS实现了两种资源共享方式:独占模式+共享模式,独占对外提供的方法是acquire-release;共享对外提供方法是acquireShared-releaseShared;接下来分别就这两种方式进行阐述
AQS独占方式解析 acquire获取资源 1 2 3 4 5 6 7 8 9 10 11 12 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire方法 此方法尝试获取资源,AQS中代码如下:
1 2 3 protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); }
AQS中的tryAcquire方法直接抛出异常,并没有实现具体的功能,具体的功能交给具体自定义同步器去实现了. 注意:此处并没有定义成abstract,因为AQS内部实现了独占模式和共享模式,不定义成abstract,自定义同步器可根据自己需求单独实现其中一种或两种都实现 如JDK内部的ReentrantLock已经使用FairSync和NonFairSync这两种方式帮我们实现了tryAcquire FairSync的tryAcquire:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
NonFairSync的tryAcquire:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
addWaiter() 当tryAcquire尝试去获取锁的时候成功则返回true,失败则返回false;此时!tryAcquire就是true就会往下执行acquireQueued(addWaiter()) addWaiter(Node.EXCLUSIVE)的作用:创建”当前线程”的Node节点,且Node中记录”当前线程”对应的锁是”独占锁”类型,并且该节点添加到CLH队列的末尾
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
compareAndSetTail也属于CAS函数,也是通过”本地方法”实现的compareAndSetTail(expect, update)会以原子的方式进行操作,它的作用是判断CLH队列的队尾是不是为expect,是的话就将队尾设为update
1 2 3 private final boolean compareAndSetTail (Node expect, Node update) { return unsafe.compareAndSwapObject(this , tailOffset, expect, update); }
enq()的作用很简单,如果CLH队列为空,则新建一个CLH表头;然后将node添加到CLH末尾,否则直接将node添加到CLH末尾
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued() 我们已经将当期线程添加到CLH队列中了.而acquireQueued的作用就是逐步的去执行CLH队列的线程,如果当前线程获取到了锁则返回,否则当前线程休眠直到唤醒并重新获取锁才返回;获取过程中head、tail不断移动将已经完成的队列节点清除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
1.shouldParkAfterFailedAcquire 此方法检测和更新获取失败节点的状态,避免存在节点是取消或被中断了,确保当前节点是真的可以park了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
2.parkAndCheckInterrupt() 真正让线程进入waiting状态,进行park的操作;并进行检查中断的操作
1 2 3 4 5 6 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回”线程被唤醒之后”的中断状态.它会先通过LockSupport.park()阻塞”当前线程”,然后通过Thread.interrupted()返回线程的中断状态
线程被阻塞之后如何唤醒: 第1种情况:unpark()唤醒.”前继节点对应的线程”使用完锁之后,通过unpark()方式唤醒当前线程 第2种情况:中断唤醒.其它线程通过interrupt()中断当前线程
补充: LockSupport()中的park(),unpark()的作用 和 Object中的wait(),notify()作用类似,是阻塞/唤醒;它们的用法不同,park(),unpark()是轻量级的,而wait(),notify()是必须先通过Synchronized获取同步锁 3.acquireQueue的流程图
acquire流程总结 1.调用tryAcquire()尝试去获取资源,成功则返回 2.失败则addWaiter将线程对应节点放到队列尾部,并标记为独占模式, 3.acquireQueued是不是第一个在等待的,是就再尝试获取,不行就去休息 4.等待过程如果被中断,它不响应,等待获取资源即当前线程在活跃后再去处理
release释放资源 独占模式下资源的释放,也是Lock锁的unlock的具体实现
1 2 3 4 5 6 7 8 9 10 11 12 13 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
1.tryRelease尝试释放 1 2 3 protected boolean tryRelease (int arg) { throw new UnsupportedOperationException(); }
同样此处也是直接抛出异常的,具体代码由实现类去实现,可参考ReentrantLock的Sync,已经重写了tryRelease,具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
2.unparkSuccessor 若释放锁成功后,遍历节点将需要取消的取消,然后再次尾部开始查找最前边的未取消的节点并进行唤醒,尝试争夺资源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
用unpark()唤醒队列最前边的未放弃的节点,就会去执行acquire()去尝试获取资源,它已经队列最前边的未放弃节点了,即它已经在占用节点后面的第一个等待节点了,即使这一次获取失败也会前驱节点设置为SIGNAL,然后下次就会获取成功
AQS共享方式解析 acquireShared(int) 共享模式下获取资源的方法,
1 2 3 4 5 6 7 8 9 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); }
1.tryAcquireShared 1 2 3 4 5 6 7 8 protected int tryAcquireShared (int arg) { throw new UnsupportedOperationException(); }
又看到直接抛出异常的方法,大家应该知道此方法肯定是由实现类去具体实现了,这里后续每一个实现类都会进行源码分析,咱后续再讨论; 该方法已经明确了返回值:
失败则返回负值
在共享模式下获取成功,但是没有后续资源可以获取则返回0
在共享模式下获取成功,并且后续获取资源也成功则返回正值,这种情况下,后续等待线程必须检查可用性
2.doAcquireShared 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
与acquireQueued()唯一区别是中断处理操作selfIntertupt()放到doAcquireShared()里面,而前面的独占模式acquireQueued则是放在外面最后处理;即acquireQueued是先拿到资源后判断中断,而共享模式是在拿资源的过程中可以中断以便以后线程可以获取资源,关键就在于拿资源过程中是否可以被抢占 与独占模式相比,共享模式下必须是head节点后面的第一个等待节点去获取,共享资源有剩余的情况下就会去唤醒其他线程节点;而独占模式下,同一时刻只有一个线程在执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
releaseShared 1 2 3 4 5 6 7 8 9 10 11 12 13 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
即先释放掉资源后唤醒后继,与独占模式下release一致,但是独占模式下tryRelease在完全释放掉资源后才会去唤醒其他节点,主要是独占下可重入的问题,而共享模式下则是控制一定量线程并发执行,拥有这些资源的线程释放部分资源就可以唤醒后继节点 tryReleaseShared同样是抛出异常方式,需要实现类去实现的
1.doReleaseShared 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }