跳转至

25 齐头并进,胜利会师:Barrier屏障

你好,我是鸟窝。

在上节课,我们已经了解了在生产者消费者场景中常用的同步原语——条件变量。这节课我们继续学习Barrier,我们这里把它翻译成屏障。

在并发编程中,多个线程或进程协同工作以完成一项任务是很常见的。然而,在某些情况下,我们需要确保所有参与者都到达某个特定的“汇合点”后,才能继续执行。这时,Barrier(屏障)就派上了用场。

如果你是个体育爱好者,当你看田径比赛中百米赛跑或者跨栏比赛时,你会看到比赛前所有的运动员都会站在起跑线上,这就相当于一个屏障。等所有的运动员就位后,发令枪响起,运动员就可以跑起来了。

Barrier就是这样一种同步原语,它允许一组线程或进程相互等待,直到所有成员都到达某个预定的点。

  • 到达屏障的线程或进程会被阻塞,直到所有其他成员也到达。
  • 一旦所有成员都到达,它们才能同时继续执行。

Barrier内部维护一个计数器,初始化为参与者的数量。每个线程或进程到达屏障时,计数器减一。当计数器变为零时,表示所有参与者都已到达,屏障释放所有线程或进程。

Rust标准库中 Barrier 是可重用的。也就是等所有的参与者都已经达到后, Barrier 又可以用来下一轮的等待了。

Barrier介绍

标准库中的Barrier的使用非常简单, 因为主要就new和wait两个函数。

new 创建一个Barrier

就像函数名一样,new函数创建一个Barrier实例。

Barrier 会阻塞调用 wait() 的 n-1 个线程,并在第 n 个线程调用 wait() 时,同时唤醒所有线程。

创建 Barrier 必须指定参与者(线程)的数量。

下面的例子创建了包含三个参与者的实例。

use std::sync::Barrier;

let barrier = Barrier::new(3);

wait 等待其他线程到达同步点

调用此函数阻塞当前线程,直至所有线程在此处(同步点)完成同步汇合。

当此函数返回时,一个(随机选择的)线程将接收一个 BarrierWaitResult 实例,该实例的 is_leader() 方法返回 true,而所有其他线程将接收 is_leader() 方法返回 false 的结果。

下面是一个完整的例子。三个参与者同时到达同步点后,其中一个被选做leader:

use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    let barrier = Arc::new(Barrier::new(3));
    let mut handles = vec![];
    for i in 0..3 {
        let b = barrier.clone();
        handles.push(thread::spawn(move || {
            println!("Thread {} is waiting at the barrier", i);
            let result = b.wait();
            println!("Thread {} has passed the barrier. is leader: {}", i, result.is_leader());
        }
        ));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("All threads have passed the barrier");
}

重用

Barrier 这个数据结构是可以重用的,这意味着我可以使用 Barrier 执行多轮的同步。下面这个例子演示了三个线程执行两轮的同步:

use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;

