39 Asynchronous Processing How Is Async Await Implemented Internally

39 Asynchronous Processing: How is async/await Implemented Internally? #

Hello, I am Chen Tian.

After the last lesson, we have a fairly solid grasp of the basic concepts of Future and async/await. We know when to use a Future and when to use a thread, and how the executor and reactor work together to eventually produce a Future's result.

However, we do not yet know how an async fn or async block produces a Future, or how the executor actually processes that Future. Today we will dig deeper and see what "magic" the two keywords async and await perform to make everything run so simply and naturally.

We will continue to explore the principles behind the Future interface, which looks simple but hides a lot of complexity, focusing mainly on the Context and Pin structures:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Even if you do not fully understand this lesson, it will not affect how you use async/await. If you are short on time, you do not need to master every detail; just grasp why these problems arise and how they are solved.

The Calling Mechanism of Waker #

First, let’s see what this Context interface is all about.

In the last lesson, we briefly mentioned that the executor calls the poll method to drive a Future forward. If poll returns Poll::Pending, the Future is suspended (not blocked) until the reactor receives the relevant event and calls Waker::wake() to wake it up. But where does this Waker come from?

It is carried inside the Context:

pub struct Context<'a> {
    waker: &'a Waker,
    _marker: PhantomData<fn(&'a ()) -> &'a ()>,
}

So, Context is essentially a wrapper around a Waker.
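As a minimal sketch of how a Future reaches its Waker, here is a hand-written future that returns Poll::Pending once and asks to be woken through cx.waker(). The names YieldOnce and CountingWaker are hypothetical, and std's Wake trait is used only as a convenient way to build a Waker for the demo; a real runtime constructs its own.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that only counts how often it is woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

// Hypothetical future: Pending on the first poll, Ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded {
            Poll::Ready("done")
        } else {
            self.yielded = true;
            // The Waker is reached through the Context passed to poll().
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn main() {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    // Context::from_waker is how an executor wraps its Waker for poll().
    let mut cx = Context::from_waker(&waker);

    let mut fut = YieldOnce { yielded: false };
    // YieldOnce is Unpin, so Pin::new is enough to poll it directly.
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending);
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Ready("done"));
    println!("woken {} time(s)", counter.wakes.load(Ordering::SeqCst));
}
```

A real leaf future would usually clone the Waker (cx.waker().clone()) and hand it to the reactor, which calls wake() once the I/O event arrives; calling wake_by_ref() immediately, as here, simply asks the executor to poll again.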

If you look at the definition of Waker and its related code, you will find it quite abstract. Internally, it uses a vtable so that different executors can plug in their own waker behavior:

pub struct RawWakerVTable {
    clone: unsafe fn(*const ()) -> RawWaker,
    wake: unsafe fn(*const ()),
    wake_by_ref: unsafe fn(*const ()),
    drop: unsafe fn(*const ()),
}

We saw this kind of hand-built vtable before, when reading the source code of the bytes crate; it provides the greatest possible efficiency and flexibility.
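To see what such a vtable looks like in practice, here is a minimal sketch that builds a Waker by hand from RawWaker and RawWakerVTable. The counter and the vt_* function names are illustrative assumptions. This toy waker carries no data (a null pointer), whereas a real executor would typically point data at its task structure.

```rust
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{RawWaker, RawWakerVTable, Waker};

// Illustrative counter so we can observe which vtable entries run.
static WAKE_COUNT: AtomicUsize = AtomicUsize::new(0);

// Each entry receives the type-erased `data` pointer stored in the RawWaker.
// This toy waker carries no data (null), so the functions ignore it.
unsafe fn vt_clone(_data: *const ()) -> RawWaker {
    RawWaker::new(ptr::null(), &VTABLE)
}

unsafe fn vt_wake(_data: *const ()) {
    // A real executor would reschedule the task that `data` points to.
    WAKE_COUNT.fetch_add(1, Ordering::SeqCst);
}

unsafe fn vt_wake_by_ref(_data: *const ()) {
    WAKE_COUNT.fetch_add(1, Ordering::SeqCst);
}

unsafe fn vt_drop(_data: *const ()) {
    // Nothing to free, since `data` owns no resources.
}

static VTABLE: RawWakerVTable = RawWakerVTable::new(vt_clone, vt_wake, vt_wake_by_ref, vt_drop);

fn counting_waker() -> Waker {
    // SAFETY: the vtable functions never dereference `data`, so a null
    // pointer is valid, and they are all thread-safe.
    unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = counting_waker();
    waker.wake_by_ref(); // dispatches to vt_wake_by_ref
    let cloned = waker.clone(); // dispatches to vt_clone
    cloned.wake(); // consumes `cloned`, dispatches to vt_wake
    println!("wake count: {}", WAKE_COUNT.load(Ordering::SeqCst));
    // `waker` is dropped at the end of main, which dispatches to vt_drop
}
```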

Rust itself does not ship an asynchronous runtime; the standard library only defines some basic interfaces, and how they are implemented is up to each runtime (such as tokio). So in the standard library you only see these interface definitions plus the implementations of a few "high-level" interfaces. For example, Waker's wake method simply calls the wake function stored in the vtable:

impl Waker {
    /// Wake up the task associated with this `Waker`.
    #[inline]
    pub fn wake(self) {
        // The actual wakeup call is delegated through a virtual function call
        // to the implementation which is defined by the executor.
        let wake = self.waker.vtable.wake;
        let data = self.waker.data;

        // Don't call `drop` -- the waker will be consumed by `wake`.
        crate::mem::forget(self);

        // SAFETY: This is safe because `Waker::from_raw` is the only way
        // to initialize `wake` and `data` requiring the user to acknowledge
        // that the contract of `RawWaker` is upheld.
        unsafe { (wake)(data) };
    }
    ...
}

If you try to trace where the vtable gets set, you will find the trail goes cold. That is because the concrete implementation lives not in the standard library but in third-party asynchronous runtimes such as tokio.

Even though we use tokio in development, I recommend reading the futures crate to understand how these pieces fit together: its code is well suited to studying the principles behind executors, and it also includes a simple executor that is easy to follow.
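In the same spirit (this is a sketch, not the futures crate's actual code), here is a minimal single-threaded block_on. It assumes std's Wake trait for building the Waker: waking unparks the thread blocked in block_on, which then polls the future again. The names ThreadWaker and block_on are hypothetical here.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waking simply unparks the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical minimal executor: poll, and park the thread until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    // Pin the future on the stack; async blocks are generally !Unpin.
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Park until some waker calls unpark(); a spurious wakeup only
            // causes an extra poll, which is harmless.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let answer = block_on(async { 40 + 2 });
    println!("{answer}");
}
```

This sketch drives a single future and relies on whatever returned Poll::Pending to arrange a wake-up; if nothing ever calls wake(), thread::park() blocks forever, which is exactly the contract the Waker mechanism exists to uphold.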

What Exactly is Generated by async? #

Next, let’s look at Pin. It is a peculiar data structure: ordinary methods take self, &self, or &mut self, but poll() takes self: Pin<&mut Self>. Why?

To explain Pin, we need to take a step back and see what code an async block or async fn actually generates to produce a Future. Let’s look at the following simple asynchronous function:

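// Assumes tokio (with the "fs" and "io-util" features) and anyhow as dependencies
use tokio::{fs, io::AsyncWriteExt};

async fn write_hello_file_async(name: &str) -> anyhow::Result<()> {
    let mut file = fs::File::create(name).await?;
    file.write_all(b"hello world!").await?;

    Ok(())
}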

It first creates a file, then writes “hello world!” into it. The function contains two awaits: one for creating the file asynchronously and one for writing to it asynchronously. The function as a whole returns a Future.

A caller can use it like this:

write_hello_file_async("/tmp/hello").await?;

We know that the executor handles a Future by repeatedly calling its poll() method. So, from the caller’s point of view, the line above is roughly equivalent to:

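// Simplified: pinning is omitted, and this code sits inside the caller's own poll()
match write_hello_file_async("/tmp/hello").poll(cx) {
    // Done: the result becomes the value of the `.await` expression
    Poll::Ready(result) => result,
    // Not done yet: the caller itself returns Pending to its executor
    Poll::Pending => return Poll::Pending,
}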
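Here is a small, std-only sketch of that propagation, assuming a hypothetical YieldOnce leaf future and a do-nothing waker. The outer async block’s poll() returns Pending as long as the inner .await is pending, and Ready once the inner future completes. For simplicity the polls are driven by hand instead of waiting for a wake-up.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical leaf future: Pending on the first poll, Ready(21) afterwards.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.0 {
            Poll::Ready(21)
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

// A waker that does nothing; the polls below are driven by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // The outer future awaits the inner one, then doubles its result.
    let mut outer = pin!(async { YieldOnce(false).await * 2 });

    // First poll: the inner .await is Pending, so the outer poll is Pending too.
    assert_eq!(outer.as_mut().poll(&mut cx), Poll::Pending);
    // Second poll resumes at the .await, which now produces 21.
    assert_eq!(outer.as_mut().poll(&mut cx), Poll::Ready(42));
    println!("outer future finished after two polls");
}
```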

That is how a single await is handled. What about more complex cases, with several awaits inside one function? Take the body of write_hello_file_async as an example. Clearly, write_all() can only start after create() has completed, so the code should look roughly like this:

// Pseudocode: pinning is omitted for brevity
let fut = fs::File::create(name);
match fut.poll(cx) {
    Poll::Ready(Ok(mut file)) => {
        let fut = file.write_all(b"hello world!");
        match fut.poll(cx) {
            Poll::Ready(result) => return Poll::Ready(result),
            Poll::Pending => return Poll::Pending,
        }
    }
    Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
    Poll::Pending => return Poll::Pending,
}

But as mentioned earlier, an async function returns a Future, so we need to wrap logic like this in a Future implementation and expose that to callers. There is one more catch: poll() may be called many times, and each call must resume where the previous one left off instead of creating the create() Future all over again. So we need a data structure that saves this internal state and implements Future, for example:

enum WriteHelloFile {
    // Initial stage, user provides a file name
    Init(String),
    // Waiting for file creation, at this point, Future needs to be saved for multiple calls
    // This is pseudocode, impl Future can't be used here
    AwaitingCreate(impl Future<Output = Result<fs::File, std::io::Error>>),
    // Waiting for writing to file, at this point, Future needs to be saved for multiple calls
    AwaitingWrite(impl Future<Output = Result<(), std::io::Error>>),
    // Future is complete
    Done,
}

impl WriteHelloFile {
    pub fn new(name: impl Into<String>) -> Self {
        Self::Init(name.into())
    }
}

impl Future for WriteHelloFile {
    type Output = Result<(), std::io::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        todo!()
    }
}

fn write_hello_file_async(name: &str) -> WriteHelloFile {
    WriteHelloFile::new(name)
}

In this way, we have turned the asynchronous function write_hello_file_async into an ordinary function that returns a WriteHelloFile Future. Let’s see how this Future is implemented (with detailed comments):

impl Future for WriteHelloFile {
    type Output = Result<(), std::io::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Pseudocode: get_mut() requires Self: Unpin, and each inner fut would
        // also have to be pinned before it could be polled
        let this = self.get_mut();
        loop {
            match this {
                // If the state is Init, create the create() Future and switch to AwaitingCreate
                WriteHelloFile::Init(name) => {
                    let fut = fs::File::create(name.clone());
                    *this = WriteHelloFile::AwaitingCreate(fut);
                }
                // If the state is AwaitingCreate, poll the create() Future;
                // on Poll::Ready(Ok(_)), create the write_all() Future
                // and switch to AwaitingWrite
                WriteHelloFile::AwaitingCreate(fut) => match fut.poll(cx) {
                    Poll::Ready(Ok(mut file)) => {
                        let fut = file.write_all(b"hello world!");
                        *this = WriteHelloFile::AwaitingWrite(fut);
                    }
                    Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                    Poll::Pending => return Poll::Pending,
                },
                // If the state is AwaitingWrite, poll the write_all() Future;
                // on Poll::Ready(_), switch to Done and the whole Future completes
                WriteHelloFile::AwaitingWrite(fut) => match fut.poll(cx) {
                    Poll::Ready(result) => {
                        *this = WriteHelloFile::Done;
                        return Poll::Ready(result);
                    }
                    Poll::Pending => return Poll::Pending,
                },
                // The entire Future has already completed
                WriteHelloFile::Done => return Poll::Ready(Ok(())),
            }
        }
    }
}

Internally, this Future implementation is nothing more than a state machine.

This (pseudo)code is equivalent to the asynchronous function we started with:

async fn write_hello_file_async(name: &str) -> anyhow::Result<()> {
    let mut file = fs::File::create(name).await?;
    file.write_all(b"hello world!").await?;

    Ok(())
}

Rust compiles an async fn or async block into a similar state machine implementation. As you can see, behind what looks like simple asynchronous code sits a body of state machine bookkeeping that is not hard to understand but is tedious and verbose to write by hand.
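To make the equivalence concrete without depending on tokio, here is a sketch in which a hypothetical Step future stands in for create() and write_all(), and TwoSteps is a hand-written enum state machine with the same shape as WriteHelloFile. Because each state owns its sub-future by value and nothing is borrowed across states, everything stays Unpin and no unsafe code is needed. The run helper is a busy-polling driver assumed only for this demo.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical leaf future standing in for create()/write_all():
// Pending on the first poll (after requesting a wake-up), then Ready(value).
struct Step {
    value: u32,
    polled: bool,
}

impl Future for Step {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled {
            Poll::Ready(self.value)
        } else {
            self.polled = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn step(value: u32) -> Step {
    Step { value, polled: false }
}

// Hand-written state machine with the same shape as WriteHelloFile.
enum TwoSteps {
    Init(u32),
    AwaitingFirst(Step),
    AwaitingSecond(Step),
    Done,
}

impl Future for TwoSteps {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        // Every field is Unpin, so TwoSteps is Unpin and get_mut is allowed.
        let this = self.get_mut();
        loop {
            match this {
                TwoSteps::Init(n) => *this = TwoSteps::AwaitingFirst(step(*n)),
                TwoSteps::AwaitingFirst(fut) => match Pin::new(fut).poll(cx) {
                    Poll::Ready(a) => *this = TwoSteps::AwaitingSecond(step(a + 1)),
                    Poll::Pending => return Poll::Pending,
                },
                TwoSteps::AwaitingSecond(fut) => match Pin::new(fut).poll(cx) {
                    Poll::Ready(b) => {
                        *this = TwoSteps::Done;
                        return Poll::Ready(b * 10);
                    }
                    Poll::Pending => return Poll::Pending,
                },
                TwoSteps::Done => panic!("TwoSteps polled after completion"),
            }
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Demo-only driver: busy-polls until Ready (fine here, since Step finishes on its second poll).
fn run<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let by_hand = run(TwoSteps::Init(1));
    // The compiler turns this async block into a comparable state machine.
    let by_compiler = run(async {
        let a = step(1).await;
        let b = step(a + 1).await;
        b * 10
    });
    assert_eq!(by_hand, by_compiler);
    println!("{by_hand}");
}
```

Both versions produce 20. The hand-written version avoids self-references only because no state borrows from another; once a local is borrowed across an .await, as file is in WriteHelloFile, that no longer holds.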

With this understanding, we can come back to Pin. Writing the state machine by hand shows us why Pin is needed.

Why Do We Need Pin? #

In the implemented Future state machine above, we referenced file, a local variable:

WriteHelloFile::AwaitingCreate(fut) => match fut.poll(cx) {
    Poll::Ready(Ok(mut file)) => {
        let fut = file.write_all(b"hello world!");
        *this = WriteHelloFile::AwaitingWrite(fut);
    }
    Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
    Poll::Pending => return Poll::Pending,
}

There’s a problem with this code: fut borrows file, but file is dropped at the end of this scope. So we need to keep file alive inside a data structure:

enum WriteHelloFile {
    // Initial stage, user provides a file name
    Init(String),
    // Waiting for file creation; at this point, Future needs to be saved for multiple calls
    AwaitingCreate(impl Future<Output = Result<fs::File, std::io::Error>>),
    // Waiting for writing to file; at this point, Future needs to be saved for multiple calls
    AwaitingWrite(AwaitingWriteData),
    // Future is complete
    Done,
}

struct AwaitingWriteData {
    fut: impl Future<Output = Result<(), std::io::Error>>,
    file: fs::File,
}

We can create an AwaitingWriteData structure, put both file and fut in it, and store it in WriteHelloFile. Now, within a single structure, fut holds a reference to file. A structure like this, in which one field refers to another field of the same value, is called a self-referential structure.

Self-referential structures have a big problem: once they are moved, their internal pointers still point to the old addresses, which are no longer valid.

Therefore, we need a mechanism that guarantees this cannot happen, and Pin is the data structure designed for exactly that purpose. We pin a pointer to a Future rather than the Future itself, as Pin’s declaration in the standard library shows:

pub struct Pin<P> {
    pointer: P,
}

impl<P: Deref> Deref for Pin<P> {
    type Target = P::Target;
    fn deref(&self) -> &P::Target {
        Pin::get_ref(Pin::as_ref(self))
    }
}

impl<P: DerefMut<Target: Unpin>> DerefMut for Pin<P> {
    fn deref_mut(&mut self) -> &mut P::Target {
        Pin::get_mut(Pin::as_mut(self))
    }
}
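The Target: Unpin bound on DerefMut is what makes Pin almost transparent for ordinary types. Here is a short std-only sketch with String, which is Unpin:

```rust
use std::pin::Pin;

fn main() {
    let mut name = String::from("Tyr");

    // String is Unpin, so the safe Pin::new constructor is available.
    let mut pinned: Pin<&mut String> = Pin::new(&mut name);

    // Deref: read through the pin.
    assert_eq!(pinned.len(), 3);

    // DerefMut: allowed because String: Unpin, so we can mutate in place.
    pinned.push_str("!");

    // Pin::get_mut (also Unpin-only) returns a plain &mut String,
    // which even lets us move the value out with mem::take.
    let inner: &mut String = Pin::get_mut(pinned.as_mut());
    let taken = std::mem::take(inner);

    assert_eq!(taken, "Tyr!");
    assert!(name.is_empty());
    println!("moved out: {taken}");
}
```

Because get_mut() hands back a plain &mut String, Pin guarantees nothing for Unpin types; it only has teeth for types that are !Unpin.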

Pin wraps a pointer type P that dereferences to T, rather than T itself. That is why you see Pin<Box<T>> and Pin<&mut T>, but not Pin<T>: Pin exists to lock down the memory location of T, so that a self-referential value cannot be moved and leave its internal references pointing at stale addresses.

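This way, the data can still be accessed normally through the Pin, but safe code cannot obtain a &mut T from it unless T implements Unpin (note the Target: Unpin bound on DerefMut above), so it cannot take out the original value and move it.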
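Conversely, for a type that opts out of Unpin, Pin does hold the value in place. Here is a minimal sketch, assuming a hypothetical Pinned type that mirrors the SelfReference example below: PhantomPinned makes it !Unpin, and Box::pin puts it on the heap. Moving the Pin<Box<_>> only moves the box pointer, so the internal name_ptr stays valid. Setting the pointer requires unsafe, and the SAFETY comments state the assumption it relies on.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr;

// Hypothetical self-referential type; PhantomPinned makes it !Unpin.
struct Pinned {
    name: String,
    // Points to `name` once `new` has finished.
    name_ptr: *const String,
    _pin: PhantomPinned,
}

impl Pinned {
    fn new(name: &str) -> Pin<Box<Self>> {
        let mut boxed = Box::pin(Pinned {
            name: name.to_string(),
            name_ptr: ptr::null(),
            _pin: PhantomPinned,
        });
        // The String now lives at a fixed heap address inside the box.
        let name_ptr: *const String = &boxed.name;
        // SAFETY: we only write a field in place; nothing is moved out of the pin.
        unsafe { boxed.as_mut().get_unchecked_mut().name_ptr = name_ptr };
        boxed
    }

    fn name_via_ptr<'a>(self: Pin<&'a Self>) -> &'a str {
        // SAFETY: name_ptr points into this pinned value, which cannot move.
        unsafe { (*self.name_ptr).as_str() }
    }
}

// Same idea as move_it in the SelfReference example: a plain move.
fn move_it<T>(data: T) -> T {
    data
}

fn main() {
    let pinned = Pinned::new("Tyr");
    let before = pinned.name_ptr;

    // Moves the Pin<Box<Pinned>> (a pointer), not the heap data it owns.
    let pinned = move_it(pinned);

    assert!(ptr::eq(before, &pinned.name));
    println!("{}", pinned.as_ref().name_via_ptr());

    // Would not compile, because Pinned is !Unpin:
    // let inner: &mut Pinned = Pin::get_mut(pinned.as_mut());
}
```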

Self-Referential Structures #

Of course, self-referential structures do not appear only in asynchronous code, but they arise very easily there, because async code is compiled into a Future that is internally a state machine. Let’s look at an example unrelated to Future:

#[derive(Debug)]
struct SelfReference {
    name: String,
    // Points to name after initialization
    name_ptr: *const String,
}

impl SelfReference {
    pub fn new(name: impl Into<String>) -> Self {
        SelfReference {
            name: name.into(),
            name_ptr: std::ptr::null(),
        }
    }

    pub fn init(&mut self) {
        self.name_ptr = &self.name as *const String;
    }

    pub fn print_name(&self) {
        println!(
            "struct {:p}: (name: {:p} name_ptr: {:p}), name: {}, name_ref: {}",
            self,
            &self.name,
            self.name_ptr,
            self.name,
            // Using ptr requires unsafe
            // SAFETY: here name_ptr is potentially unsafe and might point to an old location
            unsafe { &*self.name_ptr },
        );
    }
}

fn main() {
    let data = move_creates_issue();
    println!("data: {:?}", data);
    // Uncommenting the following line is undefined behavior and may crash the program
    // data.print_name();
    println!();
    mem_swap_creates_issue();
}

fn move_creates_issue() -> SelfReference {
    let mut data = SelfReference::new("Tyr");
    data.init();

    // Without move, everything is normal
    data.print_name();

    let data = move_it(data);

    // After the move, name_ptr still points to the old, now invalid address;
    // it only prints correctly because that memory has not been reused yet
    data.print_name();
    data
}

fn mem_swap_creates_issue() {
    let mut data1 = SelfReference::new("Tyr");
    data1.init();

    let mut data2 = SelfReference::new("Lindsey");
    data2.init();

    data1.print_name();
    data2.print_name();

    std::mem::swap(&mut data1, &mut data2);
    data1.print_name();
    data2.print_name();
}

fn move_it(data: SelfReference) -> SelfReference {
    data
}

We have created a self-referential structure, SelfReference, where name_ptr points to name. Used normally, it works fine. But once we move this structure, name_ptr still points to the old address of name, and that is where things go wrong. See the diagram.

Similarly, std::mem::swap causes the same kind of problem. After the swap, the contents of the two structs are exchanged, but each name_ptr still points to its old address, which now holds the other struct’s name, so each pointer refers to the wrong data.

The program’s output makes this concrete:

struct 0x7ffeea91d6e8: (name: 0x7ffeea91d6e8 name_ptr: 0x7ffeea91d6e8), name: Tyr, name_ref: Tyr
struct 0x7ffeea91d760: (name: 0x7ffeea91d760 name_ptr: 0x7ffeea91d6e8), name: Tyr, name_ref: Tyr
data: SelfReference { name: "Tyr", name_ptr: 0x7ffeea91d6e8 }

struct 0x7ffeea91d6f0: (name: 0x7ffeea91d6f0 name_ptr: 0x7ffeea91d6f0), name: Tyr, name_ref: Tyr
struct 0x7ffeea91d710: (name: 0x7ffeea91d710 name_ptr: 0x7ffeea91d710), name: Lindsey, name_ref: Lindsey
struct 0x7ffeea91d6f0: (name: 0x7ffeea91d6f0 name_ptr: 0x7ffeea91d710), name: Lindsey, name_ref: Tyr
struct 0x7ffeea91d710: (name: 0x7ffeea91d710 name_ptr: 0x7ffeea91d6f0), name: Tyr, name_ref: Lindsey

You can see that after the swap, the value reached through name_ptr (printed as name_ref) no longer matches name. This is the problem self-referential structures bring.

You might wonder: didn’t I say that moving also causes problems? Why does the second line of output still show name_ref as “Tyr”? After the move, the old memory is no longer valid, but its address has not yet been reused for anything else, so it still happens to display “Tyr”. Such memory access is undefined behavior, though: if you uncomment the following line in main, the program may crash:

fn main() {
    let data = move_creates_issue();
    println!("data: {:?}", data);
    // Uncommenting the following line is undefined behavior and may crash the program
    // data.print_name();
    println!();
    mem_swap_creates_issue();
}

By now, you should understand the potential dangers of self-referential types in Rust.

Pin plays