17 躬行实践,洞悉其理:自己动手实现一个Future
你好,我是鸟窝。
上几节课我们学习了Rust的异步编程和几种常见的异步运行时,以及异步编程应用的场景以及陷阱,如果你觉得还不过瘾,那么这节课就让我们来学习如何实现一个自己的Future。
我们可以通过ready创建一个无需等待的Future,也可以通过pending创建一个永远不会完成的Future。这两种方式是最极端的两个场景,更多的情况下,如果我们是一个库的实现者,我们可能要实现自己的异步库,实现自己的Future。就像tokio、async_std实现的各种异步io、net库一样。
一个简单的Delay future
下面我将尝试解释如何在 Rust 中手动实现一个 Future,并解释其背后的原理。
Future 的核心概念
Future 代表一个异步操作的最终结果。它有两种状态:
- Pending(挂起):操作尚未完成。
- Ready(就绪):操作已完成,结果可用。
Future 的主要方法是 poll(),它被调用来检查 Future 是否已经完成。poll() 方法返回一个 Poll 枚举。
简化版 Future 实现
下面是一个简单的 Future 实现,它模拟一个延迟操作:
use std::future::Future;
use std::task::{Context, Poll};
use std::time::Duration;
struct Delay {
duration: Duration,
completed: bool,
}
impl Delay {
fn new(duration: Duration) -> Self {
Delay {
duration,
completed: false,
}
}
}
impl Future for Delay {
type Output = String;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.completed {
Poll::Ready("延迟完成!".to_string())
} else {
println!("子曰:三思而后行!");
// 模拟耗时操作,这里使用线程休眠
std::thread::sleep(self.duration);
self.completed = true;
cx.waker().wake_by_ref(); // 非常重要!唤醒执行器
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let delay = Delay::new(Duration::from_secs(2));
let result = delay.await;
println!("{}", result);
}
Delay结构体:存储延迟时间和完成状态。impl Future for Delay:为Delay实现了Futuretrait。-
poll()方法:这是 Future 的核心。 -
如果
self.completed为true,则返回Poll::Ready,表示 Future 已完成。 - 否则,模拟一个耗时操作(这里使用
thread::sleep()),然后将self.completed设置为true。 - 重点:
cx.waker().wake_by_ref();这行代码非常重要。它获取一个Waker,并调用wake_by_ref()方法。Waker的作用是通知执行器(Executor)Future 已经准备好再次被poll()。如果没有这一步,执行器就不知道 Future 已经完成了模拟的耗时操作,Future 就永远不会完成。 main()函数:使用tokio运行main()Future。
改造 Delay
但是,这个Future的实现有一个问题,在Future第一次被poll的时候,调用了 std::thread::sleep,这个导致调用线程阻塞,这正是上一课我们介绍的陷阱之一。
我们可以借助tokio实现的sleep改造一下,如下:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::{sleep, Sleep};
use pin_project::pin_project;
#[pin_project] // 使用 pin_project 宏
struct Delay {
#[pin] // 标记需要 Pin 住的字段
sleep: Sleep,
message: String,
}
impl Delay {
fn new(duration: Duration, message: String) -> Self {
Delay {
sleep: sleep(duration),
message,
}
}
}
impl Future for Delay {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project(); // 使用 project() 方法进行投影
match this.sleep.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(this.message.clone()),
}
}
}
#[tokio::main]
async fn main() {
let delay1 = Delay::new(Duration::from_secs(2), "第一个延迟完成!".to_string());
let delay2 = Delay::new(Duration::from_secs(1), "第二个延迟完成!".to_string());
tokio::join!(async { // 并发执行两个 Future
let result1 = delay1.await;
println!("{}", result1);
}, async {
let result2 = delay2.await;
println!("{}", result2);
});
println!("所有延迟完成!");
}
Delay结构体的 sleep: Sleep 是关键!我们不再使用 thread::sleep(),而是使用 tokio::time::sleep() 创建一个 Sleep 类型的 Future。Sleep Future 代表一个非阻塞的延迟操作。它由 tokio 运行时管理。message: String 是存储延迟完成后要返回的消息。
添加 pin_project crate:
- 在
Cargo.toml文件中添加pin-project = "1"依赖。 #[pin_project]宏:在Delay结构体上添加#[pin_project]宏。这个宏会自动生成必要的Pin投影代码。#[pin]属性:在Delay结构体中需要 Pin 住的字段sleep上添加#[pin]属性。self.project()方法:在poll方法中使用self.project()方法进行投影。这个方法是由pin_project宏自动生成的,它会返回一个DelayProjection类型的结构体,其中包含了对 Pin 主字段的安全访问方式。
Delay::new() 构造函数调用 tokio::time::sleep() 创建一个 Sleep Future。这个函数不会阻塞当前线程,而是向 tokio 运行时注册一个定时器。当指定的时间 duration 到达时,tokio 运行时会唤醒与此 Sleep Future 关联的任务。
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>:这是 Future trait 的核心方法。执行器会不断地调用这个方法来检查 Future 是否已经完成。
Pin<&mut Self>:由于 tokio::time::Sleep 的 poll 方法需要一个 Pin<&mut Sleep> 类型的参数,所以 Delay 的 poll 方法也必须接收一个 Pin<&mut Self> 类型的参数。Pin 用于防止 Future 在执行过程中被移动到内存的其他位置,这对于某些需要固定内存地址的 Future 非常重要。
match sleep.poll(cx):这是 poll 方法的核心。我们调用内部 Sleep Future 的 poll 方法。 这里使用了 pin_project,所以访问 sleep 字段就很方便了。
cx: &mut Context<'_>:Context 包含一个 Waker,用于在 Future 准备好再次被 poll 时唤醒执行器。在这个例子中,tokio 运行时会自动处理 Waker 的唤醒,我们不需要手动调用。
通过这个修改,我们借助 tokio::time::sleep 的能力,实现了无阻塞的Delay future。
Timeout future的实现
接下来我介绍一个 Timeout future的实现。这个 Timeout 会等待一段时间,如果这段事件正经的future还没有完成,就导致超时返回。
这段代码展示了如何实现一个自定义的 Timeout 结构体,用于在指定的超时时间内执行一个异步任务。如果任务在超时时间内没有完成,则返回一个超时错误:
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project::pin_project;
#[pin_project]
pub struct Timeout<F, D> {
#[pin]
future: F,
#[pin]
deadline: D,
completed: bool,
}
impl<F, D> Timeout<F, D> {
pub fn new(future: F, deadline: D) -> Self {
Self {
future,
deadline,
completed: false,
}
}
}
impl<F: Future, D: Future> Future for Timeout<F, D> {
type Output = io::Result<F::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
assert!(!*this.completed, "future polled after completing");
match this.future.poll(cx) {
Poll::Ready(v) => {
*this.completed = true;
Poll::Ready(Ok(v))
}
Poll::Pending => match this.deadline.poll(cx) {
Poll::Ready(_) => {
*this.completed = true;
Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out")))
}
Poll::Pending => Poll::Pending,
},
}
}
}
#[tokio::main]
async fn main() {
// 创建一个延迟 2 秒的 Future
let delay = tokio::time::sleep(std::time::Duration::from_secs(2));
// 创建一个超时时间为 1 秒的 Future
let timeout = tokio::time::sleep(std::time::Duration::from_secs(1));
// 创建一个超时 Future,因为timeout 1s就到了,所以delay future还没来得及返回,超时就返回了
let timeout_future = Timeout::new(delay, timeout);
match timeout_future.await {
Ok(_) => println!("Future 完成!"),
Err(e) => eprintln!("Future 超时!错误:{}", e),
}
}
Timeout 结构体包含三个字段:
- future:要执行的异步任务。
- deadline:超时时间。deadline其实是一个future,这也意味着它不仅仅可以是时间,也可以是另外一个future。
- completed:标记任务是否已完成。
实现 Future trait 使 Timeout 结构体成为一个异步任务。poll 方法检查 future 和 deadline 的状态:如果 future 完成,则返回结果;如果 deadline 超时,则返回超时错误;否则,继续等待。
futures 库
futures-rs 是 Rust 异步编程生态系统中一个重要的库(现在已经纳入到官方的仓库之下),它提供了一系列用于处理 Future、Stream 和其他异步操作的工具和抽象。futures-rs 中定义的 future 类型是标准库中 future 的原始实现。虽然 std::future 模块在 Rust 标准库中已经提供了基本的 Future trait 和 async/.await 语法,但 futures-rs 仍然提供了许多有用的扩展功能,尤其是在处理复杂的异步场景时。
Future 扩展
组合器(Combinators):futures-rs 提供了丰富的 Future 组合器,例如 map、and_then、or_else、join、select 等,用于方便地组合、转换和控制 Future。这些组合器使得编写复杂的异步流程变得更加简洁和易读。

