Java并发成神之路-精通JUC并发工具十八般武艺——控制并发流程(七)

什么是控制并发流程

  • 控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间合作
  • 让线程之间相互配合,来满足业务逻辑
  • 比如让线程A等待线程B执行完毕后再执行等合作策略

有那些控制并发流程的工具类

作用 说明
Semaphore 信号量,可以通过控制”许可证“的数量,来保证线程之间的配合 线程只有在拿到”许可证“后才能继续运行,相比于其他同步器,更灵活
CyclicBarrier 线程会等待,直到足够多线程达到了事先规定的数目,一旦达到触发条件,就可以进行下一步的动作。 适用于线程之间相互等待处理结果就绪的场景
Phaser 和CyclicBarrier类似,但是计数可变 Java7加入的
CountDownLatch 和CyclicBarrier类似,数量递减到0时,触发动作 不可重复使用
Exchanger 让两个线程在合适的适合交换对象 适用场景:当两个线程工作在同一个类的不同示例上时,用于交换数据
Condition 可以控制线程的”等待“和”唤醒“ 是Object.wait()的升级版

CountDownLatch倒计时门闩

CountDownLatch类的作用

  • 倒数门闩
  • 流程:倒数结束之前,一直处于等待状态,直到倒计时结束了,此线程才继续工作
  • 开始 -> 进入等待 -> 倒数结束 -> 继续工作

CountDownLatch类的主要方法

  • CountDownLatch(int count):仅有一个构造函数,参数count为需要倒数的数值
  • await():调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
  • countDown():将count值减1,直到为0时,等待的线程会被唤起

CountDownLatch类的主要用法

  • 用法一:一个线程等待多个线程都执行完毕,再继续自己的工作

  • 代码如下所示:假设一个产品需要经过5道质检程序,全部检查之后通过

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class CountDownLatchDemo1 {

    public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(5);
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 5; i++) {
    final int no = i+1;
    Runnable runnable = new Runnable() {
    @Override
    public void run() {
    try {
    Thread.sleep((long) (Math.random()*10000));
    System.out.println("No."+no+"完成了检查");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }finally {
    latch.countDown();
    }
    }
    };
    executorService.submit(runnable);
    }
    System.out.println("等待5个人检查完毕~~~~");
    latch.await();
    System.out.println("所有人都检查完毕");
    }
    }
  • 用法二:多个线程等待某一个线程的信号,同时开始执行

  • 代码如下所示:假设有5个人进行跑步比赛,裁判枪响之后,同时开始比赛

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class CountDownLatchDemo2 {

    public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 5; i++) {
    final int no = i+1;
    Runnable runnable = new Runnable() {
    @Override
    public void run() {
    System.out.println("NO." + no + "准备完毕,等待发令枪");
    try {
    latch.await();
    System.out.println("NO." + no + "开始跑步");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    };
    executorService.submit(runnable);
    }
    Thread.sleep(5000);
    System.out.println("发令抢响,比赛开始");
    latch.countDown();
    }
    }

注意点

  • 扩展用法:多个线程等多个线程完成执行后,再同时执行
  • CountDownLatch是不能够重用的,如果需要重新计数,可以考虑适用CyclicBarrier或创建新的CountDownLatch

Semaphore信号量

  • Semaphore可以用来限制或者管理数量有限的资源的适用情况
  • 信号量的作用是维护一个”许可证“的计数,线程可以”获取“许可证,那信号量剩余的许可证就减一,线程也可以”释放“一个许可证,那么信号量剩余的许可证就加一,当信号量所拥有的许可证数量为0,那么下一个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证

信号量的使用流程

  • 初始化Semaphore并指定许可证的数量
  • 在需要被限制的代码前加acquire()或者acquireUninterruptibly()方法
  • 在任务执行结束后,调用release()来释放许可证

信号量的主要方法介绍

  • new Semaphore(int permits,boolean fair):这里可以设置是否使用公平策略,如果传入true,那么Semaphore会把之前等待的线程放到FIFO的队列里,以便于当有了新的许可证,可以分发给之前等待了最长时间的线程
  • acquire():获取许可证,可以响应中断
  • acquireUninterruptibly():获取许可证,不能响应中断
  • tryAcquire():看看现在有没有空闲的许可证,如果有的话就获取,如果没有的话也没有关系,我不必陷入阻塞,我可以去做别的事,过一会再来看许可证的空闲情况
  • tryAcquire(timeout):和tryAcquire()方法一样,但是多了一个超时时间,如果没有许可证,会等待一定的时间,然后再退出
  • release():归还许可证

代码演示

使用方法如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SemaphoreDemo {

static Semaphore semaphore = new Semaphore(3,true);

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 100; i++) {
executorService.submit(new Task());
}
executorService.isShutdown();
}

static class Task implements Runnable{

@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"拿到了许可证");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(Thread.currentThread().getName()+"释放了许可证-----");
semaphore.release();
}
}
}
}

信号量特殊用法

  • 一次性获取或释放多个许可证
    • 比如任务A会调用很多消耗资源的方法,而任务B调用的不是太消耗资源的方法,假设我们一共有5个许可证,那么我们就可以要求任务A获取5个许可证才能执行,而任务B只需要获取到一个许可证就能执行,这样就避免了两个任务同时运行的情况,我们可以根据自己的需求合理分配资源

