跳转至

15 东邪西毒,南帝北丐:其他异步运行时

你好,我是鸟窝。

上节课我们重点介绍了Tokio这个异步运行时。虽然它是使用最广泛的一个异步运行时,但是也并不是它一家独大,还有几个优秀的运行时,这节课我一一给你道来。

async-std

async-std 和它的一系列配套库,是为了让你的异步编程更轻松而生的。它为各种库和应用都提供了最基础的工具。它的名字也说明了它的设计思路:尽可能地照着 Rust 标准库来做,把里面的东西都换成异步的版本。这也是它的特色,和其他异步运行时不太一样的地方。

async-std 提供了各种重要功能的接口:文件操作、网络操作,还有像定时器这样的并发基础功能。它也提供了一个叫做 task 的东西,用起来有点像 Rust 标准库里的 thread 模块。而且它不光有 I/O 相关的,还有像 Mutex 这种也能在 async/await 环境下用的版本。

async-std 开发者的设计理念就是异步 Rust 应该像同步 Rust 一样简单易学,最好的 API 就是你已经用惯的那些。他们认为给标准库配上异步的版本,是让大家既能保证性能又能提高效率的最佳方案,因为标准库本身就很可靠。

async-std 就是为了实现这个目标而生的。它把单次内存分配的任务创建方式,跟一个能自动调节的无锁执行器、线程池和网络驱动结合起来,构成了一个很顺畅的系统,用大家熟悉的 Rust 标准库 API,就能高速、低延迟地处理各种任务。

图片

下面是一个极简单的 async-std 例子:

use async_std::task;

async fn say_hello() {
    println!("Hello, world!");
}

fn main() {
    task::block_on(say_hello())
}

async-std 入门

async-std 也提供了Tokio类似的属性,从main函数中就可以调用异步函数:

async fn say_hello() {
    println!("Hello, world!");
}

#[async_std::main]
async fn main() {
    say_hello().await;
}

包括测试:

#[tokio::test]
async fn my_test() -> std::io::Result<()> {
    assert_eq!(2 * 2, 4);
    Ok(())
}

有几个环境变量可以用来配置 async-std 运行时:

  • ASYNC_STD_THREAD_COUNT:控制 async-std 运行时启动多少个线程。默认情况下,每个逻辑 CPU 会启动一个线程,这个数量由 async-global-executor 决定,可能跟实际的物理 CPU 数量不一样。如果这个变量设置的不是正整数,async-std 就会 panic。
  • ASYNC_STD_THREAD_NAME:设置 async-std 运行时线程在操作系统里显示的名字。默认值是 "async-std/runtime"

也可以使用 block_on 启动一个任务,然后让当前线程等着,直到任务跑完拿到结果。用 block_on 这个函数就跟启动一个线程然后立刻等它结束差不多,只不过启动的是异步的任务。

use async_std::task;

fn main() {
    task::block_on(async {
        println!("Hello, world!");
    })
}

Task

一个正在执行的异步 Rust 程序由一组原生操作系统线程组成,在它之上多路复用多个无栈协程。我们称这些为“任务”(Task)。任务可以被命名,并提供一些内置的同步支持。

任务之间的通信可以通过通道(Rust 的消息传递类型)以及其他形式的任务同步和共享内存数据结构来完成。特别是保证线程安全的类型可以使用原子引用计数容器 Arc 轻松地在任务之间共享。

Rust 中致命的逻辑错误会导致线程 panic,在此期间,线程将展开堆栈,运行析构函数并释放拥有的资源。如果 panic 发生在任务内部,就没有有效的方法进行恢复,因此 panic 将通过任何线程边界一直传播到根任务。这也叫做 panic = abort 模型

async-std 提供了 async_std::task 模块实现任务相关的功能。

生成任务

我们可以使用 task::spawn 函数生成一个新任务:

use async_std::task;

task::spawn(async {
    // 这里是一些工作
});

在这个例子中,生成的任务与当前任务“分离”。这意味着它可以比它的父任务(生成它的任务)活得更久,除非此父任务是根任务。

根任务也可以等待子任务完成;调用 spawn 会产生一个 JoinHandle,它实现了 Future 并且可以被 await

use async_std::task;

let child = task::spawn(async {
    // 这里是一些工作
});

// 这里是一些工作

let res = child.await;

await 运算符返回子任务产生的最终值。

JoinHandletask 方法返回关联的Task。

JoinHandlecancel 方法取消关联的Task。