实用函数:提供了一些方便的函数,例如 ready 用于创建一个立即完成的 Future,pending 用于创建一个永远不会完成的 Future。
FutureExt/TryFutureExt trait:通过 use futures::prelude::*; 引入的 FutureExt /TryFutureExt trait 为 Future 类型添加了许多方便的方法,例如 map、then、catch_unwind 等。
示例:
use futures::future::{FutureExt, TryFutureExt, ready};
use tokio::runtime::Runtime;
async fn process_data(input: i32) -> Result<String, String> {
// 使用 map 将 i32 转换为 String
let string_future = ready(Ok(input)).map(|result| result.map(|n| format!("Number: {}", n)));
// 使用 and_then 处理 Result,并在成功时返回另一个 Future
string_future
.and_then(|s| async move {
Ok(format!("Processed: {}", s))
})
.await
}
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let result1 = process_data(10).await;
println!("Result 1: {:?}", result1); // 输出:Ok("Processed: Number: 10")
let result2 = process_data(-1).await; // 假设 -1 是一个错误输入
println!("Result 2: {:?}", result2); // 输出:Ok("Processed: Number: -1")
});
}
这个例子中 ready(Ok(input)) 创建一个立即完成的 Future,其结果为 Ok(input)。map 将 Future 的结果从 Result<i32, String> 映射为 Result<String, String>。
and_then 接收一个闭包,该闭包接收前一个 Future 的结果,并返回一个新的 Future。这使得我们可以根据前一个 Future 的结果执行不同的异步操作。
Stream 支持
futures-rs 定义了 Stream trait,用于表示异步产生的一系列值。Stream 类似于迭代器,但它是异步的。
Stream 组合器:提供了各种 Stream 组合器,例如 map、filter、fold、for_each、buffer 等,用于处理和转换异步数据流。