fn main() {
    let thread_count = 3;
    let barrier = Arc::new(Barrier::new(thread_count));

    let mut handles = vec![];

    for i in 0..thread_count {
        let barrier_clone = Arc::clone(&barrier);

        let handle = thread::spawn(move || {
            for round in 0..2 { // 每个线程执行两轮
                println!("线程 {},第 {} 轮,等待...", i, round);

                // 模拟线程执行一些任务
                thread::sleep(Duration::from_millis((i as u64 + 1) * 500));

                barrier_clone.wait(); // 等待所有线程到达屏障

                println!("线程 {},第 {} 轮,继续执行...", i, round);

                // 模拟线程执行一些任务
                thread::sleep(Duration::from_millis((i as u64 + 1) * 500));
            }
        });

        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("所有线程完成!");
}

虽然Rust没有正式定义自己的内存规范,但是 Barrier 这个数据结构隐含着当前的 wait 的执行一定 happen before 它的前一次的 wait 成功执行,否则就不能被重用了。

Barrier vs Condvar

虽然这两者都有阻塞和等待的功能,但是这两个同步原语还是有着明显的不同我梳理了一张表格,方便你直观做对比。

一个并发计算的例子

假设我们需要计算一个大型矩阵的乘法。由于矩阵非常大,单线程计算会非常耗时。为了提高计算效率,我们希望使用多线程并行计算。然而,矩阵乘法的计算过程可以分为多个阶段,每个阶段都需要所有线程完成才能进入下一个阶段。这时,Barrier 就非常有用。

use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use rand::Rng;

// 模拟矩阵数据结构
struct Matrix {
    rows: usize,
    cols: usize,
    data: Vec<Vec<i32>>,
}

fn main() {
    let matrix_size = 100;
    let thread_count = 4;

    // 创建一个随机矩阵
    let matrix1 = Arc::new(Mutex::new(Matrix {
        rows: matrix_size,
        cols: matrix_size,
        data: generate_matrix(matrix_size, matrix_size),
    }));

    // 创建另一个随机矩阵
    let matrix2 = Arc::new(Mutex::new(Matrix {
        rows: matrix_size,
        cols: matrix_size,
        data: generate_matrix(matrix_size, matrix_size),
    }));

    // 创建一个结果矩阵
    let result_matrix = Arc::new(Mutex::new(Matrix {
        rows: matrix_size,
        cols: matrix_size,
        data: vec![vec![0; matrix_size]; matrix_size],
    }));

    let barrier = Arc::new(Barrier::new(thread_count));

    let mut handles = vec![];

    // 创建线程,每个线程计算矩阵乘法的一部分
    for i in 0..thread_count {
        let matrix1_clone = Arc::clone(&matrix1);
        let matrix2_clone = Arc::clone(&matrix2);
        let result_matrix_clone = Arc::clone(&result_matrix);
        let barrier_clone = Arc::clone(&barrier);

        let handle = thread::spawn(move || {
            // 将矩阵分块处理。每个线程处理一部分矩阵
            for row in (i*matrix_size/thread_count)..((i+1)*matrix_size/thread_count) {
                for col in 0..matrix_size {
                    let matrix1_lock = matrix1_clone.lock().unwrap();
                    let matrix2_lock = matrix2_clone.lock().unwrap();
                    let mut result_matrix_lock = result_matrix_clone.lock().unwrap();

                    for k in 0..matrix_size {
                        result_matrix_lock.data[row][col] += matrix1_lock.data[row][k] * matrix2_lock.data[k][col];
                    }
                }
            }

            println!("线程 {},完成部分计算,等待其他线程...", i);
            barrier_clone.wait(); // 等待所有线程到达屏障
        });

        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("矩阵乘法计算完成!");
}

// 辅助函数:生成随机矩阵
fn generate_matrix(rows: usize, cols: usize) -> Vec<Vec<i32>> {
    let mut rng = rand::thread_rng();
    (0..rows)
        .map(|_| (0..cols).map(|_| rng.gen_range(1..10)).collect())
        .collect()
}

这个例子中使用四个线程,每个线程负责计算矩阵乘法的一部分。使用一个Barrier同步四个线程,等四个线程都完成后矩阵相乘才完成。

总结

好了,在这一节课中,我们了解了Barrier(屏障)这个同步原语,Barrier用于确保多个线程或进程在继续执行之前都到达某个特定的同步点。它通过维护一个计数器实现,当所有参与者都到达时,计数器变为零,屏障释放所有线程。

Rust标准库中的Barrier是可重用的,主要通过new函数创建,并使用wait函数等待其他线程到达同步点。wait函数会阻塞当前线程,直到所有线程完成同步汇合。一个重要的特点是,Barrier可以进行多轮的同步等待。

思考题

并发经典的H2O工厂问题描述如下,请使用Barrier实现。

问题描述

假设你是一个水分子工厂的工程师。你的工厂接受氢原子(H)和氧原子(O)作为输入,并生产水分子(H₂O)。为了生产水分子,你需要:

  • 两个氢原子和一个氧原子。
  • 一旦你有了这两个氢原子和一个氧原子,它们必须同时结合形成一个水分子。
  • 如果其中一个原子准备好了,它必须等待直到所有原子都准备好。
  • 你不能让氢原子或氧原子等待太久。

目标

编写一个程序,模拟H₂O工厂,确保满足后面的要求:

  • 线程安全:多个氢原子和氧原子线程可以并发运行。
  • 正确性:只有当两个氢原子和一个氧原子都准备好时,才能形成水分子。
  • 效率:避免不必要的等待。

欢迎你在留言区记录你的思考或疑问。如果今天的内容对你有所帮助,也期待你转发给你的同事或者朋友,大家一起学习,共同进步。我们下节课再见!