AQS

概述

在基于AQS构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作.
获取操作是一种依赖状态的操作,并且通常会阻塞.当使用锁或信号量时,"获取"操作的含义就很直观,即获取的是锁或者许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态.在使用CountDownLatch时,“获取"操作意味着"等待并直到闭锁到达结束状态”,而在使用FutureTask时,“获取"操作意味着"等待并直到任务已经完成”."释放"并不是一个可阻塞的操作,当执行"释放"操作时,所有在请求时被阻塞的线程都会开始执行.

如果一个类想成为状态依赖的类,那么它必须拥有一些状态.AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState,setState以及compareAndSetState等protected类型方法来进行操作.这个整数可以用于表示任意状态.例如,ReentrantLock用它来表示所有者线程已经重复获取该锁的次数,Semaphore用它来表示剩余的蓄客数量,FutureTask用它来表示人物的状态.在同步器类中还可以自行管理一些额外的状态变量,例如,ReentrantLock保存了锁当前所有者的信息,这样就能区分某个获取操作是重入的还是竞争的.

根据同步器不同,获取操作可以是一种独占操作(如ReentrantLock),也可以是一个非独占操作(例如Semaphore和CountDownLatch).一个获取操作包括两部分.

首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将阻塞或失败.这种判断是由同步器的语义决定的.例如,对于锁来说,如果它没有被某个线程持有,那么它就能被成功地获取.而对于闭锁来说,如果它处于结束状态,那么也能被成功地获取.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
boolean acquire() throws InterruptedException {
while(当前状态不允许操作){
if (需要阻塞获取请求) {
如果当前线程不在队列中,则将其插入队列;
阻塞当前线程;
}else{
返回失败
}
}
可能更新同步器状态;
如果线程位于队列中,则将其移出队列;
返回成功;
}

void release() {
更新同步器状态;
if (新的状态允许某个被阻塞的线程获取成功) {
解除队列中一个或多个线程的阻塞状态;
}
}

其次,就是更新同步器的状态.获取同步器的某个线程可能会对其他线程能否也获取该同步器造成影响.例如,当获取一个锁后,锁的状态将从"未被持有"变成"已被持有",而从Semaphore中获取一个许可后,将把剩余许可的数量减1.然而,当一个线程获取闭锁时,并不会影响其他线程能否获取它,因此获取封闭锁的操作不会改变闭锁的状态.

如果某个同步器支持独占的获取操作,那需要实现一些保护方法,包括tryAcquire,tryRelease和isHeldExclusively等,
而对于支持共享获取的同步器,则应该实现tryAcquireShared和tryReleaseShared等方法.AQS中的acquire,acquireShared,release和releaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能执行.在同步器的子类中,可以根据其获取操作和释放操作的语义,使用getState,setState以及compareAndSetState来检查和更新状态,并通过返回的状态值来告知基类"获取"或"释放"同步器的操作是否成功.
例如,如果tryRequireShared返回一个负值,则表示获取操作失败;返回零值表示同步器通过独占方式获取,返回正值则表示同步器通过非独占方式被获取.对于tryRelease和tryReleaseShared方法来说,如果释放操作似的所有在获取同步器时被阻塞的线程恢复执行,那么这两个方法应该返回true.

为了使支持条件队列的锁(如ReentrantLock)实现起来更简单,AQS还提供了一些机制来构造与同步器相关联的条件变量

一个简单的闭锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class OneShotLatch {
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected int tryAcquireShared(int ignored) {
// 如果闭锁是开的(state == 1),那么这个操作成功,否则失败.
return (getState() == 1) ? 1 : -1;
}

@Override
protected boolean tryReleaseShared (int ignored){

// 打开闭锁
setState(1);

// 其他的线程可以获取该闭锁
return true;
}
}

private final Sync sync = new Sync();

public void signal(){
sync.releaseShared(0);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}
}

OneShotLatch是一个使用AQS实现的二元闭锁,它包含两个公有方法:await和signal,分别对应获取操作和释放操作.
起初,闭锁是关闭的,任何调用await的线程都将阻塞并直到闭锁被打开.当通过调用signal打开闭锁时,所有等待中的线程都将被释放,并且随后到达闭锁的线程也被允许执行.

