在多线程编程中,协调线程间的执行顺序是常见需求。Java并发包提供的CountDownLatch和CyclicBarrier是解决这类问题的利器,它们都能实现线程间的同步协作,但适用场景却各有侧重。本文将通过实战案例,详解这两个工具的工作原理、使用方法及适用场景,帮助开发者在实际项目中做出正确选择。

一、CountDownLatch:等待多线程完成

CountDownLatch(倒计时门闩)的核心功能是:让一个或多个线程等待其他线程完成一系列操作后再继续执行。它的工作机制类似田径比赛中的发令枪——裁判(主线程)必须等待所有运动员(工作线程)都准备就绪后,才能鸣枪开始比赛。

1. 基本用法

CountDownLatch的使用分为三个步骤:

  1. 初始化计数器(指定需要等待的线程数)
  2. 工作线程完成任务后调用countDown()递减计数器
  3. 等待线程调用await()阻塞等待,直到计数器变为0
public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {// 1. 初始化计数器为3(需要等待3个任务完成)CountDownLatch latch = new CountDownLatch(3);// 2. 创建并启动工作线程ExecutorService executor = Executors.newFixedThreadPool(3);for (int i = 0; i < 3; i++) {final int taskId = i + 1;executor.submit(() -> {try {System.out.println("任务" + taskId + "开始执行");// 模拟任务执行(1-3秒随机)Thread.sleep(new Random().nextInt(3000) + 1000);System.out.println("任务" + taskId + "执行完成");} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {// 任务完成,计数器减1latch.countDown();}});}System.out.println("主线程等待所有任务完成...");// 3. 主线程等待计数器变为0latch.await();System.out.println("所有任务已完成,主线程继续执行");executor.shutdown();}
}

运行结果会呈现这样的顺序:

任务1开始执行
任务2开始执行
任务3开始执行
主线程等待所有任务完成...
任务2执行完成
任务1执行完成
任务3执行完成
所有任务已完成,主线程继续执行

2. 关键方法解析

  • CountDownLatch(int count):构造方法,初始化计数器值
  • countDown():计数器值减1(即使减到负数也不会报错)
  • await():阻塞当前线程,直到计数器为0
  • await(long timeout, TimeUnit unit):带超时的等待,超时后不再阻塞

超时等待的使用场景:当某些工作线程可能因异常卡住时,避免主线程无限期等待:

// 等待最多5秒,超时后继续执行
if (latch.await(5, TimeUnit.SECONDS)) {System.out.println("所有任务正常完成");
} else {System.out.println("等待超时,部分任务可能未完成");
}

3. 典型应用场景

CountDownLatch特别适合以下场景:

(1) 并行任务汇总
在数据处理系统中,常需要并行处理多个数据源,再汇总结果:

public class DataAggregationDemo {public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(3);List<String> result = new CopyOnWriteArrayList<>(); // 线程安全集合// 并行处理三个数据源new Thread(() -> {result.add(fetchFromDB());latch.countDown();}).start();new Thread(() -> {result.add(fetchFromAPI());latch.countDown();}).start();new Thread(() -> {result.add(fetchFromFile());latch.countDown();}).start();// 等待所有数据加载完成后汇总latch.await();System.out.println("汇总结果:" + result);}private static String fetchFromDB() {// 模拟数据库查询return "DB数据";}private static String fetchFromAPI() {// 模拟API调用return "API数据";}private static String fetchFromFile() {// 模拟文件读取return "文件数据";}
}

(2) 资源初始化检查
应用启动时,需要确保所有必要服务(缓存、数据库连接池等)初始化完成:

public class ApplicationStartup {private static final CountDownLatch STARTUP_LATCH = new CountDownLatch(2);public static void main(String[] args) throws InterruptedException {// 启动缓存服务new Thread(() -> {initializeCache();STARTUP_LATCH.countDown();}).start();// 启动数据库连接池new Thread(() -> {initializeDataSource();STARTUP_LATCH.countDown();}).start();// 等待所有服务就绪STARTUP_LATCH.await();System.out.println("所有服务初始化完成,应用启动成功");}
}

二、CyclicBarrier:多线程同步点

CyclicBarrier(循环屏障)的作用是:让一组线程到达某个同步点后再一起继续执行。它就像登山时的集合点——所有队员都到达营地后,才继续向山顶前进,而且这个过程可以重复进行。

1. 基本用法

CyclicBarrier的使用步骤:

  1. 初始化屏障(指定参与的线程数和到达屏障后的动作)
  2. 每个线程执行到屏障点时调用await()等待其他线程
  3. 当所有线程都到达后,一起突破屏障继续执行
public class CyclicBarrierDemo {public static void main(String[] args) {// 1. 初始化屏障:3个线程参与,到达后执行 Runnable 任务CyclicBarrier barrier = new CyclicBarrier(3, () -> {System.out.println("所有线程已到达屏障,开始执行汇总操作");});ExecutorService executor = Executors.newFixedThreadPool(3);for (int i = 0; i < 3; i++) {final int threadId = i + 1;executor.submit(() -> {try {System.out.println("线程" + threadId + "开始执行任务");Thread.sleep(new Random().nextInt(2000) + 1000); // 模拟任务System.out.println("线程" + threadId + "到达屏障,等待其他线程");// 2. 到达屏障点,等待其他线程barrier.await();// 3. 所有线程到达后,继续执行后续操作System.out.println("线程" + threadId + "突破屏障,继续执行");} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});}executor.shutdown();}
}

运行结果会显示:

线程1开始执行任务
线程2开始执行任务
线程3开始执行任务
线程2到达屏障,等待其他线程
线程1到达屏障,等待其他线程
线程3到达屏障,等待其他线程
所有线程已到达屏障,开始执行汇总操作
线程3突破屏障,继续执行
线程1突破屏障,继续执行
线程2突破屏障,继续执行

2. 核心特性解析

CyclicBarrier有两个重要特性,使其区别于CountDownLatch:

(1) 可循环使用
CyclicBarrier的计数器可以通过reset()方法重置,实现多次同步:

public class CyclicBarrierResetDemo {public static void main(String[] args) throws InterruptedException {CyclicBarrier barrier = new CyclicBarrier(2, () -> {System.out.println("===== 本轮任务汇总完成 =====");});Runnable task = () -> {try {for (int i = 0; i < 2; i++) { // 每个线程执行两轮任务System.out.println(Thread.currentThread().getName() + " 完成第" + (i+1)轮任务,到达屏障");barrier.await();Thread.sleep(500); // 模拟屏障后的操作}} catch (Exception e) {e.printStackTrace();}};new Thread(task, "线程A").start();new Thread(task, "线程B").start();}
}

运行结果会呈现两轮完整的同步过程,证明CyclicBarrier可以重复使用。

(2) 屏障断裂机制
当某个线程在等待过程中被中断,或超时,会导致屏障"断裂",所有等待的线程会收到BrokenBarrierException

public class BrokenBarrierDemo {public static void main(String[] args) throws InterruptedException {CyclicBarrier barrier = new CyclicBarrier(2);Thread t1 = new Thread(() -> {try {barrier.await(); // 等待t2} catch (Exception e) {if (e instanceof BrokenBarrierException) {System.out.println("t1: 屏障已断裂");}}});Thread t2 = new Thread(() -> {try {Thread.sleep(1000);// t2在等待前被中断Thread.currentThread().interrupt();barrier.await();} catch (InterruptedException e) {System.out.println("t2: 被中断");} catch (BrokenBarrierException e) {System.out.println("t2: 屏障已断裂");}});t1.start();t2.start();}
}

运行后会发现,t2被中断后,t1会收到屏障断裂异常,这一机制能及时发现线程协作中的异常。

3. 典型应用场景

CyclicBarrier适合需要多线程协同完成多个阶段任务的场景:

(1) 分阶段计算
在科学计算或大数据处理中,常需要分阶段执行,每个阶段都需要所有线程完成当前步骤:

public class PhasedCalculationDemo {public static void main(String[] args) {int threadCount = 4;CyclicBarrier phaseBarrier = new CyclicBarrier(threadCount, () -> {System.out.println("===== 完成一个阶段,准备进入下一阶段 =====");});ExecutorService executor = Executors.newFixedThreadPool(threadCount);for (int i = 0; i < threadCount; i++) {executor.submit(new CalculationTask(phaseBarrier));}executor.shutdown();}static class CalculationTask implements Runnable {private final CyclicBarrier barrier;public CalculationTask(CyclicBarrier barrier) {this.barrier = barrier;}@Overridepublic void run() {try {// 第一阶段计算System.out.println(Thread.currentThread().getName() + " 完成第一阶段计算");barrier.await();// 第二阶段计算System.out.println(Thread.currentThread().getName() + " 完成第二阶段计算");barrier.await();// 第三阶段计算System.out.println(Thread.currentThread().getName() + " 完成第三阶段计算");barrier.await();} catch (Exception e) {e.printStackTrace();}}}
}

(2) 模拟并发测试
在性能测试中,需要让多个线程同时开始执行测试用例,确保测试的公平性:

public class ConcurrentTestDemo {public static void main(String[] args) throws InterruptedException {int clientCount = 100;CyclicBarrier barrier = new CyclicBarrier(clientCount);ExecutorService executor = Executors.newFixedThreadPool(clientCount);// 启动100个线程准备测试for (int i = 0; i < clientCount; i++) {executor.submit(() -> {try {// 线程准备就绪,等待统一开始barrier.await();// 同时开始执行测试逻辑performTest();} catch (Exception e) {e.printStackTrace();}});}executor.shutdown();}private static void performTest() {// 测试逻辑}
}

三、CountDownLatch与CyclicBarrier的对比

虽然两者都能实现线程同步,但本质区别显著:

特性

CountDownLatch

CyclicBarrier

核心功能

等待其他线程完成

等待其他线程到达同步点

计数器

只能递减

可重置(递增)

线程角色

分为等待线程和计数线程

所有线程角色相同

重复使用

不可重复使用

可重复使用(reset())

触发动作

无内置动作

可指定屏障触发时的动作

异常处理

等待线程可捕获InterruptedException

会导致屏障断裂,所有线程收到异常

选择建议:

  • 当需要"主线程等待多个工作线程完成"时,用CountDownLatch
  • 当需要"多个线程互相等待,协同完成多阶段任务"时,用CyclicBarrier
  • 一次性同步用CountDownLatch,多次重复同步用CyclicBarrier

四、使用注意事项

  1. 线程安全问题
    多个线程操作共享资源时,需使用线程安全的数据结构(如ConcurrentHashMap),避免数据错乱。
  2. 异常处理
    CyclicBarrier的await()会抛出两个异常:
  • InterruptedException:线程被中断
  • BrokenBarrierException:屏障已断裂(其他线程异常) 实际使用中需妥善处理这两种异常,避免程序崩溃。
  1. 合理设置超时
    无论是CountDownLatch还是CyclicBarrier,在生产环境中建议使用带超时的等待方法,防止线程永久阻塞。
  2. 线程数量匹配
    CyclicBarrier的初始化线程数必须与实际参与的线程数一致,否则会导致永久等待。

五、总结

CountDownLatch和CyclicBarrier是Java并发编程中处理线程协作的重要工具。CountDownLatch通过递减计数器实现"一对多"的等待,适合主线程等待多个工作线程完成的场景;CyclicBarrier通过可重置的计数器实现"多对多"的互相等待,适合多线程协同完成分阶段任务的场景。

理解两者的工作原理和适用场景,能帮助开发者在多线程编程中写出更简洁、高效的代码。实际项目中,这两个工具常与线程池配合使用,构建灵活可靠的并发系统。记住,没有最好的工具,只有最适合当前场景的工具——根据具体需求选择合适的同步方式,才能发挥并发编程的最大威力。