有三种spawn函数:

  • spawn:此函数类似于 std::thread::spawn,但用于生成异步任务。
  • spawn_blocking:生成一个阻塞任务。该任务将在一个专门的阻塞任务线程池中执行,以防止长时间运行的同步操作阻塞主 futures 执行器。
  • spawn_local:在线程局部执行器上生成任务。

下面的例子演示了这三种方法的用法:

use async_std::task;

#[async_std::main]
async fn main() {
    let v = task::spawn(async {
        1 + 2
    }).await;
    assert_eq!(v, 3);

    task::spawn_blocking(|| {
        println!("long-running task here");
    })
    .await;

    let v = task::spawn_local(async {
        1 + 2
    }).await;
    assert_eq!(v, 3);
}

配置任务

我们可以在生成新任务之前通过 Builder 类型对其进行配置,这个类型目前允许你设置子任务的名称:

use async_std::task;

task::Builder::new().name("child1".to_string()).spawn(async {
    println!("Hello, world!");
});

Builder 类型三个执行任务的方法:

  • blocking:生成一个配置好的任务,并同步等待其完成。
  • local:使用配置的设置在本地生成一个任务。
  • spawn:使用配置的设置生成一个异步任务。

Task类型

任务通过 Task 类型表示,你可以通过以下两种方式之一获取它:

  • 通过生成一个新任务,例如使用 task::spawn 函数,并在 JoinHandle 上调用 task
  • 通过使用 task::current 函数请求当前任务,或者通过 task::try_current 函数请求当前的任务。task::try_current 函数只有在block_onspawn 和 Builder::spawn 的上下文中才会返回任务,否则返回 None
use async_std::task;

#[async_std::main]
async fn main() {
    task::spawn(async {
        println!("current task: {}", task::current().id());
    }).await;

    task::spawn_blocking(|| {
        let _ = task::Builder::new().name("child1".to_string()).spawn(async {
            println!("current task: {:?}", task::try_current());
        });
    })
    .await;
}

任务本地存储

async::task 还为 Rust 程序提供了一个任务本地存储的实现。任务本地存储是一种将数据存储到全局变量的方法,程序中的每个任务都将拥有该变量的副本。任务不共享此数据,因此访问不需要同步。

LocalKey(任务本地键)拥有它包含的值,并在任务退出时销毁该值。它使用 task_local! 宏创建,并且可以包含任何 'static(无借用指针)的值。它提供了一个访问器函数 with,该函数将对该值的共享引用传递给指定的闭包。LocalKey只允许对值进行共享访问,因为如果允许可变借用,则无法保证唯一性。

LocalKey访问任务本地值的键。每个任务本地值是按需惰性初始化的,并在任务结束后销毁。

按需惰性初始化(Lazy Initialization on Demand)指的是资源或对象的初始化被延迟到真正需要使用它们的时候才进行。

下面是一个LocalKey的例子。如果你已经学习了前一课,对这个LocalKey不会感到陌生。事实上Rust各个运行时又有很多概念都是相通的。

use std::cell::Cell;

use async_std::task;
use async_std::prelude::*;

task_local! {
    static VAL: Cell<u32> = Cell::new(5);
}

task::block_on(async {
    let v = VAL.try_with(|c| c.get());
    assert_eq!(v, Ok(5));
});

// 返回error, 因为不是在一个任务中调用的
assert!(VAL.try_with(|c| c.get()).is_err());

命名任务

任务可以是具有关联的名称,用来识别目的。默认情况下,生成的任务是未命名的。要为任务指定名称,需要使用 Builder 构建任务,并将所需的任务名称传递给 Builder::name。使用 Task::name 可以从任务内部检索任务名称。

yield_now

主动让出时间片给任务调度器。调用此函数会将当前 future 移到执行队列末尾,让其他 future 有机会执行。这在 future 中执行 CPU 密集型操作后很有用。

use async_std::task;

#[async_std::main]
async fn main() {
    for i in 0..3 {
        task::spawn(async move{
            task::yield_now().await;
            println!("yielded in task {}", i);
        }).await;
    }


    task::yield_now().await;  
}

你可以把它想象成在排队买东西时,你主动跟后面的人说:“你先来吧,我等等没关系。”

sleep

sleep 使当前异步任务暂停执行指定的时长,这个函数保证至少休眠指定的时长,但实际休眠时间可能略长。这个函数相当于 std::thread::sleep 的异步版本,不会阻塞当前线程。

use std::time::Duration;
use async_std::task;

#[async_std::main]
async fn main() {
    task::sleep(Duration::from_secs(1)).await;

    // std::thread::sleep(Duration::from_secs(1));
}