注意点

  • 获取和释放的许可证数量必须一致,否则比如每次获取到2个许可证,但是只释放1个,随着时间的推移,到最后许可证数量不够用,会导致城西卡死,(虽然信号量类并不对释放和获取的数量做规定,但是这是我们的编程规范,否则容易出错)
  • 注意在初始化Semaphore的时候设置公平性,一般设置为true更合理(因为使用Semaphore的地方一般都是因为处理太慢,如果使用非公平策略,可能会导致饥饿的情况)
  • 并不是必须由获取许可证的线程释放那个许可证,事实上,获取和释放许可证对线程并无要求,也许A获取了,然后B释放,只要逻辑合理即可
  • 信号量的作用,除了控制临界 区最多同时有N个线程访问外,另一个作用是可以实现“条件等待”,例如线程1需要在线程2完成准备工作后才能开始工作,那么就线程1acquire(),而线程2完成任务后release(),这样的话就相当于轻量级CountDownLatch

Condition接口(又称条件对象)

作用

  • 当线程1需要等待某个条件的时候,它就会去执行condition.await()方法,一旦执行了await()方法,线程就会进入阻塞状态
  • 然后通常会有另外一个线程,假设是线程2,去执行对应的条件,直到这个条件达成的时候,线程2就会去执行condition.signal()方法,这是JVM就会从被阻塞的线程里找到那些等待该condition的线程,当线程1就会收到可执行的信号的时候,它的线程状态就会变成Runnable可执行状态

signalAll()signal()方法的区别

  • signalAll()会唤起所有正在等待的线程
  • 但是signal()是公平的,只会唤起那个等待时间最长的线程

代码演示

代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class ConditionDemo1 {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public static void main(String[] args) {
ConditionDemo1 conditionDemo1 = new ConditionDemo1();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
conditionDemo1.method2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
conditionDemo1.method1();
}

void method1(){
lock.lock();
try {
System.out.println("条件不满足,开始等待");
condition.await();
System.out.println("添加满足了,继续处理任务");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
void method2(){
lock.lock();
try {
System.out.println("准备工作完成,开始唤醒其他线程");
condition.signal();
}finally {
lock.unlock();
}
}
}

使用Condition来实现生产者和消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class ConditionDemo2 {

private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();

public static void main(String[] args) {
ConditionDemo2 conditionDemo2 = new ConditionDemo2();
Consumer consumer = conditionDemo2.new Consumer();
Producer producer = conditionDemo2.new Producer();
consumer.start();
producer.start();
}

class Consumer extends Thread{
@Override
public void run() {
consume();
}

private void consume() {
while (true){
lock.lock();
try {
while (queue.size() == 0){
System.out.println("队列为空,等待数据");
notEmpty.await();
}
queue.poll();
notFull.signalAll();
System.out.println("从队列里取走了一个数据,队列里剩余: "+queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}

class Producer extends Thread{
@Override
public void run() {
produce();
}

private void produce() {
while (true){
lock.lock();
try {
while (queue.size() == queueSize){
System.out.println("队列已满,等待空余");
notFull.await();
}
queue.offer(1);
notEmpty.signalAll();
System.out.println("向队列插入一个元素,队列剩余空间: "+(queueSize-queue.size())+"-----");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
}

注意点

  • 实际上,如果说Lock用来代替synchronized,那么Condition就是用来代替相对应的Object.wait/notify的,所以在用法和性质上,几乎都一样
  • await方法会自动释放持有的Lock锁,和Object.wait一样,不需要自己手动先释放锁
  • 调用await的时候,必须持有锁,否则会抛出异常,和Object.wait一样

CyclicBarrier循环栅栏

  • CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程
  • 当有大量线程相互配合,分别计算不同任务,并且需要最后统一汇总的时候,我们可以使用CyclicBarrierCyclicBarrier可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就被撤销,所有线程再统一出发,继续执行剩下的任务

代码演示

代码如下所示,CyclicBarrier是可重用的,假设是同时执行了10个线程,也是可以,只不过会5个一起出发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有都到了,大家一起出发吧");
}
});
for (int i = 0; i < 5; i++) {
new Thread(new Task(i,cyclicBarrier)).start();
}
}

static class Task implements Runnable {

private int id;
private CyclicBarrier cyclicBarrier;

public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程"+id+"现在前往集合地点");
try {
Thread.sleep((long) (Math.random()*10000));
System.out.println("线程"+id+"到了集合地点,开始等待其他人");
cyclicBarrier.await();
System.out.println("线程"+id+"出发了");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}

CyclicBarrierCountDownLatch的区别

  • 作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需要等待数字到0,也就是说CountDownLatch用于事件,但是CyclicBarrier是用于线程(CountDownLatch同一个任务用一个线程执行多次,只要计算为0就可以,CyclicBarrier则必须要指定数量的线程才行)
  • 可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非创建新的实例,而CyclicBarrier可以重复使用
  • CyclicBarrier在所有线程都到达只会,可以再去运行一个Runnable方法