【译】Async/Await(二)——Futures

原文标题:Async/Await
原文链接:https://os.phil-opp.com/async-await/#multitasking
公众号: Rust 碎碎念
翻译 by: Praying

Rust 中的 Async/Await

Rust 语言以 async/await 的形式对协作式多任务提供了最好的支持。在我们探讨 async/await 是什么以及它是怎样工作的之前,我们需要理解 future 和异步编程在 Rust 中是如何工作的。

Futures

future 表示一个可能还无法获取到的值。例如由另一个任务计算的整数或者从网络上下载的文件。future 不需要一直等待,直到值变为可用,而是可以继续执行直到需要这个值的时候。

示例

下面这个例子可以很好的阐述 future 的概念:

【译】Async/Await(二)——Futures

在这个时序图里,main函数从文件系统读取一个文件,然后调用函数foo。这个过程重复了两次:一次是调用同步的read_file,另一次是调用异步的async_read_file

在同步调用的情况下,main需要等待文件从文件系统载入。然后它才可以调用foo函数,foo又需要再次等待结果。

在调用异步的async_read_file的情况下,文件系统直接返回一个 future 并且在后台异步地载入文件。这使得main函数得以更加容易地调用foofoo与文件载入并行运行。在这个例子中,文件载入在foo返回之前就完成载入,所以main可以直接对文件操作而不必等待foo返回。

Rust 中的 Futures

在 Rust 中,future 通过Future[1] trait 来表示,它看起来像下面这样:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

关联类型[2]Output指定了异步的值的类型。例如,上图中的async_read_file函数将会返回一个Future实例,其中Output类型被设置为File

poll[3]能够检查是否值已经可用。它返回一个Poll枚举,看起来像下面这样:

pub enum Poll<T> {
    Ready(T),
    Pending,
}

当这个值可用时(例如,文件已经从磁盘上被完整地读取),该值会被包装在Ready变量中然后被返回。否则,会返回一个Pending变量,告诉调用者这个值目前还不可用。

poll方法接收两个参数:self: Pin<&mut Self>cx: &mut Context。前者类似于一个普通的&mut self引用,不同的地方在于Self值被pinned[4]到它的内存位置。如果不理解 async/await 是如何工作的,就很难理解Pin以及为什么需要它。因此,我们稍后再来解释这个问题。

cx: &mut Context参数的目的是把一个Waker实例传递给异步任务,例如从文件系统载入文件。Waker允许异步任务发送通知表示任务(或任务的一部分)已经完成,例如文件已经从磁盘上载入。因为主任务知道当Future就绪的时候自己会被提醒,所以它不需要一次又一次地调用poll。在本文后面当我们实现自己的 Waker 类型时,我们将会更加详细地解释这个过程。

使用 Future(Working with Futures)

现在我们知道 future 是如何被定义的并且理解了poll方法背后的基本思想。尽管如此,我们仍然不知道如何使用 future 来高效地工作。问题在于 future 表示异步任务的结果,而这个结果可能是不可用的。尽管如此,在实际中,我们经常需要这些值直接用于后面的计算。所以,问题是:我们怎样在我们需要时能够高效地取回一个 future 的值?

等待 Future

一个答案是等待 future 就绪。看起来类似下面这样:

let future = async_read_file("foo.txt");
let file_content = loop {
    match future.poll(…) {
        Poll::Ready(value) => break value,
        Poll::Pending => {}, // do nothing
    }
}

在这段代码里,我们通过在循环里一次又一次地调用poll来等待 future。这里poll的参数无关紧要,所以我们将其忽略。虽然这个方案能够工作,但是它非常低效,因为在该值可用之前 CPU 一直处于忙等待状态。

一个更加高效的方式是阻塞当前的线程直到 future 变为可用。当然这是在你有线程的情况下才有可能,所以这个解决方案对于我们的内核来讲不起作用,至少目前还不行。即使是在支持阻塞的系统上,这通常也是不希望发生的,因为它又一次地把一个异步任务转为了一个同步任务,从而抑制了并行任务潜在的性能优势。

Future 组合子(Future Combinators)

等待的一个替换选项是使用 future 组合子。Future 组合子是类似map的方法,它们能够将 future 进行链接和组合,和Iterator上的方法比较相似。这些组合子不是在 future 上等待,而是自己返回一个 future,这个 future 在poll上进行了映射操作。

举个例子,一个简单的string_len组合子,用于把Future<Output = String>转换为Future<Output = usize>,可能看起来像下面这样:

struct StringLen<F> {
    inner_future: F,
}