注意在异步代码中不要使用 std::thread::sleep,否则会阻塞调用的线程导致其他任务没有办法被执行,要使用此异步的版本。

future

async_std 模块提供Future的扩展功能。

通常,我们希望像操作单个 future 一样等待多个 future。join 操作族将多个 future 转换为一个返回所有 future 输出的单个 future。race 操作族将多个 future 转换为一个返回第一个 future 输出的单个 future。

以下函数可用于操作 future:

图片

对于返回 Result 的 future,可以使用上述函数的附加 try_ 变体。这些函数可以识别 Result,并且其行为与基本变体略有不同。对于 try_join,如果任何一个 future 返回 Err,则所有 future 都将被丢弃并返回一个错误。这被称为“短路”。

对于 try_race,它不会返回第一个完成的 future,而是返回第一个成功完成的 future。这意味着 try_race 将继续执行,直到其中一个 future 返回 Ok,或者所有 future 都返回 Err

然而,有时即使对于返回 Result 的 future,使用函数的基本变体也可能很有用。以下是适用于 Result 的操作及其各自语义的概述:

图片

  • pending 函数返回一个永不会完成的future。
  • ready 函数返回一个指定值的future。
  • timeout 函数等待一个future完成,带超时功能,超过一定的时间就不再等待了。

把这个三个函数放在一个例子中,如下:

use async_std::future;
use std::time::Duration;

#[async_std::main]
async fn main() {
    let never = future::pending::<()>();
    let dur = Duration::from_millis(5);
    assert!(future::timeout(dur, never).await.is_err());

    let val = future::ready(5);
    let dur = Duration::from_millis(5);
    assert!(future::timeout(dur, val).await.is_ok());
}

io

async_std::io 模块提供核心输入输出功能的 trait、辅助函数和类型定义。它是用于异步编程的 std::io 模块的对应版本。进行输入输出操作时,该模块会用到一些常用的类型。

Read 和 Write trait

ReadWrite 是两个最为核心的 trait,用于提供最通用的读写操作接口。由于它们是 trait,因此很多其他类型都可以实现它们,你也可以为自己的类型实现这两个 trait。因此你会看到几种不同的 I/O 类型(types):文件(Files)、TcpStreams(TCP 流)、有时甚至是 Vec<T>。例如,Read trait 为 File 类型添加了 read 方法,可以用来读取文件内容:

use async_std::fs::File;
use async_std::prelude::*;

let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// 读取最多 10 个字节
let n = f.read(&mut buffer).await?;
println!("读取到的字节: {:?}", &buffer[..n]);

ReadWrite 非常重要,这两个特性(traits)的实现者(implementors)有一个昵称:读取器(reader)和写入器(writer)。因此,你有时会看到“一个读取器”而不是“一个实现了 Read 特性的类型”。这样更简洁!

Seek 和 BufRead

除此之外,async_std::io 模块还提供了两个重要的特性:SeekBufRead。它们都基于读取器工作,以控制读取的执行方式。Seek 允许你控制下一个字节的来源:

use async_std::fs::File;
use async_std::io::SeekFrom;
use async_std::prelude::*;

let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// 跳到文件的最后 10 个字节
f.seek(SeekFrom::End(-10)).await?;
// 读取最多 10 个字节
let n = f.read(&mut buffer).await?;
println!("读取到的字节: {:?}", &buffer[..n]);

BufRead 使用一个内部缓冲区(buffer)来提供许多其他读取方式,但要展示它,我们需要先谈论一下缓冲区的一般概念。

BufReader 和 BufWriter

基于字节的接口使用起来比较笨拙且效率可能低下,因为我们需要不断地调用操作系统。为了解决这个问题,std::io 提供了两个结构体:BufReaderBufWriter,它们包装了读取器和写入器。包装器使用缓冲区,减少了调用次数,并提供了更友好的方法来精确访问你想要的内容。

例如,BufReaderBufRead 特性协同工作,为任何读取器添加额外的方法。

use async_std::fs::File;
use async_std::io::BufReader;
use async_std::prelude::*;

let f = File::open("foo.txt").await?;
let mut reader = BufReader::new(f);
let mut buffer = String::new();

// 读取一行到 buffer 中
reader.read_line(&mut buffer).await?;
println!("{}", buffer);

BufWriter 不会添加任何新的写入方式,它只是使用缓冲对 write 的每次调用进行优化:

use async_std::fs::File;
use async_std::io::prelude::*;
use async_std::io::BufWriter;

