同步工具类

闭锁

闭锁(Latch)是一种同步工具类,可以延迟线程的进度知道到达中止状态.

闭锁的作用相当于一扇门:
在闭锁到达结束状态之前,这扇门是一直关闭的,并且没有任何线程能够通过.
在闭锁到达结束状态时,这扇门会打开并允许所有的线程通过.
当闭锁到达结束状态后,将不再改变状态,这扇门永远保持打开状态.

闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行.二元闭锁(包括两个状态)可以用来表示"资源R已经被初始化",而所有需要R的操作都必须现在这个闭锁上等待.
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动.每个服务都有一个相关的二元闭锁.当启动服务S时,将首先在S依赖的其他服务上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行
  • 等待知道某个操作的所有参与者都就绪再继续执行.这种情况下,当所有参与者都准备就绪时,闭锁将到达结束状态

CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以是一个或多个线程等待一组事件发生.闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量.countDown方法递减计数器,表示有一个时间已经发生了,而await方法等待计数器达到0,这表示所有需要等待的事件都已经发生.如果计数器的值非0,那么await会一直阻塞直到计数器为0,或者等待中的线程中断,或者等待超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import java.util.concurrent.CountDownLatch;

class Tmp {
static long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);

for (int i = 0; i < nThreads; i++) {
new Thread() {
@Override
public void run() {
try {

// 等待startGate信号
startGate.await();

try {
task.run();
} finally {

// endState信号减一
endGate.countDown();

}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
long start = System.nanoTime();

// startGate信号减一,线程继续执行
startGate.countDown();

// 等待线程执行完
endGate.await();

long end = System.nanoTime();
return end - start;
}

public static void main(String[] args) {
long l = 0;
try {
l = timeTasks(100, () -> {
System.out.print(123);
});
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println();
System.out.println(l);
}
}

FutureTask

FutureTask也可用作闭锁.FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下三种状态:等待运行(Waiting to run), 正在运行(running)和运行完成(conpleted).
执行完成表示计算的所有可能结束方式,包括正常完成,由于取消结束和由于异常结束等.当FurureTask进入完成状态后,它会永远停止在这个状态.

Future.get的行为取决于任务的状态.
如果任务已经完成,那么get会立即返回结果,否则将get阻塞直到任务进入完成状态,然后返回结果或抛出异常.FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布.

信号量

计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量.计数信号还可以用来实现某种资源池,或者对容器市价边界.

Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定,在执行操作时可以首先获取许可(只要还有剩余的许可),并在使用以后释放许可.如果没有许可,那么acquire将阻塞直到有许可(或者终端后者操作超时).release方法将返回一个许可信号量.
计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore,二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁.

Semaphore可以用于实现资源池,例如数据库连接池.我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为是阻塞而不是失败,并且当池非空时接触阻塞.如果将Semaphore的计数值初始化为池的大小,并在从翅中获取一个资源志气你首先调用acquire方法获取一个许可,在将资源返回给池之后调用release释放许可,那么acquire将一直阻塞直到资源池不为空.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

public class BoundedHashSet<T> {
private Set<T> set;
private Semaphore sem;

public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}

public boolean add(T o) throws InterruptedException {

// 获得一个许可
sem.acquire();

boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded) {

// 如果没添加成功,则当场释放一个许可
sem.release();

}
}
}

public boolean remove(T o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) {
// 如果删除成功,则释放一个许可
sem.release();
}
return wasRemoved;
}

public static void main(String[] args) {
var a = new BoundedHashSet<Integer>(4);
try {
a.add(1);
a.add(2);
a.add(3);
a.add(4);

// add(5)这一步会阻塞
a.add(5);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

栅栏

我们已经看到通过闭锁来启动一组相关的操作,或者等待一组相关的操作结束.闭锁是一次性对象,一旦进入中止状态,就不能被重置.

栅栏(Barrier)类似于闭锁,他能阻塞一组线程直到某个事件发生.
栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行.闭锁用于等待事件,栅栏用于等待其他线程.
栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:所有人在6:00在麦当劳碰头,到了以后要等其他人,之后再讨论下一步要做的事情

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:
这种算法通常将一个问题拆分成一系列相互独立的子问题.当线程达到栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置.如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用.如果对await调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException.如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来"选举"产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作.CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会执行它,但在阻塞线程被释放之前是不能执行的.

在模拟程序中通常需要使用栅栏,例如某个步骤中的计算可以并行执行,但必须等到该不走中的所有计算都执行完毕才能进入下一个步骤.