CyclicBarrier底层实现和原理

CyclicBarrier底层实现和原理
强烈推介IDEA2021.1.3破解激活,IntelliJ IDEA 注册码,2021.1.3IDEA 激活码 

大家好,我是架构君,一个会写代码吟诗的架构师。今天说一说CyclicBarrier底层实现和原理,希望能够帮助大家进步!!!

官网解释:

  • 允许一组线程全部等待彼此达到共同屏障点的同步辅助。循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。

意思就是每个线程都得执行到等待点进行等待,直到所有线程都执行到等待点,才会继续往下执行。相当于日常开会,只有等每个参会的人都到之后才会开始会议。

 

CyclicBarrier底层实现和原理

 

用法:(以开会举例)

1、会议需要三个人
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() { @Override public void run() 2、这是三个人都到齐之后会执行的代码 System.out.println("三个人都已到达会议室") } });
3、定义三个线程,相当于三个参会的人 for (int i = 0; i < 3; i++) { final int finalI = i; new Thread(new Runnable() { @Override public void run() { try { 4、模拟每人到会议室所需时间 Thread.sleep((long) (Math.random()*5000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第"+Thread.currentThread().getName()+"个人到达会议室"); try { 5、等待其他人到会议室 cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"开始开会"); } }, String.valueOf(finalI)).start(); }

上述代码运行的结果为:

 

CyclicBarrier底层实现和原理

 

源码解析:

一、构造方法

 

CyclicBarrier底层实现和原理

 

有两个构造方法,只有带Runnable参数的构造方法才会在所有线程都到达等待点之后执行Runnable里面的run方法。

CyclicBarrier(int parties) { this(parties, null);
}
CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction;
}

二、维护锁状态逻辑

其底层使用ReentrantLock+Condition进行锁状态的维护

1、维护锁状态
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
2、线程组数
private final int parties;
3、所有线程到达等待点后执行的Runnable
private final Runnable barrierCommand;
4、需要等待的线程数量
private int count;
6、屏障点定义
private static class Generation { boolean broken = false;
}

具体看看其是如何实现等待逻辑的,线程等待需要调用await方法

public int await() { return dowait(false, 0L); }
public int await(long timeout, TimeUnit unit){ return dowait(true, unit.toNanos(timeout));
}

最终调用的是dowait方法

private int dowait(boolean timed, long nanos){ final ReentrantLock lock = this.lock; 1、获取锁 lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); 2、如果线程中断,重置等待线程数量并且唤醒当前等待的线程 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } 3、等待线程数减1 int index = --count; 4、当等待线程数为 时 if (index == 0) { // tripped boolean ranAction = false; try { 5、执行所有线程都到达等待点之后的Runnable final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; 6、唤醒所有线程并生成下一代 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } 7、如果等待线程数不为0 for (;;) { try { 8、根据传入的参数来决定是定时等待还是非定时等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { 9、线程中断之后唤醒所有线程并进入下一代 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } 10、如果线程因为打翻屏障操作而被唤醒则抛出异常 if (g.broken) throw new BrokenBarrierException(); 11、如果线程因为换代操作而被唤醒则返回计数器的值 if (g != generation) return index; 12、如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }

可以看到,是通过index字段控制线程等待的,当index不为0的时候,线程统一会进行阻塞,直到index为0的时候,才会唤醒所有线程,这时候所有线程才会继续往下执行。

三、重复使用

这个跟CountdownLatch不一样的是,CountdownLatch是一次性的,而CycliBarrier是可以重复使用的,只需调用一下reset方法。

public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { 1、破坏当前的屏障点并唤醒所有线程 breakBarrier(); 2、生成下一代 nextGeneration(); } finally { lock.unlock(); }
}
private void breakBarrier() { generation.broken = true; 将等待线程数量重置 count = parties; 唤醒所有线程 trip.signalAll();
}
private void nextGeneration() { 唤醒所有线程 trip.signalAll(); 将等待线程数量重置 count = parties; generation = new Generation();
}

上述就是对CycliBarrier的解析。

本文来源huayang183,由架构君转载发布,观点不代表Java架构师必看的立场,转载请标明来源出处:https://javajgs.com/archives/17945

发表评论