在OneShotLatch中,AQS状态用来表示闭锁状态:关闭(0)或者打开(1)
await方法调用AQS的acquireSharedInterruptibly,然后接着调用OneShotLatch中的tryAcquireShared方法.在tryAcquireShared的实现中必须返回一个值来表示该获取操作是否能执行.如果之前已经打开了闭锁,那么tryAcquireShared将返回成功并允许线程通过,否则就会返回一个表示获取操作失败的值.acquireSharedInterruptibly方法在处理失败的方式,是把这个线程放入等待线程队列中,类似地,signal将调用releaseShared,接下来又会调用tryReleaseShared,在tryReleaseShared中将无条件地把闭锁的状态设置为打开,(通过返回值)表示该同步器处于完全释放的状态.因而AQS让所有等待中的线程都尝试重新请求该同步器,并且由于tryAcquireShared将返回成功,因此现在的请求操作将成功.

OneShotLatch是一个功能全面的,可用的,性能较好的同步器.当然,它缺少了一些有用的特性,例如限时的请求操作以及检查闭锁的状态.

OneShotLatch也可以通过继承AQS来实现,而不是将一些功能委托给AQS,但这种做法并不合理.这样做将破坏OneShotLatch接口(只有两个方法)的简洁性,并且虽然AQS的公共方法不允许调用者破坏闭锁的状态,但调用者仍可以很容易地误用它们.java.util.concurrent中的所有同步器都没有直接扩展AQS,而是将它们的相应功能委托给私有的AQS子类来实现.

JUC同步器中的AQS

JUC中的许多可阻塞类,如ReentrantLock,Semaphore,ReentrantReadWriteLock,CountDownLatch,SynchronousQueue和FutureTask等,都是基于AQS构建的.

ReentrantLock

ReentrantLock只支持独占方式的获取操作,因此它实现了tryAcquire,tryRelease和isHeldExclusively.ReentrantLock将同步状态用于保存锁获取操作的次数,并且还维护一个owner变量来保存当前所有者线程的标识符,只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量.在tryRelease中检查owner域,从而确保当前线程在执行unlock操作之前已经获得了锁:在tryAcquire中将使用这个域来区分获取操作是重入的还是竞争的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected boolean tryAcquire(int ignored){
final Thread current = Thread.currentThread();
int c = getState();
if(c == 0) {
if(compareAndSetState(0, 1)) {
owner = current;
return true;
}
}else if(current == owner) {
setState(c + 1);
return true;
}
return false;
}

当一个线程尝试获取锁时,tryAcquire将首先检查锁的状态.如果锁未被持有,那么它将尝试更新锁的状态以表示锁已经被持有.由于状态可能在检查后被立即修改,因此tryAcquire使用conpareAndSetState来原子地更新状态,表示这个锁已经被占有,并确保状态在最后一次检查以后就没有被修改过.如果锁状态已经被持有,并且当且线程是锁的持有这,那么获取锁技术会递增.如果当前线程不是锁的拥有者,那么操作将失败.

Semaphore与CountDownLatch

Semaphore将AQS的同步状态用于保存当前可用许可的数量.tryAcquireShared方法首先计算剩余许可的数量,如果没有足够的许可,那么会返回一个值表示获取操作失败.如果还有剩余的许可,那么tryAcquireShared会通过compareAndSetState以原子方式来降低许可的计数.如果这个操作成功,那么将返回一个值表示获取操作成功.在返回值中还包含了表示其他共享获取操作能否成功的信息,如果成功,那其他等待的线程同样会接触阻塞.

CountDownLatch使用AQS的方式与Semaphore很相似:在同步状态中保存的是当前的计数值.countDown方式调用release,从而导致计数值递减,并且当计数值为0时,接触所有等待线程的阻塞.

FutureTask

FutureTask看上去不像一个同步器,但Future.get的语义非常类似闭锁的语义:如果发生了某个时间(FutureTask表示的任务执行完成或被取消),那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件发生.

ReentrantReadWriteLock

ReadWriteLock接口表示存在两个锁:一个读锁一个写锁
但在基于AQS实现的ReentrantLock中,单个AQS子类将同时管理读取加锁和写入加锁.ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数.在读取锁上的操作将使用共享的获取方法与释放方法,在写锁上操作将使用独占的获取方法与释放方法.

AQS在内部维护了一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问.在ReentrantReadWriteLock中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁.