并发工具类Semaphore信号量源码解析 简介 Semaphore是信号量,在Java并发编程中,信号量控制的是线程并发的数量.它允许n个任务同时访问某个资源,主要是通过信号量大小控制并发数量
源码解析 内部类 内部类图示 由上图可知:Semaphore有三个内部类,三个内部类的关系如下:
内部存在Sync、NonfairSync、FairSync等三个内部类,与ReentrantLock类似,分别实现了公平与非公平方法
Sync源码解析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L ; Sync(int permits) { setState(permits); } final int getPermits () { return getState(); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } final void reducePermits (int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) throw new Error("Permit count underflow" ); if (compareAndSetState(current, next)) return ; } } final int drainPermits () { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0 )) return current; } } }
NonfairSync源码解析 1 2 3 4 5 6 7 8 9 10 11 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L ; NonfairSync(int permits) { super (permits); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } }
由上可知:NonfairSync继承了Sync类,非公平方式获取资源,tryAcquireShared还是调用了父类的nonfairTryAcquireShared去实现非公平策略
FairSync类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L ; FairSync(int permits) { super (permits); } protected int tryAcquireShared (int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1 ; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
由上可知:FairSync继承了Sync类,公平方式获取资源,tryAcquireShared此处没有在父类中重写,直接子类实现了公平策略
属性 1 2 private final Sync sync;
构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public Semaphore (int permits) { sync = new NonfairSync(permits); } public Semaphore (int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
核心方法 acquire 1 2 3 4 5 6 7 8 9 10 public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); }
假设使用默认的非公平策略,该方法将调用Sync的nonfairTryAcquireShared以非公平方式获取,底层还是调用AQS的doReleaseShared执行的,具体会在示例中按流程分析
release 1 2 3 4 5 6 public void release () { sync.releaseShared(1 ); }
示例分析 使用信号量控制资源的访问,每次最多10个线程访问,具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class SemaphoreDemo { public void method (Semaphore semaphore) { try { System.out.println(Thread.currentThread().getName() + " tryAcquire .." ); semaphore.acquire(5 ); System.out.println(Thread.currentThread().getName() + " acquire success .." ); Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(5 ); System.out.println(Thread.currentThread().getName() + " release.." ); } } public static void main (String[] args) { SemaphoreDemo demo = new SemaphoreDemo(); Semaphore semaphore = new Semaphore(12 ); for (int i = 0 ; i < 3 ; i++) { new Thread(new Runnable() { @Override public void run () { demo.method(semaphore); try { Thread.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } }
由上述代码可知:初始化了12个资源的线程,当线程Thread0进行获取的时候它是可以成功获取的,之后的thread1也是可以成功获取,但是thread2已经没有足够的通行数量,会获取失败;它会阻塞等待直到前面0和1进行一次释放,去增加通行数量,满足thread2的需求后就可以获取成功 按照流程逐步分析 1.Thread0的acquire流程如下: 初始的state值为12,当执行acquire后state成为7,此时还是remaining是大于0的,可以通行的 2.Thread1的acquire流程如下: 初始的state值为7,当执行acquire后state成为2,此时还是remaining是大于0的,可以通行的 3.Thread2的acquire流程如下: 初始的state值为2,此时remaining值是小于0的,不能通行,会进入队列进行等待,会直接park 4.thread0的release流程 当进行release时,初始的state值为2,会将thread的state返回增加到其上,release后的state值为7,此时执行doReleaseShared进行unpark,thread2会unpark后再次tryAcquireShared,这次会成功,state值又会从7到2 5.thread2的release 6.thread1的release 此处还有unpark一次,但这次的unpark不会对程序有影响.