let f = File::create("foo.txt").await?;

{
    let mut writer = BufWriter::new(f);

    // 向缓冲区写入一个字节
    writer.write(&[42]).await?;
} // 当 writer 超出作用域时,缓冲区会被刷新

标准输入和输出

一个非常常见的输入来源是标准输入:

use async_std::io;

let mut input = String::new();

io::stdin().read_line(&mut input).await?;
println!("你输入了:{}", input.trim());

注意,你不能在不返回 Result<T, E> 的函数中使用 ? 运算符。返回 Result<T, E> 时,你可以调用 .unwrap() 或使用 match 匹配返回值以捕获任何可能的错误:

use async_std::io;

let mut input = String::new();

io::stdin().read_line(&mut input).await.unwrap();

一个非常常见的输出来源是标准输出:

use async_std::io;
use async_std::io::prelude::*;

io::stdout().write(&[42]).await?;

当然,直接使用 io::stdout 比使用类似 println! 的宏要少见。

迭代器类型

std::io 提供的许多结构用于以各种方式迭代 I/O。例如,Lines 用于按行分割:

use async_std::fs::File;
use async_std::io::BufReader;
use async_std::prelude::*;

let f = File::open("foo.txt").await?;
let reader = BufReader::new(f);
let mut lines = reader.lines();
while let Some(line) = lines.next().await {
    println!("{}", line?);
}

async_std 提供了异步的访问方法,比如上面的 lines.next().await

函数

还有一些函数可以访问各种功能。例如,我们可以使用 io::copy 将标准输入的内容全部复制到标准输出:

use async_std::io;

io::copy(&mut io::stdin(), &mut io::stdout()).await?;

其他有用的函数:

图片

io::Result 类型

最后但同样重要的,是 io::Result 类型。它是许多 std::io 函数的返回值类型,这些函数可能会引发错误。你也可以将此类型作为自己函数的返回值。本模块中的许多示例都使用了 ? 操作符:

#![allow(dead_code)]
use async_std::io;

async fn read_input() -> io::Result<()> {
    let mut input = String::new();

    io::stdin().read_line(&mut input).await?;

    println!("你输入的内容: {}", input.trim());

    Ok(())
}

read_input 函数的返回值类型 io::Result<()> 非常常见,适用于那些没有“真实”返回值的函数,但需要在发生错误时返回错误信息。在这个例子中,此函数的唯一目的是读取一行内容并打印它,因此我们使用 () 表示空值。

os

os模块定义了和特定操作系统相关的一些特性,包括Unix和Windows相关的。

这里我就不具体展开讲了,如果你现实项目中遇到和操作系统相关的特定的需求,你可以再来翻阅一下它的文档。

path

此模块提供了两种类型 PathBufPath(类似于 Stringstr),用于以抽象的方式处理路径。这些类型分别是 OsStringOsStr 的轻量级包装器,这意味着它们根据本地平台的路径语法直接操作字符串。

通过迭代 Path 上的 components 方法返回的结构,可以将路径解析为 Components(路径组成部分)。Components 大致对应于路径分隔符(/\)之间的子字符串。你可以使用 PathBuf 上的 push 方法从组成部分重建等效的路径。注意,路径在语法上可能因 components 方法的文档中描述的规范化而有所不同。

路径操作包括从切片解析组成部分以及构建新的拥有所有权的路径。要解析路径,你可以从 str 切片创建一个 Path 切片,然后进行查询。

use async_std::path::Path;
use std::ffi::OsStr;

let path = Path::new("/tmp/foo/bar.txt");

let parent = path.parent();
assert_eq!(parent, Some(Path::new("/tmp/foo")));

let file_stem = path.file_stem();
assert_eq!(file_stem, Some(OsStr::new("bar")));

let extension = path.extension();
assert_eq!(extension, Some(OsStr::new("txt")));

要构建或修改路径,请使用 PathBuf

use async_std::path::PathBuf;

// 这种方式可行……
let mut path = PathBuf::from("c:\\");

path.push("windows");
path.push("system32");

path.set_extension("dll");

// ……但如果你事先不知道所有内容,最好使用 push。如果知道,这种方式更好:
let path: PathBuf = ["c:\\", "windows", "system32.dll"].iter().collect();

MAIN_SEPARATOR 代表此操作系统中的路径分隔符,比如Unix/Linux中的"/", Windows中的“\”。函数is_separator判断一个字符是不是分隔符。

process 进程

