CyclicBarrier は文字通りループ バリア (循環バリア) を意味し、スレッドのグループを特定の状態 (バリア ポイント) で待機させ、すべてを同時に実行できます。これは、待機中のスレッドがすべて解放された後に CyclicBarrier を再利用できるため、ループバックと呼ばれます。
CyclicBarrier の機能は、スレッドのグループを相互に待機させることです。共通点に達すると、それまで待機していたすべてのスレッドが実行を継続し、CyclicBarrier関数は再利用できます。
構築方法:
// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。 public CyclicBarrier(int parties) // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
重要な方法:
//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞 // BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时 public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException //循环 通过reset()方法可以进行重置
CyclicBarrier を使用すると、複数のスレッドでデータを計算し、最終的に計算結果をマージすることができます。
public class CyclicBarrierTest2 { //保存每个学生的平均成绩 private Conc urrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>(); private ExecutorService threadPool= Executors.newFixedThreadPool(3); private CyclicBarrier cb=new CyclicBarrier(3,()->{ int result=0; Set<String> set = map.keySet(); for(String s:set){ result+=map.get(s); } System.out.println("三人平均成绩为:"+(result/3)+"分"); }); public void count(){ for(int i=0;i<3;i++){ threadPool.execute(new Runnable(){ @Override public void run() { //获取学生平均成绩 int score=(int)(Math.random()*40+60); map.put(Thread.currentThread().getName(), score); System.out.println(Thread.currentThread().getName() +"同学的平均成绩为:"+score); try { //执行完运行await(),等待所有学生平均成绩都计算完毕 cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); } } public static void main(String[] args) { CyclicBarrierTest2 cb=new CyclicBarrierTest2(); cb.count(); } }
public class CyclicBarrierTest3 { public static void main(String[] args) { AtomicInteger counter = new AtomicInteger(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 5, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy()); CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比赛开始~~")); for (int i = 0; i < 10; i++) { threadPoolExecutor.submit(new Runner(cyclicBarrier)); } } static class Runner extends Thread{ private CyclicBarrier cyclicBarrier; public Runner (CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { int sleepMills = ThreadLocalRandom.current().nextInt(1000); Thread.sleep(sleepMills); System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting()); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } } } }
プレーヤー No. 1配置され、共有する準備ができています: 395ms1## プレーヤー No. 5 が配置され、共有する準備ができています: 733ms2 プレーヤー No. 2 が配置され、共有する準備ができています: 776ms プレーヤー No. 3 が配置され、共有する準備ができています共有: 807ms4 主審: ゲームが開始されました ~~
4 人のプレーヤーが配置され、共有する準備ができました: 131ms03 人のプレーヤーが配置され、共有する準備ができました: 256ms12 人のプレーヤーが配置されます、共有準備完了: 291ms2
プレイヤー No. 1 が配置され、共有準備完了: 588ms3
プレイヤー No. 5 が配置され、共有準備完了: 763ms4
主審: ゲーム開始~~
3. CyclicBarrier のソース コード分析
CyclicBarrier プロセス
#メイン プロセスは次のとおりです:
ブロッキングに入る前に、まず条件キューに入り、次にロックを解放し、最後にブロックする必要があります;
カウント != 0 の場合、ウェイクアップが実行されます。条件キュー内のすべてのノードがブロッキング キューに変換されます。
以下は簡単なフローチャートです:
#よくある質問がいくつかありますか?
1. スレッドのグループは、バリアをトリガーする前に互いに待機します。最後のスレッドがバリアに到達した後、ウェイクアップ ロジックはどのように実装されますか。ウェイクアップ プロセスは次のとおりです。呼び出し
java.util.concurrent.locks.Condition#signalAll条件キュー上のすべてのノードを起動します。
2. 列削除サイクルはどのように実装されますか? 実際には、ミューテックス ReentrantLock の条件キューとブロッキング キューが変換されます。3. 条件キューから同期キューへの変換の実装ロジック? 変換プロセス中、まず条件キュー内のブロックされているすべてのスレッドが起動され、次にロックが取得されます。取得に失敗した場合は、同期キューに入ります。
以上がJavaでCyclicBarrierサイクルバリアを適用する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。