Stream 实用函数:提供了创建 Stream 的函数,例如 stream::iter 用于从迭代器创建 Stream。
示例:
use futures::stream::{self, StreamExt};
use tokio::runtime::Runtime;
async fn process_stream() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
stream.for_each(|x| async move {
println!("Processing: {}", x);
// 这里可以进行一些异步操作,例如网络请求
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}).await;
println!("Stream processing complete.");
}
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
process_stream().await;
});
}
在这个例子中,stream::iter 从一个 Vec 创建一个 Stream。for_each 对 Stream 中的每个元素执行一个异步闭包。tokio::time::sleep 模拟一个异步操作。
Sink 支持
futures-rs 定义了 Sink trait,用于支持异步数据的写入。Sink 定义了 start_send、poll_ready 和 poll_flush 等方法,用于异步发送数据。
示例:
use futures::channel::mpsc;
use futures::prelude::*;
use tokio::runtime::Runtime;
async fn send_data(mut tx: mpsc::Sender<i32>) {
for i in 1..=5 {
tx.send(i).await.unwrap();
println!("Sent: {}", i);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
async fn receive_data(mut rx: mpsc::Receiver<i32>) {
while let Some(item) = rx.next().await {
println!("Received: {}", item);
}
}
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let (tx, rx) = mpsc::channel(10); // 创建一个容量为 10 的通道
tokio::join!(send_data(tx), receive_data(rx));
});
}
在这个例子中,mpsc::channel 创建一个多生产者单消费者(multi-producer single-consumer)通道。tx.send(i).await 异步发送数据到通道。rx.next().await 异步接收通道中的数据。tokio::join! 同时运行发送和接收任务。
执行器(Executor)
所有异步计算都发生在执行器内部,执行器能够将 Future 派生成任务。本模块提供了一些内置执行器,以及用于构建自定义执行器的工具。
示例:
use futures::executor::block_on;
use futures::future::ready;
async fn my_async_function() -> i32 {
let future1 = ready(10); // 创建一个立即完成的 Future
let future2 = async { 20 }; // 创建一个异步 Future
let result1 = future1.await; // 等待 future1 完成
let result2 = future2.await; // 等待 future2 完成
result1 + result2
}
fn main() {
let result = block_on(my_async_function()); // 执行异步函数
println!("Result: {}", result); // 输出:Result: 30
}
在这个例子中,async fn my_async_function() 定义了一个异步函数,它返回一个 Future。ready(10) 创建一个立即完成的 Future,而 async { 20 } 创建一个异步 Future。.await 用于等待这些 Future 完成并获取结果。block_on 用于在当前线程上执行异步函数。
宏
这个库提供了好几个宏,对于我们处理异步编程中的各种场景非常有帮助,所以这里我介绍它的几个常用的宏。
join!
功能:同时轮询多个 Future,并在所有 Future 都完成后返回一个包含所有结果的元组。
返回值:如果所有 Future 都成功完成,则返回 (T1, T2, ..., Tn),其中 Ti 是第 i 个 Future 的结果类型。如果其中任何一个 Future 返回错误,则整个 join! 操作会立即返回该错误。
示例:
use futures::join;
use tokio::runtime::Runtime;
async fn fetch_data(url: &str) -> Result<String, String> {
// 模拟网络请求
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok(format!("Data from {}", url))
}
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let future1 = fetch_data("url1");
let future2 = fetch_data("url2");
let (result1, result2) = join!(future1, future2);
println!("Result 1: {:?}", result1);
println!("Result 2: {:?}", result2);
});
}
等待两个future都完成。
try_join!
功能:同时轮询多个 futures,并解析为一个 Result,其中包含成功输出的元组或一个错误。
返回值:所有future都完成或者第一个失败的结果。
try_join! 类似于 join!,但如果任何一个 future 返回错误,它会立即完成。
示例(所有的任务都成功完成):
use futures::try_join;
let a = async { Ok::<i32, i32>(1) };
let b = async { Ok::<i32, i32>(2) };
assert_eq!(try_join!(a, b), Ok((1, 2)));
// `try_join!` 是可变参数的,因此您可以传递任意数量的 futures
let c = async { Ok::<i32, i32>(3) };
let d = async { Ok::<i32, i32>(4) };
let e = async { Ok::<i32, i32>(5) };
assert_eq!(try_join!(c, d, e), Ok((3, 4, 5)));
示例(一个任务失败):
use futures::try_join;
let a = async { Ok::<i32, i32>(1) };
let b = async { Err::<u64, i32>(2) };
assert_eq!(try_join!(a, b), Err(2));
pending!
功能:等价于 futures_core::task::Poll,相当于消耗一次Poll。
返回值:Poll::Pending。
示例:
use futures::{pending, select, FutureExt};
async fn pending_function() -> i32 {
pending!();
42
}
async fn ready_function() -> i32 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
48
}
#[tokio::main]
async fn main() {
select! {
v = pending_function().fuse() => {
println!("pending_function completed: {}",v);
},
v = ready_function().fuse() => {
println!("ready_function completed: {}",v);
}
}
}
pending! 宏在Poll的时候还没有ready,再次Poll的时候就是Ready的状态了,所以打印出42。
这个宏文档不是很详细,而且也没有例子,有点难以理解,如果看代码就很清晰了,实际上它生成了一个PendingOnce对象并等待它完成:
#[macro_export]
macro_rules! pending {
() => {
$crate::__private::async_await::pending_once().await
};
}
#[doc(hidden)]
pub fn pending_once() -> PendingOnce {
PendingOnce { is_ready: false }
}
#[allow(missing_debug_implementations)]
#[doc(hidden)]
pub struct PendingOnce {
is_ready: bool,
}
impl Future for PendingOnce {
type Output = ();
fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
if self.is_ready {
Poll::Ready(())
} else {
self.is_ready = true;
Poll::Pending
}
}
}
pin_mut!
功能:在栈上创建一个可变的 Pin。Pin 用于确保某个值在内存中的位置不会移动,这对于某些需要自引用的类型(例如 Future)非常重要。
用法:pin_mut!(variable) 将 variable 绑定为一个 Pin<&mut Type>。
示例:
use futures::pin_mut;
use std::pin::Pin;
fn main() {
let value = 5;
pin_mut!(value); // value 现在是 Pin<&mut i32>
let mut pinned_value: Pin<&mut i32> = value;
*pinned_value = 10;
println!("{}", *pinned_value); // 输出 10
}
例子中将value值变为Pin,确保value值不移动。
poll!
功能:在当前的 async 上下文中轮询一个 Future 一次。
返回值:Poll<T>,表示 Future 的状态(Pending 或 Ready(T))。
用法:通常在手动实现 Future 时使用。
示例(简化):
use std::task::Poll;
use futures::*;
async fn some_async_function() -> Result<&'static str, ()> {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok("Async function completed")
}
#[tokio::main]
async fn main() {
let mut future = Box::pin(some_async_function());
let poll_result = poll!(&mut future); // 这一行轮询 future
// 检查轮询结果
match poll_result {
Poll::Ready(value) => {
// future 成功完成,处理该值
println!("Future resolved with: {:?}", value);
}
Poll::Pending => {
// future 尚未准备好,处理这种情况(例如,yield)
println!("Future is not ready yet");
}
}
// 等待足够的时间,重新轮询 future
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let poll_result = poll!(&mut future);
match poll_result {
Poll::Ready(value) => {
println!("Future resolved with: {:?}", value);
}
Poll::Pending => {
println!("Future is not ready yet");
}
}
}
上面的代码演示了如何使用 futures 库中的 poll! 宏手动轮询 future。其核心逻辑在于如何使用 poll! 宏以及如何根据 Poll 的返回值进行处理。
poll! 宏用于轮询 future。它接受一个 &mut Future 类型的参数(这里是 &mut future),并返回一个 Poll 类型的值。
- 第一次调用
poll!:由于some_async_function()需要 100 毫秒才能完成,因此第一次调用poll!时,future 尚未完成,poll!宏会返回Poll::Pending。代码中的match语句会匹配到Poll::Pending分支,并打印 “Future is not ready yet”。 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;:这行代码使程序休眠 1000 毫秒,远大于some_async_function()所需的 100 毫秒。这意味着在第二次调用poll!之前,future 肯定已经完成(不考虑几乎不会出现的极端情况)。- 第二次调用
poll!:由于 future 已经完成,第二次调用poll!时,宏会返回Poll::Ready(Ok("Async function completed"))。代码中的match语句会匹配到Poll::Ready分支,并打印 “Future resolved with: Ok(“Async function completed”)”。
poll! 宏是手动驱动 future 执行的关键。它模拟了 executor 的行为,即不断地轮询 future,直到 future 完成。poll! 宏的返回值 Poll 有两个变体:
Poll::Pending:表示 future 尚未完成,需要稍后再次轮询。Poll::Ready(value):表示 future 已经完成,value是 future 的返回值。
select!
功能:用于同时轮询(poll)多个 futures 和 streams,并执行第一个完成的 futures 的分支。如果有多个 futures 准备就绪,则将在运行时伪随机选择一个。
返回值:future的执行结果。
直接传递给 select! 的 futures 必须实现 Unpin 和 FusedFuture trait。
如果向 select! 传递的是返回 Future 的表达式(例如异步函数调用),而不是按名称命名的 Future,则会放宽 Unpin 要求,因为宏将把生成的 Future 固定在栈上。但是表达式返回的 Future 仍然必须实现 FusedFuture trait。
可以使用 .fuse() 方法融合尚未融合的 futures 和 streams。但是,需要注意的是,如果在循环中使用 select!,则直接在调用中融合 future 或 stream 将不足以防止其在完成之后再次被轮询,因此在循环中使用 select! 时,用户应该格外小心地在循环之外进行融合。
select! 可以用作表达式,并返回所选分支的返回值。因此,select! 中每个分支的返回类型必须相同。
此宏只能在 async 函数、闭包和块内使用。它也依赖于库的 async-await 功能,该功能默认情况下处于激活状态。
示例:
use futures::future::FutureExt;
use futures::select;
async fn async_identity_fn(arg: usize) -> usize {
arg
}
#[tokio::main]
async fn main() {
let res = select! {
a_res = async_identity_fn(62).fuse() => a_res + 1,
b_res = async_identity_fn(13).fuse() => b_res,
};
println!("Result: {}", res);
}
随机匹配到一个分支上。
futures 库也提供另一个宏 select_biased!,用于同时轮询(poll)多个 futures 和 streams,与 select! 宏不同的是,如果有多个 futures 准备就绪,则将按照声明的顺序选择一个。
stream_select
功能:组合多个具有相同 Item 类型的所有流(stream)变成一个。
返回值:返回流的元素。
它类似于 select 宏,但是并不强制要求所有流都具有相同的类型。stream_select 宏还可以保持流在线,并且不需要分配 Box<dyn Stream>。传递给此宏的流必须实现 Unpin trait。
示例:
use futures::{StreamExt, stream, stream_select};
#[tokio::main]
async fn main() {
let endless_ints = |i| stream::iter(vec![i].into_iter().cycle()).fuse();
let mut endless_numbers = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
match endless_numbers.next().await {
Some(1) => println!("Got a 1"),
Some(2) => println!("Got a 2"),
Some(3) => println!("Got a 3"),
_ => unreachable!(),
}
}
这个例子组合了三种流。
总结
好了,这一节课分为两个部分。第一部分给你介绍了如何定制自己的Future,通过DelayFuture和TimeoutFuture给你介绍了自定义的Future实现。第二部分我们补充了futures库的介绍。futures库是非常流行的异步编程库,在async/.await还没有实现的时候它就开始提供异步的能力了。它提供了丰富的异步编程的扩展,尤其是那些便利的宏。
思考题
请使用异步并发的方式访问各大搜索引擎的网站,并使用 try_join 宏打印出成功与否。欢迎你在留言区分享你动手实践的结果,我们一起交流讨论,如果你觉得这节课的内容对你有帮助的话,也欢迎你分享给其他朋友,我们下节课再见!