当前位置 博文首页 > Admol:源码分析:CyclicBarrier 之循环栅栏

    Admol:源码分析:CyclicBarrier 之循环栅栏

    作者:Admol 时间:2021-02-19 10:31

    简介

    CyclicBarrier 是一个同步辅助工具,允许一组线程全部等待彼此达到共同屏障点,且等待的线程被释放后还可以重新使用,所以叫做Cyclic(循环的)。

    应用场景

    比如出去旅行时,导游需要等待所有的客人到齐后,导游才会给大家讲解注意事项等

    官方示例

    在JDK的源码注释中,提供了一个简单的示例demo,稍加修改后就可以运行

    public class Solver {
        AtomicInteger sum = new AtomicInteger(0);
        // 自己新增的一个标识,true代表所有的计算完成了
        volatile boolean done = false;
        final int N;
        final int[][] data;
        final CyclicBarrier barrier;
    
        class Worker implements Runnable {
            int myRow;
            Worker(int row) {
                myRow = row;
            }
            @Override
            public void run() {
                while (!done()) {
                    int rowSum = Arrays.stream(data[myRow]).sum(); // 计算行的和
                    System.out.println("processRow(myRow):" + rowSum);
                    sum.addAndGet(rowSum);
                    try {
                        barrier.await();
                    } catch (InterruptedException ex) {
                        return;
                    } catch (BrokenBarrierException ex) {
                        return;
                    }
                }
            }
        }
    
        private boolean done(){
            return done;
        }
    
        public Solver(int[][] matrix) throws InterruptedException{
            data = matrix;
            N = matrix.length;
            Runnable barrierAction = () -> {
                System.out.println("mergeRows(...):"+sum.get()); // 输出二维数组的总和
                done = true;
            };
            barrier = new CyclicBarrier(N, barrierAction);
    
            List<Thread> threads = new ArrayList<Thread>(N);
            for (int i = 0; i < N; i++) {
                Thread thread = new Thread(new Worker(i));
                threads.add(thread);
                thread.start();
            }
    
            // wait until done
            for (Thread thread : threads){
                thread.join();
            }
        }
    
        public static void main(String[] args) throws InterruptedException{
            int[][] matrix = {{1,2,3},{4,5,6}};
            Solver solver = new Solver(matrix);
        }
    }
    

    源码分析

    主要的属性

    
    /** 防护栅栏入口的锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 等待直到跳闸的条件 */
    private final Condition trip = lock.newCondition();
    /** 构造方法参数,在障碍被释放之前必须调用等待的线程数 */
    private final int parties;
    /* 越过栅栏时运行的命令 */
    private final Runnable barrierCommand;
    /** 当前的一代,控制CyclicBarrier的循环 */
    private Generation generation = new Generation();
    /** 记录仍在等待的参与方线程数量,初始值等于parties */
    private int count;
    

    主要内部类

    /** 代:屏障的每次使用都表示为一个生成实例 */
    private static class Generation {
    	  boolean broken = false; // 标识当前的栅栏已破坏或唤醒,jinglingwang.cn
    }
    

    构造方法

    一共有两个构造方法,第一个构造方法仅需要传入一个int值,表示调用等待的线程数;第二个构造方法多了一个runnable接口,当所有的线程越过栅栏时执行的命令,没有则为null;

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction; // Runnable 命令线程
    }
    

    await() 方法

    每个需要在栅栏处等待的线程都需要显式地调用这个方法。

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            // 调用await方法,0:不超时 
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    

    dowait() 方法

    主要的障碍代码

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 当前锁
        final ReentrantLock lock = this.lock;
        // 加锁 
        lock.lock();
        try {
            // 当前代
            final Generation g = generation;
            // 检查当前代的状态,是否要抛出BrokenBarrierException异常
            if (g.broken)
                throw new BrokenBarrierException();
    
            // 当前线程被中断了 
            if (Thread.interrupted()) {
                // 屏障被打破
                breakBarrier();
                throw new InterruptedException();
            }
            // count减一
            int index = --count;
            // index等于0,说明最后一个线程到达了屏障处
            if (index == 0) {  // tripped
                boolean ranAction = false; // 标识Runnable 命令线程是否有执行
                try {
                    final Runnable command = barrierCommand; // 第二个构造方法的入参,需要运行的命令线程
                    if (command != null)
                        command.run(); // 执行命令线程。by:jinglingwang.cn
                    ranAction = true;
                    nextGeneration(); // 更新重置整个屏障
                    return 0;
                } finally {
                    if (!ranAction) 
                        // ranAction 没有被设置成true;被中断了
                        breakBarrier();
                }
            }
    
            // 循环直到跳闸,断开,中断或超时
            for (;;) {
                try {
                    if (!timed) // 没有设超时时间,直接调用条件锁的await方法阻塞等待
                        trip.await();
                    else if (nanos > 0L) // 有超时时间
                        nanos = trip.awaitNanos(nanos); //调用条件锁的await方法阻塞等待一段时间
                } catch (InterruptedException ie) { // 捕获中断异常
                    if (g == generation && ! g.broken) {
                        breakBarrier(); //被中断,当前代会被标识成已被破坏
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                // 如果上面代码没有异常,理论上只有被唤醒后才会执行到下面的代码
                // 再次检查当前代是否已经被破坏
                if (g.broken)
                    throw new BrokenBarrierException();
                // 正常来说,最后一个线程在执行上面的代码时,会调用nextGeneration,重新生成generation
                // 所以线程被唤醒后,这里条件会成立
                if (g != generation)
                    return index;
    
                // 超时检查
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException(); //抛出超时异常
                }
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    /** 重置屏障,回到初始状态,说明可以重复使用*/
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;  // 重置等的参与方线程数量计数,回到最初的状态
        generation = new Generation();
    }
    private void breakBarrier() {
        // 标识当前的栅栏状态
        generation.broken = true; 
        count = parties;
        // 条件锁,唤醒所有等待的线程,jinglingwang.cn
        trip.signalAll();
    }
    

    dowait() 方法过程总结:

    1. 参与方的多个线程执行逻辑代码后,分别调用await方法
    2. 线程分别拿到当前锁,最先获得锁的N-1个线程,调用条件锁Conditionawait方法,根据前面条件锁的源码分析我们知道,调用条件锁的await方法会释放当前锁,然后再调用Unsafa类底层 park 阻塞线程。
    3. 当最后一个线程调用await方法时(也就是上面的 if (index == 0) 分支逻辑,count减为0,屏障打破),会执行命令线程(构造方法的第二个入参Runnable),然后调用nextGeneration方法,唤醒所有的条件锁等待的N-1个线程(唤醒并不一定马上执行),然后重置计数与当前代,也就是一个新的屏障了,这也就是为什么可以重复使用的原因。
    4. 最后一个线程释放锁,N-1线程中的线程陆续获得锁,释放锁,完成整个流程

    CyclicBarrier 总结

    1. 支持两个构造参数:线程数和需要执行的命令线程
    2. CyclicBarrier 是基于ReentrantLock和Condition来实现屏障逻辑的
    3. 先抢到锁的N-1个线程会调用条件锁的await方法从而被阻塞
    4. 最后一个获得锁的线程来唤醒之前的N-1个线程以及来调用命令线程的run方法
    5. 最后一个获得锁的线程会生成一个新的屏障(new Generation()),也就是可以重复使用的屏障
    6. 如果线程中有一个线程被中断,整个屏障被破坏后,所有线程都可能抛出BrokenBarrierException异常
    7. 原文首发地址:https://jinglingwang.cn/archives/cyclicbarrier

    CyclicBarrier 与CountDownLatch的区别

    1. CyclicBarrier 是基于重入锁和条件锁来实现的
    2. CountDownLatch 是基于AQS的同步功能来实现的
    3. CyclicBarrier 不允许0个线程,会抛出异常
    4. CountDownLatch 允许0个线程,虽然没什么*用
    5. CyclicBarrier 阻塞的是N-1个线程,需要每个线程调用await,之后由最后一个线程来唤醒所有的等待线程,这也就是屏障的意思
    6. CountDownLatch 是计数为N,阻塞的不一定是N个线程(可以是一个或多个),由线程显示调用countDown方法来减计数,计数为0时,唤醒阻塞的一个线程或多个线程
    7. CyclicBarrier 最后一个线程会重置屏障的参数,生成一个新的Generation,可以重复使用,不需要重新new CyclicBarrier
    8. CountDownLatch 没有重置计数的地方,计数为0后不可以重复使用,需要重新new CountDownLatch 才可以再次使用
    bk