判断线程/任务是否全部执行完成

判断线程/任务是否全部执行完成

需求

从网络上下载多个文件(可能会有很多)。最后将所有文件打包为一个压缩包。

思路

  1. 考虑可能有很多文件,所以采用多线程,一个线程去下载一个资源。
  2. 最后再将所有文件夹进行打包。

问题

打包后发现压缩包类的文件,跟实际的下载下来的文件数量不一致。很奇怪??

排查发现,在进行打包的时候实际上文件夹内的文件数量并不是最终的数量。由此可以判断出我在进行打包的时候我的下载线程还有未执行完成的。

解决方案

方案1-CountDownLatch(推荐)

CountDownLatch:用于实现线程间的协同。它允许一个或多个线程等待其他线程完成操作。

他是在初始化时指定一个计数值,每个线程完成任务时将这个计数值减一,当计数值达到零时,等待的线程被唤醒继续执行。

重要方法
  • CountDownLatch(int size)

    • 构造函数
  • void countDown()

    • 每次一个线程完成任务时,调用此方法将计数值减一
  • void await() throws InterruptedException

    • 当前线程调用此方法时会被阻塞,直到计数值减为零。如果在等待过程中被中断,将抛出 InterruptedException
  • boolean await(long timeout, TimeUnit unit) throws

    • InterruptedException

      类似于 await() 方法,但允许设置等待的最大时间。如果在指定时间内计数值减为零,返回 true;如果超时,返回 false

样例
@Test
public void testCountDownLatch(){
    int count = 5;
    CountDownLatch countDownLatch = new CountDownLatch(count);
    for (int l = 0; l < count; l++) {
        new Thread(()-> {
            log.info("执行完成");
            // 每个线程执行完成后执行countDown 使得countDownLatch可以-1
            countDownLatch.countDown();
        }).start();
    }
    try {
        // for 循环结束后可能其中一部分线程并没有结束,这里使用await,使得我们的主线程阻塞住,直到所有的线程执行完成也就是countDownLatch 为0的时候。
        // await(long timeout, TimeUnit unit) 这里其实也可以设置一个超时时间来避免长时间没有执行完成的问题。
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
缺点

CountDownLatch 他是一次性的,不能重复使用

方案2-CyclicBarrier

CyclicBarrier:允许一组线程在达到某个屏障点时相互等待,并在所有线程都到达时继续执行。

它是一个可以重复使用的循环屏障,一次用完后,可以通过reset来重置屏障点的数量。

重要方法
  • CyclicBarrier(int parties)

    • 构造方法,创建一个 CyclicBarrier 实例,指定屏障点的数量(parties)。

    • parties 表示需要等待的线程数量,即当调用 await() 方法的次数达到 parties 时,所有线程将继续执行。

  • void await()

    • 当线程到达屏障点时调用此方法,告诉屏障它已经到达,然后等待其他线程。
    • 如果当前调用是最后一个到达屏障点的线程,则屏障打开,所有等待的线程被释放,可以继续执行。
  • boolean await(long timeout, TimeUnit unit)

    • 类似于 await() 方法,但是允许设置超时时间,超过指定的时间后如果仍然有线程未到达屏障点,将抛出 TimeoutException
  • void reset()

    • 重置屏障,将屏障恢复到初始状态。即使有一些线程正在等待,它们也会被唤醒并抛出 BrokenBarrierException,表示屏障已经被重置。
样例
@Test
public void testCyclicBarrier() throws BrokenBarrierException, InterruptedException {
    int count = 5;
    // 这里可以设置一个回调,当所有任务执行完成后再执行。
    CyclicBarrier cyclicBarrier = new CyclicBarrier(count,()->{
        log.info("所有线程执行完成");
    });
    for (int l = 0; l < count; l++) {
        new Thread(()-> {
            log.info("开始等待");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            log.info("执行完成");
        }).start();
    }
    // 如果使用junit来进行测试的话,这里其实会直接退出的。可以使用对主线程进行屏障。从而导致junit单元测试不会退出。
    cyclicBarrier.await();
}

image-20240122161625173

它是可以使线程都先阻塞住,然后等屏蔽点到达一定的数据的时候就解除所有屏蔽点,使得可以继续执行。

缺点

使用起来不如CountDownLatch方便,而且上面那个样例中在主线程使用cyclicBarrier.await()其实就会导致屏蔽点+1所有最后只输出了4个执行完成。

这里太懒了,我们没有细究(感觉使用不多)。??

方案3-CompletableFuture(推荐)

CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,用于简化异步操作的处理和组合。

这里我们使用一个叫CompletableFuture.allOf(CompletableFuture...)的方法。

重要方法
  • CompletableFuture.allOf(CompletableFuture...)
    • CompletableFuture 提供的一个方法,他是一个可变参数。用于等待传入的所有 CompletableFuture 完成,当所有的CompletableFuture都完成时,新的CompletableFuture 也完成。
  • join()
    • CompletableFuture 类中的一个方法,用于等待异步操作的完成并获取结果。它类似于 get() 方法,但是不会抛出 checked exception。在使用 join() 时,如果异步操作抛出了异常,它会包装成 CompletionException
样例
@Test
public void testCompletableFuture(){
    // 使用一个集合来记录所有的异步任务。
    List<CompletableFuture<?>> list = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        list.add(CompletableFuture.runAsync(()-> log.info("执行完成")));
    }
    // 最后创建一个新的来记录所有的异步任务是否已经执行完成。
    CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();
}
注意

这里所有的CompletableFuture都没有指定线程池,所有他这里是使用的forkJoinPool。最好不要使用内置的这个线程池,可以自定义一个,然后在创建CompletableFuture的时候进行指定。

总结

注意

这里所有的CompletableFuture都没有指定线程池,所有他这里是使用的forkJoinPool。最好不要使用内置的这个线程池,可以自定义一个,然后在创建CompletableFuture的时候进行指定。

总结

我感觉方案3好用一些。但是如果不放心使用异步任务的话,直接使用方案1即可。都可以实现功能。