[JAVA] Java CountDownLatch的源码硬核解析

2027 0
王子 2022-11-9 09:47:21 | 显示全部楼层 |阅读模式
目录

    前言介绍和使用
      例子概述
    实现思路源码解析
      类结构图await() 实现原理countDown()实现原理



前言

对于并发执行,Java中的CountDownLatch是一个重要的类,简单理解, CountDownLatch中count down是倒数的意思,latch则是“门闩”的含义。在数量倒数到0的时候,打开“门闩”, 一起走,否则都等待在“门闩”的地方。
为了更好的理解CountDownLatch这个类,本文通过例子和源码带领大家深入解析这个类的原理。

介绍和使用


例子

我们先通过一个例子快速理解下CountDownLatch的妙处。
最近LOL S12赛如火如荼举行,比如我们玩王者荣耀的时候,10个万玩家登入游戏,每个玩家的网速可能不一样,只有每个人进度条走完,才会一起来到王者峡谷,网速快的要等网速慢的。我们通过例子模拟下这个过程。
  1. @Slf4j(topic = "a.CountDownLatchTest")
  2. public class CountDownLatchTest {
  3.     public static void main(String[] args) throws InterruptedException {
  4.         // 创建一个倒时器,默认10个数量
  5.         CountDownLatch latch = new CountDownLatch(10);
  6.         ExecutorService service = Executors.newFixedThreadPool(10);
  7.         // 设置进度数据
  8.         String[] personProcess = new String[10];
  9.         Random random = new Random();
  10.         for (int i = 0; i < 10; i++) {
  11.             int finalJ = i;
  12.             service.submit(() -> {
  13.                 // 模拟10个人的进度条
  14.                 for (int j = 0; j <= 100; j++) {
  15.                     // 模拟网速快慢,随机生成
  16.                     try {
  17.                         Thread.sleep(random.nextInt(100));
  18.                     } catch (InterruptedException e) {
  19.                         e.printStackTrace();
  20.                     }
  21.                     // 设置进度数据
  22.                     personProcess[finalJ] = j + "%";
  23.                    log.info("{}", Arrays.toString(personProcess));
  24.                 }
  25.                 // 运行结束,倒时器 - 1
  26.                 latch.countDown();
  27.             });
  28.         }
  29.         // 打开"阀门"
  30.         latch.await();
  31.        log.info("王者峡谷到了");
  32.         service.shutdown();
  33.     }
  34. }
复制代码
运行结果:



概述

CountDownLatch一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch的初始化决定)任务执行完成。
构造器:
public CountDownLatch(int count):设置倒数器需要倒数的数量
常用API:
    public void await() throws InterruptedException:调用await()方法的线程会被挂起,等待直到count值为0再继续执行。public boolean await(long timeout, TimeUnit unit) throws InterruptedException:同await(),若等待timeout时长后,count值还是没有变为0,不再等待,继续执行。时间单位如下常用的毫秒、天、小时、微秒、分钟、纳秒、秒。public void countDown(): count值递减1public long getCount():获取当前count值
常见使用场景:
一个程序中有N个任务在执行,我们可以创建值为N的CountDownLatch,当每个任务完成后,调用一下countDown()方法进行递减count值,再在主线程中使用await()方法等待任务执行完成,主线程继续执行。

实现思路

通过前面的例子和介绍我们知道CountDownLatch的大致使用流程:
    创建CountDownLatch并设置计数器值。启动多线程并且调用CountDownLatch实例的countDown()方法。主线程调用 await() 方法,这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务,count值为0,停止阻塞,主线程继续执行。
不妨我们先思考下,它是怎么实现的呢?我们可以问自己几个问题?
    如何做到可以让主线程阻塞等待在那里?是不是可以调用LockSupport.park()方法进行阻塞。那么什么时候该阻塞呢?我们需要有个变量,比如state, 如果state大于0,就阻塞主线程。那么什么时候该唤醒呢,又如何唤醒呢?如果任务执行完成后,我们让state 减去1,也就是调用countDown()方法,如果发现state是0,那么就调用LockSupport.unpark()唤醒此前阻塞的地方,继续执行。
是不是很熟悉,这就是我们的AQS共享模式的实现原理啊,不了解AQS共享模式的可以参考本篇文章:深入浅出理解Java并发AQS的共享锁模式
我们把思路理清楚后,直接看CountDownLatch的源码。

源码解析


类结构图



以上是CountDownLatch的类结构图,
    Sync是CountDownLatch的内部类,被成员变量sync持有。Sync继承了AbstractQueuedSynchronizer,也就是我们大名鼎鼎的AQS。

await() 实现原理