impl<F> Future for StringLen<F> where F: Future<Output = String> {
    type Output = usize;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        match self.inner_future.poll(cx) {
            Poll::Ready(s) => Poll::Ready(s.len()),
            Poll::Pending => Poll::Pending,
        }
    }
}

fn string_len(string: impl Future<Output = String>)
    -> impl Future<Output = usize>
{
    StringLen {
        inner_future: string,
    }
}

// Usage
fn file_len() -> impl Future<Output = usize> {
    let file_content_future = async_read_file("foo.txt");
    string_len(file_content_future)
}

这段代码不怎么有效,因为它没有处理pinning[5],但是这里它作为一个例子已经足够了。基本的思想是,string_len函数把一个给定的Future实例包装进一个新的StringLen结构体,该结构体也实现了Future。当被包装的 Future 被轮询(poll)时,它轮询内部的 future。如果这个值尚未就绪,被包装的 future 也会返回Poll::Pending。如果这个值就绪,字符串会从Poll::Ready变量中导出并且它的长度会被计算出来。之后,它会再次被包装进Poll::Ready然后返回。

通过string_len函数,我们可以在不必等待的情况下异步地计算一个字符串的长度。因为这个函数会再次返回一个Future,所以调用者无法直接在返回值上操作,而是需要再次使用组合子函数。通过这种方式,整个调用图就变成了异步的,并且我们可以在某个时间点高效地同时等待多个 future,例如在 main 函数中。

手动编写组合子函数是困难的,因此它们通常由库来提供。然而 Rust 标准库本身没有提供组合子方法,但是半官方的(兼容no_stdfuture crate 提供了。它的FutureExt trait 提供了高级别的组合子方法,像map或者then,这些组合子方法可以被用于操作带有任意闭包的结果。

优势

Future 组合子的最大优势在于,它们保持了操作的异步性。通过结合异步 I/O 接口,这种方式可以得到很高的性能。事实上,future 组合子实现为带有 trait 实现的普通结构体,这使得编译器能够对它们进行极度优化。如果想了解更多的细节,可以阅读Zero-cost futures in Rust[6]这篇文章,该文宣布了 future 加入了 Rust 生态系统。

缺点

尽管 future 组合子能够让我们写出非常高效的代码,但是在某些情况下由于类型系统和基于闭包的接口,使用它们也很困难。例如,考虑下面的代码:

fn example(min_len: usize) -> impl Future<Output = String> {
    async_read_file("foo.txt").then(move |content| {
        if content.len() < min_len {
            Either::Left(async_read_file("bar.txt").map(|s| content + &s))
        } else {
            Either::Right(future::ready(content))
        }
    })
}

在 playground 上尝试运行这段代码[7]

在这里,我们读取文件foo.txt,接着使用then组合子基于文件内容链接第二个 future。如果内容长度小于给定的min_len,我们读取另一个文件bar.txt然后使用map组合子将其追加到content中。否则,我们就仅返回foo.txt的内容。

我们需要对传入then里的闭包使用move关键字,因为如果不这样做,将会出现一个关于min_len的生命中周期错误。使用Either包装器(wrapper)的原因是 if 和 else 语句块必须拥有相同的类型。因为我们在块中返回不同的 future 类型,所以我们必须使用包装器类型来把它们统一到相同类型。ready函数把一个值包装进一个立即就绪的 future。需要这个函数是因为Either包装器期望被包装的值实现了Future

正如你所想,对于较大的项目,这样写很快就能产生非常复杂的代码。如果涉及到借用和不同的生命周期,它会变得更为复杂。为此,我们投入了大量的工作来为 Rust 添加对 async/await 的支持,就是为了让异步代码的编写从根本上变得更加简单。

【译】Async/Await(二)——Futures

参考资料

[1]

Future: https://doc.rust-lang.org/nightly/core/future/trait.Future.html


[2]

关联类型: https://doc.rust-lang.org/book/ch19-03-advanced-traits.html#specifying-placeholder-types-in-trait-definitions-with-associated-types


[3]

poll: https://doc.rust-lang.org/nightly/core/future/trait.Future.html#tymethod.poll


[4]

pinned: https://doc.rust-lang.org/nightly/core/pin/index.html


[5]

pinning: https://doc.rust-lang.org/stable/core/pin/index.html


[6]

Zero-cost futures in Rust: https://aturon.github.io/blog/2016/08/11/futures/


[7]

在 playground 上尝试运行这段代码: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=91fc09024eecb2448a85a7ef6a97b8d8


发表评论

评论已关闭。

相关文章