38 Asynchronous Processing: What is Future? What is its relationship with async/await? #

Hello, I am Chen Tian.

Over the previous few lectures, we have built a fairly clear picture of concurrent processing, especially the commonly used concurrency primitives. Concurrency primitives are means of synchronization between concurrent tasks; what we will learn today, Future, along with async/await, which handles Futures at a higher level, are means of producing and running concurrent tasks.

However, there are many ways to produce and run concurrent tasks, and async/await is only one of them. In a distributed system, concurrent tasks can run on different nodes of the system; on a given node, concurrent tasks can run in multiple processes; within a process, concurrent tasks can run in multiple threads; and on one or more threads, concurrent tasks can run on multiple coroutines, such as Promises, Futures, Goroutines, or Erlang processes.

Their granularity, from large to small, is shown in the diagram below:

[Figure: Concurrency Granularity]

In previous lectures we used threads heavily as a concurrency tool, and when building the KV server we also used Future as a stackless coroutine through async/await.

In fact, Rust’s Future is very similar to JavaScript’s Promise.

If you’re familiar with JavaScript, you should be familiar with the concept of Promise that we briefly covered in [02], which represents a value whose result can only be obtained at some future moment. A Promise generally exists in three states:

  1. Initial state: the Promise has not yet run;
  2. Waiting (pending) state: the Promise is running but not yet finished;
  3. Settled state: the Promise has either successfully resolved a value or failed.

However, JavaScript’s Promise is similar to a thread in that it starts executing as soon as it is created; awaiting it merely “waits” for and retrieves the resolved value. Rust’s Future, by contrast, is lazy: it only starts executing once it is actively awaited (that is, polled).
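To make this laziness concrete, here is a minimal sketch (assuming a tokio runtime with the time feature enabled): creating the Future does nothing observable, and its body only runs once it is awaited.

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Creating the Future does NOT run its body
    let fut = async { println!("the future is running!") };

    tokio::time::sleep(Duration::from_millis(100)).await;
    println!("nothing has happened yet");

    // Only awaiting the Future actually executes it
    fut.await;
}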

By now you must have noticed that whenever we talk about Future, we mention async/await. Generally speaking, async defines a task that can be executed concurrently, while await triggers its concurrent execution. In most languages, including Rust, async/await is syntactic sugar that uses a state machine to wrap structures like Promise/Future for processing.

In this lecture, we’ll set aside the internal implementation to mainly discuss the basic concepts and usage methods of Future/async/await, and we’ll go into detail about their principles in the next lecture.

Why Do We Need Future? #

First, let’s talk about why concurrent structures like Future are needed.

Before Future appeared, our Rust code was all synchronous. That is, when you execute a function, the CPU processes every instruction in it before returning. If the function performs an IO operation, the operating system suspends the function's thread, placing it in a waiting queue until the IO operation completes, and only then resumes the thread from where it was suspended.

This model is simple and intuitive: code executes line by line, and developers don't need to think about which operations block and which don't; they just focus on their business logic.

However, as CPUs have kept getting faster, the bottleneck for application software in this century is no longer insufficient CPU power but the gap between abundant CPU power and slowly improving IO speed. If a program performs a lot of IO, most of the time it is not computing much at all; it is simply waiting for IO.

Let’s take a look at an example (code):

use anyhow::Result;
use serde_yaml::Value;
use std::fs;

fn main() -> Result<()> {
    // Read Cargo.toml, IO operation 1
    let content1 = fs::read_to_string("./Cargo.toml")?;
    // Read Cargo.lock, IO operation 2
    let content2 = fs::read_to_string("./Cargo.lock")?;

    // Calculation
    let yaml1 = toml2yaml(&content1)?;
    let yaml2 = toml2yaml(&content2)?;

    // Write to /tmp/Cargo.yml, IO operation 3
    fs::write("/tmp/Cargo.yml", &yaml1)?;
    // Write to /tmp/Cargo.lock, IO operation 4
    fs::write("/tmp/Cargo.lock", &yaml2)?;

    // Print
    println!("{}", yaml1);
    println!("{}", yaml2);

    Ok(())
}

fn toml2yaml(content: &str) -> Result<String> {
    // Parse the TOML into a serde_yaml::Value, then serialize it as YAML
    let value: Value = toml::from_str(content)?;
    Ok(serde_yaml::to_string(&value)?)
}

This code reads Cargo.toml and Cargo.lock, converts each to YAML, and writes the results under /tmp.

Although the logic of this code is fine, its performance has a big problem. While Cargo.toml is being read, the entire main thread is blocked; only once the read completes can it move on to the next file. The main thread only does real computation during the time slices in which toml2yaml runs; before and after each file read, the CPU sits idle.

[Figure: Blocking Diagram]

Of course, you could argue that we have to wait during the file read, since toml2yaml depends on the file's contents. True, but a lot of CPU is still wasted: we wait for the first file to be read completely before we even start reading the second. Is there a way to read both files at the same time? Then the total waiting time would be max(time_for_file1, time_for_file2), not time_for_file1 + time_for_file2.

This is not difficult: we can move the file reads and writes into separate threads, for example (code):

use anyhow::{anyhow, Result};
use serde_yaml::Value;
use std::{
    fs,
    thread::{self, JoinHandle},
};

/// Wrap a JoinHandle to provide additional methods
struct MyJoinHandle<T>(JoinHandle<Result<T>>);

impl<T> MyJoinHandle<T> {
    /// Wait for the thread to finish (similar to await)
    pub fn thread_await(self) -> Result<T> {
        self.0.join().map_err(|_| anyhow!("failed"))?
    }
}

fn main() -> Result<()> {
    // Read Cargo.toml, IO operation 1
    let t1 = thread_read("./Cargo.toml");
    // Read Cargo.lock, IO operation 2
    let t2 = thread_read("./Cargo.lock");

    let content1 = t1.thread_await()?;
    let content2 = t2.thread_await()?;

    // Calculation
    let yaml1 = toml2yaml(&content1)?;
    let yaml2 = toml2yaml(&content2)?;

    // Write to /tmp/Cargo.yml, IO operation 3
    let t3 = thread_write("/tmp/Cargo.yml", yaml1);
    // Write to /tmp/Cargo.lock, IO operation 4
    let t4 = thread_write("/tmp/Cargo.lock", yaml2);

    let yaml1 = t3.thread_await()?;
    let yaml2 = t4.thread_await()?;

    // Print
    println!("{}", yaml1);
    println!("{}", yaml2);

    Ok(())
}

fn thread_read(filename: &'static str) -> MyJoinHandle<String> {
    let handle = thread::spawn(move || {
        let s = fs::read_to_string(filename)?;
        Ok::<_, anyhow::Error>(s)
    });
    MyJoinHandle(handle)
}

fn thread_write(filename: &'static str, content: String) -> MyJoinHandle<String> {
    let handle = thread::spawn(move || {
        fs::write(filename, &content)?;
        Ok::<_, anyhow::Error>(content)
    });
    MyJoinHandle(handle)
}

fn toml2yaml(content: &str) -> Result<String> {
    let value: Value = toml::from_str(content)?;
    Ok(serde_yaml::to_string(&value)?)
}

This way, the reads and writes of multiple files proceed concurrently, greatly shortening the waiting time.

But what if you want to read 100 files at the same time? Spawning 100 threads for that is clearly not a good idea. The operating system limits the number of threads; creating, blocking, waking, and destroying threads all involve considerable work; and each thread is allocated a sizable call stack. So from the perspective of both CPU and memory, creating too many threads greatly increases the system's overhead.

In fact, most operating systems provide non-blocking interfaces for IO operations: you can issue a read request and handle error codes like EWOULDBLOCK yourself, multiplexing many IO operations on the same thread instead of relying on the operating system's scheduler to do it for you.
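As a minimal sketch of what handling EWOULDBLOCK yourself looks like (using std::net::TcpStream, since regular files don't meaningfully support non-blocking reads on most platforms; the address here is made up):

use std::io::{ErrorKind, Read};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8080")?;
    // Ask the OS not to block us on reads
    stream.set_nonblocking(true)?;

    let mut buf = [0u8; 1024];
    loop {
        match stream.read(&mut buf) {
            Ok(0) => break, // peer closed the connection
            Ok(n) => println!("read {} bytes", n),
            Err(e) if e.kind() == ErrorKind::WouldBlock => {
                // No data ready yet: a real scheduler would switch to
                // another task here instead of sleeping
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
            Err(e) => return Err(e),
        }
    }
    Ok(())
}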

However, this means you need to define suitable data structures to track each read, do the corresponding scheduling in user space, park the structures that are waiting on IO, give those not waiting on IO a chance to use the CPU, and resume the parked structures once their IO completes. Such fine-grained control makes the most of CPU resources, and that is precisely the main purpose of concurrency structures like Future.

Handled this way, though, a lot has to happen in user space: handling IO event notifications, creating Futures, and scheduling Futures sensibly. Leaving all of this to developers is clearly unreasonable, so Rust provides the corresponding tools, async/await: async conveniently generates a Future, and await triggers the scheduling and execution of that Future.

Let’s see how the same task can be handled more efficiently with async/await (code):

use anyhow::Result;
use serde_yaml::Value;
use tokio::{fs, try_join};

#[tokio::main]
async fn main() -> Result<()> {
    // Read Cargo.toml, IO operation 1
    let f1 = fs::read_to_string("./Cargo.toml");
    // Read Cargo.lock, IO operation 2
    let f2 = fs::read_to_string("./Cargo.lock");
    let (content1, content2) = try_join!(f1, f2)?;

    // Calculation
    let yaml1 = toml2yaml(&content1)?;
    let yaml2 = toml2yaml(&content2)?;

    // Write to /tmp/Cargo.yml, IO operation 3
    let f3 = fs::write("/tmp/Cargo.yml", &yaml1);
    // Write to /tmp/Cargo.lock, IO operation 4
    let f4 = fs::write("/tmp/Cargo.lock", &yaml2);
    try_join!(f3, f4)?;

    // Print
    println!("{}", yaml1);
    println!("{}", yaml2);

    Ok(())
}

fn toml2yaml(content: &str) -> Result<String> {
    let value: Value = toml::from_str(content)?;
    Ok(serde_yaml::to_string(&value)?)
}

In this code we use tokio::fs instead of std::fs; tokio::fs file operations return Futures, which we can then join to obtain their results. join!/try_join! poll multiple Futures: each Future is processed in turn, switching to the next whenever one is blocked, until all Futures have produced results.

The waiting time for the file reads is max(time_for_file1, time_for_file2); performance is almost the same as the thread version, but resource consumption (mainly threads) is significantly lower.
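If you want to run these three versions yourself, a dependency section along these lines should work (a sketch; the version numbers are indicative, not prescriptive):

[dependencies]
anyhow = "1"
futures = "0.3"
serde_yaml = "0.8"
tokio = { version = "1", features = ["full"] }
toml = "0.5"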

I suggest you compare these three versions of the code, writing and running them yourself to get a feel for their processing logic. Note that in the final async/await version, we cannot write the code like this:

// Read Cargo.toml, IO operation 1
let content1 = fs::read_to_string("./Cargo.toml").await?;
// Read Cargo.lock, IO operation 2
let content2 = fs::read_to_string("./Cargo.lock").await?;

Written this way, it would be no different from the first synchronous version, because await runs a Future to completion: we would still read Cargo.toml first and Cargo.lock afterwards, achieving no concurrency.
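Incidentally, if you do want two separate statements, one option (a sketch, slotting into the async main above) is to first spawn both reads as independent tasks so they make progress concurrently, and only then await them:

// Hypothetical variant of the async main above
let h1 = tokio::spawn(fs::read_to_string("./Cargo.toml"));
let h2 = tokio::spawn(fs::read_to_string("./Cargo.lock"));
// The first ? handles the JoinError, the second the IO error
let content1 = h1.await??;
let content2 = h2.await??;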

In-depth Understanding #

Now that we understand the necessity of Future in software development, let’s take a closer look at Future/async/await.

While writing the code above, did you notice that the return value of an asynchronous function (async fn) is a strange structure, impl Future?

[Figure: Async fn structure]

We know that the impl keyword is normally used to implement a trait for a data structure, so whatever follows impl must be a trait. Clearly, Future is a trait, and it has an associated type Output.

Let’s take a look at the definition of Future:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

Besides Output, Future has a poll() method that returns Poll<Self::Output>, where Poll is an enum with two states: Ready and Pending. When a Future returns Pending, its work is unfinished but it cannot make progress right now; it yields and waits for an event to wake it up. When it returns Ready, the Future's value has been produced and can be returned.
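To get a feel for poll(), here is a minimal hand-written Future (my own sketch, not how async fn actually desugars): it returns Pending on the first poll and Ready(42) on the second. Note that before returning Pending it must arrange to be woken; here it simply wakes itself immediately:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct YieldOnce {
    polled: bool,
}

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled {
            Poll::Ready(42)
        } else {
            self.polled = true;
            // Normally a reactor wakes us when the awaited event arrives;
            // here we wake ourselves so the executor polls us again
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn main() {
    let answer = futures::executor::block_on(YieldOnce { polled: false });
    println!("{}", answer); // 42
}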

You see, this simple data structure underpins the entire Rust async/await ecosystem.

Back to the return value of async fn: it is clearly an impl Future. So if an ordinary function returns impl Future, does it behave the same as an async fn? Let's write a simple experiment (code):

use futures::executor::block_on;
use std::future::Future;

#[tokio::main]
async fn main() {
    let name1 = "Tyr".to_string();
    let name2 = "Lindsey".to_string();

    say_hello1(&name1).await;
    say_hello2(&name2).await;

    // In addition to using await to execute Future, you can also directly use executor to execute
    block_on(say_hello1(&name1));
    block_on(say_hello2(&name2));
}

async fn say_hello1(name: &str) -> usize {
    println!("Hello {}", name);
    42
}

// async fn is syntactic sugar for a function returning impl Future<Output = ...>
fn say_hello2<'fut>(name: &'fut str) -> impl Future<Output = usize> + 'fut {
    async move {
        println!("Hello {}", name);
        42
    }
}

Running this code, you will find that say_hello1 and say_hello2 are equivalent: both can be awaited, and both can be handed to an executor to run.

Here we encounter a new term: executor.

What is an executor? #

You can roughly think of an executor as a scheduler for Futures. The operating system is responsible for scheduling threads, but it will not schedule user-space coroutines (such as Futures), so any program that handles concurrency with coroutines needs an executor to schedule them.

Many languages with built-in coroutine support, such as Golang and Erlang, ship with a user-space scheduler. Rust, although it provides coroutines in the form of Futures, does not provide an executor at the language level, leaving developers free to decide whether to use an executor at all, and which one. The benefit is that when your code doesn't need coroutines, no runtime is pulled in; and when it does, you can choose the executor best suited to your application from the ecosystem.

Common executors include:

  • The very simple executor that ships with the futures library; the code above uses its block_on function;
  • The executor provided by tokio; using #[tokio::main] implicitly pulls in tokio's executor;
  • The executor provided by async-std, similar to tokio's;
  • The async-executor provided by smol, which likewise mainly offers block_on.

Note that the code above mixes #[tokio::main] with futures::executor::block_on. This was only to demonstrate different ways of running a Future; in production code, mixing different executors is not recommended, as it reduces performance and can cause strange problems.

When we talk about executors, we also have to mention reactors. Both are parts of the Reactor pattern, a classic pattern for building high-performance event-driven systems. The Reactor pattern has three parts:

  • Task: the work to be processed. A task can be interrupted, yielding control to the executor and waiting to be scheduled later;
  • Executor: a scheduler. It maintains the tasks waiting to run (the ready queue) and the blocked tasks (the wait queue);
  • Reactor: maintains the event queue. When an event arrives, it notifies the executor to wake a task that is waiting to run.

The executor schedules and runs the pending tasks. When a task cannot make progress but is not yet complete, the executor suspends it and registers an appropriate wake-up condition. Later, when the reactor receives an event satisfying that condition, it wakes the suspended task, and the executor gets a chance to resume it. This loop repeats until the task is complete.
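To make the loop concrete, here is a heavily simplified, single-task sketch of an executor's schedule/wake cycle (my own illustration, not tokio's actual implementation; it uses std::pin::pin!, which needs Rust 1.68+): poll the Future, and on Pending sleep on a condition variable that the Waker, standing in for the reactor, will signal.

use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Parker plays the role of the wait queue: wake() is what
/// a reactor would call when the awaited event arrives.
struct Parker {
    ready: Mutex<bool>,
    cvar: Condvar,
}

impl Wake for Parker {
    fn wake(self: Arc<Self>) {
        *self.ready.lock().unwrap() = true;
        self.cvar.notify_one();
    }
}

/// A single-task executor: poll, park on Pending, resume on wake
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let parker = Arc::new(Parker {
        ready: Mutex::new(false),
        cvar: Condvar::new(),
    });
    let waker = Waker::from(parker.clone());
    let mut cx = Context::from_waker(&waker);

    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => {
                // Block until wake() flips the flag
                let mut ready = parker.ready.lock().unwrap();
                while !*ready {
                    ready = parker.cvar.wait(ready).unwrap();
                }
                *ready = false;
            }
        }
    }
}

fn main() {
    println!("{}", block_on(async { 1 + 1 }));
}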

How to use Future for asynchronous processing? #

After understanding the Reactor pattern, the entire structure