process 模块负责进程相关的操作。它主要用来启动和控制子进程,同时也提供了 abortexit 两个方法,可以用来结束当前运行的程序。这是 std::process 的异步版本。

以下是这个模块提供的结构体:

下面是使用Command的例子,生成一个Command后获得它的输出:

use async_std::process::Command;

#[async_std::main]
async fn main() -> std::io::Result<()> {
    let output = Command::new("echo")
        .arg("hello")
        .output()
        .await?;

    println!("status: {}", output.status);
    println!("stdout: {}", String::from_utf8_lossy(&output.stdout));
    println!("stderr: {}", String::from_utf8_lossy(&output.stderr));

    Ok(())
}

使用多个参数的例子:

use async_std::process::Command;
use std::process::Stdio;

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let output = Command::new("ls")
        .arg("-l")
        .arg("-a")
        .stdout(Stdio::piped())
        .output()
        .await?;

    println!("{}", String::from_utf8_lossy(&output.stdout));
    Ok(())
}

支持bash和管道符:

#[async_std::test]
async fn test_bash() -> Result<(), Box<dyn std::error::Error>> {
    let output = Command::new("bash") // 或 Command::new("sh")
        .arg("-c")
        .arg("ls -l | grep txt") // 这里是你的 shell 命令
        .stdout(Stdio::piped())
        .stderr(Stdio::piped()) // 捕获错误输出
        .output()
        .await?;

    if output.status.success() {
        println!("{}", String::from_utf8_lossy(&output.stdout));
    } else {
        eprintln!("{}", String::from_utf8_lossy(&output.stderr));
    }

    Ok(())
}

stream

这个模块是 std::iter 的异步版本。如果你发现自己需要处理某种异步集合,并需要对该集合的元素执行操作,你很快就会接触到“流(streams)”。流在符合 Rust 惯例的异步代码中被大量使用,因此熟悉它们是很有价值的。

在进一步解释之前,我们先来了解一下stream模块的结构:

  • 特性(Traits)是核心部分,定义了存在哪些类型的流以及你可以用它们做什么。这些特性的方法值得花一些额外的时间学习。
  • 函数(Functions)提供了一些创建基本流的有用方法。
  • 结构体(Structs)通常是本模块特性中各种方法的返回类型。你通常应该关注创建结构体的方法,而不是结构体本身。

本模块的核心是 Stream 特性(trait)。Stream 的核心定义如下:

#![allow(dead_code)]
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

一个流有一个 next 方法,调用该方法会返回一个 Poll<Option<Item>>。只要还有元素,next 就会返回 Ready(Some(Item));一旦所有元素都被耗尽,它将返回 None 以表明迭代已完成。如果我们正在等待某个异步操作完成,则会返回 Pending。各个流可以选择恢复迭代,因此再次调用 next 可能最终会在某个时候再次开始返回 Ready(Some(Item)),也可能不会。

Stream 的完整定义还包括许多其他方法,但它们都是基于 next 构建的默认方法,因此你可以免费获得它们。

流也是可组合的,并且通常会将它们链接在一起以执行更复杂的处理形式。有三种常用的方法可以从集合创建流:

  • stream(),它迭代 &T
  • stream_mut(),它迭代 &mut T
  • into_stream(),它迭代 T

async-std 中的各种类型可能会在适当的情况下实现这三种方法中的一个或多个。

创建自定义流涉及两个步骤:创建一个用于保存流状态的结构体,然后为这个结构体实现Stream特性。这就是这个模块中存在如此多结构体的原因:每个流和迭代器适配器都有一个对应的结构体。

Rust 的 while let 循环语法是一种迭代流的惯用方式。下面是 while let 的一个基本示例:

let mut values = stream::from_iter(1u8..6);
while let Some(x) = values.next().await {
  println!("{}", x);
}

这将逐行打印数字 1 到 5。但是你会注意到这里有一点:我们从来没有调用向量上的任何方法来生成流。这是怎么回事?

标准库中有一个用于将某些东西转换为流的特性:IntoStream。此特性只有一个方法 into_stream,它将实现 IntoStream 的对象转换为流。

std::iter::IntoIterator 不同的是,IntoStream 目前还没有编译器的支持。这意味着像 for 循环那样自动转换还不存在,因此 into_stream 或如上所述的 from_iter 总是需要手动调用。

接受一个 Stream 并返回另一个 Stream 的函数通常被称为流适配器(stream adapters),因为它们是适配器模式 (adapter pattern)的一种形式。常见的流适配器包括 maptakefilter。更多信息请参阅它们的文档。