1.线程调用 await()会阻塞等待其他线程完成任务
  1. // CountDownLatch#await
  2. public void await() throws InterruptedException {
  3.     // 调用AbstractQueuedSynchronizer的acquireSharedInterruptibly方法
  4.     sync.acquireSharedInterruptibly(1);
  5. }
  6. // AbstractQueuedSynchronizer#acquireSharedInterruptibly
  7. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  8.     // 判断线程是否被打断,抛出打断异常
  9.     if (Thread.interrupted())
  10.         throw new InterruptedException();
  11.     // 尝试获取共享锁
  12.     // 条件成立说明 state > 0,此时线程入队阻塞等待,等待其他线程获取共享资源
  13.     // 条件不成立说明 state = 0,此时不需要阻塞线程,直接结束函数调用
  14.     if (tryAcquireShared(arg) < 0)
  15.         // 阻塞当前线程的逻辑
  16.         doAcquireSharedInterruptibly(arg);
  17. }
  18. // CountDownLatch.Sync#tryAcquireShared
  19. protected int tryAcquireShared(int acquires) {
  20.     return (getState() == 0) ? 1 : -1;
  21. }
复制代码
2.doAcquireSharedInterruptibly()方法是实现线程阻塞的核心逻辑
  1. // AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
  2. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  3.     // 将调用latch.await()方法的线程 包装成 SHARED 类型的 node 加入到 AQS 的阻塞队列中
  4.     final Node node = addWaiter(Node.SHARED);
  5.     boolean failed = true;
  6.     try {
  7.         for (;;) {
  8.             // 获取当前节点的前驱节点
  9.             final Node p = node.predecessor();
  10.             // 前驱节点时头节点就可以尝试获取锁
  11.             if (p == head) {
  12.                 // 再次尝试获取锁,获取成功返回 1
  13.                 int r = tryAcquireShared(arg);
  14.                 if (r >= 0) {
  15.                     // 获取锁成功,设置当前节点为 head 节点,并且向后传播
  16.                     setHeadAndPropagate(node, r);
  17.                     p.next = null; // help GC
  18.                     failed = false;
  19.                     return;
  20.                 }
  21.             }
  22.             // 阻塞在这里
  23.             if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
  24.                 throw new InterruptedException();
  25.         }
  26.     } finally {
  27.         // 阻塞线程被中断后抛出异常,进入取消节点的逻辑
  28.         if (failed)
  29.             cancelAcquire(node);
  30.     }
  31. }
复制代码
3.parkAndCheckInterrupt()方法中会进行阻塞操作
  1. private final boolean parkAndCheckInterrupt() {
  2.             // 阻塞线程
  3.         LockSupport.park(this);
  4.         return Thread.interrupted();
  5.     }
复制代码
countDown()实现原理

1.任务结束调用 countDown() 完成计数器减一(释放锁)的操作
  1. public void countDown() {
  2.     sync.releaseShared(1);
  3. }
  4. public final boolean releaseShared(int arg) {
  5.     // 尝试释放共享锁
  6.     if (tryReleaseShared(arg)) {
  7.         // 释放锁成功开始唤醒阻塞节点
  8.         doReleaseShared();
  9.         return true;
  10.     }
  11.     return false;
  12. }
复制代码
2.调用tryReleaseShared()方法尝试释放锁,true表示state等于0,去唤醒阻塞线程。
  1. protected boolean tryReleaseShared(int releases) {
  2.     for (;;) {
  3.         int c = getState();
  4.         // 条件成立说明前面【已经有线程触发唤醒操作】了,这里返回 false
  5.         if (c == 0)
  6.             return false;
  7.         // 计数器减一
  8.         int nextc = c-1;
  9.         if (compareAndSetState(c, nextc))
  10.             // 计数器为 0 时返回 true
  11.             return nextc == 0;
  12.     }
  13. }
复制代码
3.调用doReleaseShared()唤醒阻塞的节点
  1. private void doReleaseShared() {
  2.     for (;;) {
  3.         Node h = head;
  4.         // 判断队列是否是空队列
  5.         if (h != null && h != tail) {
  6.             int ws = h.waitStatus;
  7.             // 头节点的状态为 signal,说明后继节点没有被唤醒过
  8.             if (ws == Node.SIGNAL) {
  9.                 // cas 设置头节点的状态为 0,设置失败继续自旋
  10.                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  11.                     continue;
  12.                 // 唤醒后继节点
  13.                 unparkSuccessor(h);
  14.             }
  15.             // 如果有其他线程已经设置了头节点的状态,重新设置为 PROPAGATE 传播属性
  16.             else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  17.                 continue;
  18.         }
  19.         // 条件不成立说明被唤醒的节点非常积极,直接将自己设置为了新的head,
  20.         // 此时唤醒它的节点(前驱)执行 h == head 不成立,所以不会跳出循环,会继续唤醒新的 head 节点的后继节点
  21.         if (h == head)
  22.             break;
  23.     }
  24. }
复制代码
以上就是Java CountDownLatch的源码硬核解析的详细内容,更多关于Java CountDownLatch的资料请关注中国红客联盟其它相关文章!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

中国红客联盟公众号

联系站长QQ:5520533

admin@chnhonker.com
Copyright © 2001-2025 Discuz Team. Powered by Discuz! X3.5 ( 粤ICP备13060014号 )|天天打卡 本站已运行