流和流适配器都是惰性的(lazy)。这意味着仅仅创建流本身并不会做太多事情。只有当你调用 next 方法时,流才会真正开始运作。当单独为了流的副作用(side effects)而创建流时,这有时会导致混淆。例如,map 方法会对它迭代到的每个元素调用一个闭包函数:

let v = stream::repeat(1u8).take(5);
v.map(|x| println!("{}", x))

这将不会打印任何值,因为我们只是创建了一个流,并没有真正使用它。编译器会对这种行为发出警告:

warning: unused result that must be used: streams are lazy and do nothing unless consumed

使用 while let 循环来发挥 map 的副作用才是惯用的做法:

let mut v = stream::repeat(1u8).take(5);
while let Some(x) = &v.next().await {
  println!("{}", x);
}

让流生效的两种最常见方式是使用像这样的 while let 循环,或者使用 collect 方法来生成一个新的集合。

既然我们更需要关注它的函数,那么在这里我列出了它的所有的函数,我们可以大概了解一下它们的功能:

net

async_std 还提供了网络相关的异步版本。本来Tikio的网络模块在前一课中没有介绍,这一课也不应该介绍 async_std 的网络相关的内容,但是考虑到后面没有专门的课独立 async_std 了,所以在这一节我们简单介绍下。

net模块用于 TCP/UDP 通信的网络原语。这一模块提供了传输控制协议 (TCP) 和用户数据报协议 (UDP) 的网络功能,以及 IP 和套接字地址的类型。它是 std::net 的异步版本。

下面是一个udp server的例子,实现了echo协议:

use async_std::net::UdpSocket;

#[async_std::main]
async fn main() -> std::io::Result<()> {
    let socket = UdpSocket::bind("127.0.0.1:8080").await?;
    let mut buf = vec![0u8; 1024];

    loop {
        let (n, peer) = socket.recv_from(&mut buf).await?;
        socket.send_to(&buf[..n], &peer).await?;
    }
}

下面是 udp client 的例子:

use async_std::net::UdpSocket;

#[async_std::main]
async fn main() -> std::io::Result<()> {
    let socket = UdpSocket::bind("0.0.0.0:0").await?;
    let server_addr = "127.0.0.1:8080";
    let msg = b"Hello, world!";

    socket.send_to(msg, server_addr).await?;

    let mut buf = vec![0u8; 1024];
    let (n, _) = socket.recv_from(&mut buf).await?;

    println!("Received: {}", String::from_utf8_lossy(&buf[..n]));

    Ok(())
}

sync

这个模块是std::sync的异步版本。事实上我们还没有开始介绍标准库的同步原语,但是我们不妨先了解一下它的异步版本,不熟悉也没有关系,大概了解下,后面学习了之后再来回顾。

async-std 的同步原语是调度器感知的,这使得可以 await 它们的操作——例如 Mutex 的加锁操作。

从概念上讲,一个 Rust 程序是在计算机上执行的一系列操作。程序中发生的事件时间线与代码中操作的顺序一致。我们看一下下面操作全局静态变量的代码:

static mut A: u32 = 0;
static mut B: u32 = 0;
static mut C: u32 = 0;

fn main() {
    unsafe {
        A = 3;
        B = 4;
        A = A + B;
        C = B;
        println!("{} {} {}", A, B, C);
        C = A;
    }
}

这段代码看起来像是修改了一些存储在内存中的变量,执行了一个加法运算,结果存储在 A 中,并且变量 C 被修改了两次。当只涉及单个线程时,结果正如预期:输出 7 4 4

至于幕后发生的事情,当启用优化时,最终生成的机器代码可能与源代码大相径庭:

  • C 的第一次存储可能会被移到对 AB 的存储之前,就好像我们写了 C = 4; A = 3; B = 4 一样。
  • A + B 赋值给 A 的操作可能会被移除,因为可以在打印之前将总和存储在一个临时位置,而全局变量则永远不会被更新。
  • 最终结果只需在编译时查看代码即可确定,因此常量折叠(constant folding)可能会将整个代码块转换为简单的 println!("7 4 4")

编译器可以执行这些优化的任意组合,只要最终优化的代码在执行时产生与未优化代码相同的结果即可。

由于现代计算机中涉及并发(concurrency),因此关于程序执行顺序的假设通常是错误的。即使禁用编译器优化,访问全局变量也可能导致不确定的结果,并且仍然可能引入同步错误。

请注意,在 Rust 中,如果你想直接访问全局(静态)变量,而不使用任何专门用来同步多线程访问的工具(也就是“同步原语”),那么你就必须使用 unsafe 代码块。

乱序执行
由于各种原因,指令的执行顺序可能与我们定义的顺序不同:
1. 编译器(compiler)重新排序指令:如果编译器可以在更早的时间点发出指令,它会尝试这样做。例如,它可能会将内存加载(memory loads)提升到代码块的顶部,以便 CPU 可以开始从内存中预取(prefetching)值。在单线程场景中,这会在编写信号处理程序或某些类型的底层代码时导致问题。使用编译器屏障(compiler fences)可以防止这种重新排序。
2. 单个处理器(processor)乱序执行指令:现代 CPU 能够进行超标量(superscalar)执行,即使机器代码描述的是一个顺序过程,也可能有多个指令同时执行。这种重新排序由 CPU 透明地处理。
3. 多处理器(multiprocessor)系统同时执行多个硬件线程:在多线程场景中,你可以使用两种类型的原语来处理同步:
a. 内存屏障(memory fences)确保内存访问以正确的顺序对其他 CPU 可见。
b. 原子操作(atomic operations)确保同时访问同一内存位置不会导致未定义行为。

高级同步对象

大多数底层同步原语都相当容易出错且使用不便,因此 async-std 也公开了一些高级同步对象。这些抽象概念可以基于更底层的原语构建。为了提高效率,async-std 中的同步对象通常借助调度器来实现,调度器能够在任务阻塞于获取锁时重新调度这些任务。

以下是可用同步对象的概述:

如果你正在寻找通道,请查看 async_std::channel

一个使用Arc、Mutex的例子,在一个任务对受锁保护的变量赋值为1:

use async_std::sync::{Arc, Mutex};
use async_std::task;

let m1 = Arc::new(Mutex::new(0));
let m2 = m1.clone();

task::spawn(async move {
    *m2.lock().await = 1;
})
.await;

assert_eq!(*m1.lock().await, 1);

和Tokio兼容

async-std 和 Tokio 是两个独立的 Rust 异步运行时。它们各自拥有自己的调度器和 API,因此默认情况下是互不兼容的。然而,为了方便用户在某些场景下同时使用这两个运行时,async-std 提供了 tokio03 这个 Cargo 特性。

启用 tokio03 特性后,async-std 会提供一些适配层,使得部分 async-std 的类型和函数可以与 Tokio 0.3 的类型和函数进行互操作。这通常涉及类型转换或包装。例如,你可能需要在一个使用 Tokio 0.3 的项目中,使用 async-std 提供的一些特定功能。通过启用 tokio03 特性,你就可以在同一个项目中同时使用这两个运行时,而无需完全迁移代码。

还可以开启特性,选择兼容tokio1、tokio02。

好了,async_std 的内容讲完了,这个库的内容也是非常丰富,提供了很多标准库的异步版本,尤其涉及到IO操作(包括网络操作)、进程、流、同步原语等方向。

网上一篇关于async-std的状态的文章总结,以及开发者对async_std未来发展的质疑,以及内部的江湖,包括smol作者Stjepan, 你可以访问这篇文章:async-std 创建者对于最近“项目是否已死?”,移除对其支持等的答复

下一节我们介绍一个小而美的异步运行时 smol。

smol

这个 crate 只是简单地重新导出了一些更小的异步 crate。

如果你想在 smol 中使用基于 tokio 的库,需要使用 async-compat 适配器来转换 future 和 I/O 类型。

smol 的作者stjepang 是 Rust 异步生态系统中一位非常活跃且有影响力的人物。他不仅是 smol 的主要作者,还曾参与过 tokioasync-std 的开发,更是大名鼎鼎crossbeam的作者,也是一堆异步库的开发者。这使得他对 Rust 异步编程的各种方法和权衡有着深刻的理解。

从一些社区的讨论中可以看出,开发者们非常欣赏 stjepang 的思考方式和解决问题的方法。他倾向于简洁、高效的解决方案,这也在 smol 的设计中得到了充分体现。

smol 的出现,一定程度上是对当时 Rust 异步生态中一些问题的回应。在 smol 诞生之前,tokio 占据了主导地位,而 async-std 则试图提供一个更接近标准库的异步体验。然而,一些开发者觉得 tokio 有些过于庞大和复杂,而 async-std 虽然更易用,但在某些方面又不够灵活。

Rust 非常有名的人物,crossbeam、async-std 和 smol 的核心作者 Stjepan Glavina,在2021初由于不明原因删除了所有博客和他的 Rust 项目,好像默默地退出了 Rust 社区。所以他的很多有价值的文章现在都看不到了。也许被Rust伤害太深,不被部门同行认可,使他萌生退意,一个大神就此告别了Rust,损失巨大。哪儿都有江湖,包括Go生态圈。而Rust生态圈江湖气更盛。

smol 的目标是提供一个轻量级、快速、灵活的异步运行时,它具有以下特点:

  • 小巧精悍:smol 的代码库非常小,易于理解和维护。这使得开发者可以更好地控制和定制自己的异步环境。
  • 高性能:smol 注重性能,通过高效的调度器和底层实现,尽可能地减少开销。
  • 灵活性:smol不像 async-std 那样提供大量的内置功能,而是与 futures-rs 紧密结合,允许开发者根据需要选择和组合其他库。它也兼容 tokioasync-std,通过 async-compat 适配器,可以在 smol 中运行基于其他运行时编写的代码。
  • 强调简洁和控制:smol 的设计哲学是尽可能地减少默认约定,让开发者对异步行为有更多的控制权。

smol提供了下面各个模块:

我们重点了解一下它是如何运行异步代码的。因为异步生态圈几乎被 tokio 霸占了,再加上 async_std 也有一部分人在使用,虽然 smol 小巧可爱,也是大神在深度思考另起炉灶开发的一个异步运行时,使用的人还是相对较少,我也是在测试和技术验证的时候才使用它,所以我们不去介绍它的各个模块的细节了,暂时先了解它的基本运行异步任务就好。下面是一个简单的代码,其中的 spawnblock_on 我们已经很熟悉了。

fn main() {
    let task = smol::spawn(async {
        1 + 2
    });

    smol::block_on(async {
        assert_eq!(task.await, 3);
    });
}

spawn 将一个任务派生到全局执行器(默认情况下是单线程的)。这里有一个全局执行器,它会在首次使用时惰性初始化。在编写单元测试和小型程序时,为了方便起见,此库包含了它,但在其他情况下,更建议创建你自己的 Executor。默认情况下,全局执行器由单个后台线程运行,但你也可以通过设置 SMOL_THREADS 环境变量来配置线程数。

unblock 在一个线程池中执行阻塞代码:

use std::io::Read;
use std::fs::File;

async fn read_file(path: &str) -> std::io::Result<String> {
    let path = path.to_string();
    smol::unblock(move || {
        let mut file = File::open(path)?;
        let mut contents = String::new();
        file.read_to_string(&mut contents)?;
        Ok(contents)
    })
    .await
}

fn main() {
    smol::block_on(async {
        match read_file("Cargo.toml").await {
            Ok(contents) => println!("文件内容:{}", contents),
            Err(err) => eprintln!("读取文件出错:{}", err),
        }
    });
}

smol::block_on VS smol::unblock

另外还存在下面几个异步运行时:

  • embassy:一个面向嵌入式系统的异步运行时。
  • glommio:一个面向 I/O 密集型工作负载的异步运行时,构建于 io_uring 之上,并使用 thread-per-core 的模型。
  • 字节跳动 monoio:字节跳动服务框架组开源的基于 io-uringthread-per-core 模型异步运行时。

总结

好了,在这一节课中,我们重点介绍了另外两个异步运行时 async_stdsmol,也提到了其他一些有特色的异步运行时。

2024年有一篇文章 Rust异步现状:运行时,对当前的异步运行时进行了分析。如果我们观察async_std代码贡献活跃度,会发现自2021年后这个项目已经不活跃了,这也导致一些库弃用它而转向tokio。

而Tokio依然保持活跃:

smol还是有贡献者时不时地提交:

有多个异步运行时不一定是好事,这导致我们在编码的时候难以抉择。对于库的开发者来说,他们不得不选择其中一个异步运行时来支持异步代码,否则支持所有的异步运行时就太占资源了,主要的原因是这些异步运行时不兼容,即使有兼容库也不是很完备。

就目前情形来说,Tokio可能是最好的选择了。

思考题

请你使用Tikio实现一个击鼓传花的游戏,四个任务,名称分别是东南西北,按照顺序依次在任务间传递💐,随机在某次停止传💐,程序退出。每个任务拿到💐后打印出自己的名称,输出结果如“东南西北东南西北东南”。

欢迎你把你实现的代码分享到留言区,我们一起交流讨论,如果你觉得这节课的内容对你有帮助的话,也欢迎你分享给其他朋友,我们下节课再见!