40 Asynchronous Processing: How to Handle Asynchronous IO? #

Hello, I’m Chen Tian.

In the previous two lectures, we learned the basic features and principles of asynchronous processing (Future/async/await), but we have not yet looked at which tools to use for asynchronous IO in concrete situations. That said, back when we covered traits, we already met and used some of the structures and traits for synchronous IO.

Today we will learn asynchronous IO by comparing it with synchronous IO. After all, when learning something new, linking it with knowledge already in your mind can activate the connections between neurons in the brain and double the learning effect.

Remember what structures and traits are available in a synchronous environment? First, a single value can be expressed with type T, and a group of values can be expressed with the Iterator trait; for synchronous IO, we have standard Read/Write/Seek traits. As the name implies, Read/Write is for IO’s reading and writing, and Seek is to move the current position back and forth in IO.
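As a quick refresher on the synchronous side, here is a minimal sketch that exercises Write, Seek, and Read in turn. The function name round_trip is hypothetical, and std::io::Cursor stands in for a real file or socket so the example stays self-contained:

```rust
use std::io::{Cursor, Read, Seek, SeekFrom, Write};

/// Hypothetical helper: writes `data` into an in-memory buffer,
/// rewinds to the start, and reads everything back.
/// `Cursor<Vec<u8>>` stands in for a file or socket here.
pub fn round_trip(data: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut io = Cursor::new(Vec::new());

    // Write: the call returns only after the bytes have been accepted.
    io.write_all(data)?;

    // Seek: move the current position back to the beginning.
    io.seek(SeekFrom::Start(0))?;

    // Read: the call returns only after the data (or EOF) has been read.
    let mut out = Vec::new();
    io.read_to_end(&mut out)?;
    Ok(out)
}
```

Every call here blocks the calling thread until it completes, which is exactly the behavior the asynchronous traits in this lecture are designed to avoid.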

So what about asynchronous? We’ve learned that for a single value that will be obtained at some future moment, we can use a Future to represent it:

Image

But we still don’t know what trait to use to describe a group of values to be obtained in the future, nor do we know what asynchronous Read/Write should look like. Today, let’s talk about these important asynchronous data types.

Stream trait #

First, let’s understand the cousin of Iterator in an asynchronous environment: Stream.

We know that for Iterator, you can keep calling its next() method to get new values until it returns None. Iterator returns data in a blocking manner: each call to next() occupies the CPU until a result is available, whereas an asynchronous Stream is non-blocking and frees up the CPU for other tasks while it waits.

However, unlike Future, which has been stabilized in the standard library, the Stream trait is currently only available in nightly Rust. When working with Stream, we therefore typically use the futures library. Let’s compare the source definitions of Iterator and Stream:

pub trait Iterator {
    type Item;
    fn next(&mut self) -> Option<Self::Item>;

    // ... (size_hint and other methods)
    // ... (67 other methods in total)
}

pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>,  cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    // ... (size_hint)
}

pub trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self> where Self: Unpin { ... }
    // ... (map and other convenience methods)
    // ... (41 other methods in total)
}

As you can see, Iterator places all of its methods inside the Iterator trait, while Stream separates the base method that developers must implement from the derived methods that have default implementations, and puts them in different traits. For example, map lives in StreamExt rather than in Stream itself.

When implementing Stream, you need to provide an Item type, which is the type of each value the stream yields. You also need to implement poll_next(), which looks similar to Future’s poll() and plays the same role as next() does for Iterator.
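To make the shape of poll_next() concrete, here is a minimal std-only sketch. Everything in it is an assumption made for illustration: the Stream trait is a local stand-in with the same signature as the definition above (so the example needs no external crate), and Counter, NoopWaker, and drain are hypothetical names. Counter reports Pending once before each item, and drain polls it by hand:

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in mirroring the Stream definition shown above.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical stream yielding 1..=limit, returning Pending once before each item.
pub struct Counter {
    current: u32,
    limit: u32,
    ready: bool,
}

impl Counter {
    pub fn new(limit: u32) -> Self {
        Self { current: 0, limit, ready: false }
    }
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.current == self.limit {
            return Poll::Ready(None); // the stream is finished
        }
        if !self.ready {
            // Pretend the value is not available yet and ask to be polled again.
            self.ready = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.ready = false;
        self.current += 1;
        Poll::Ready(Some(self.current))
    }
}

/// A waker that does nothing; enough for polling in a plain loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a stream by hand; returns the items and how many times Pending was seen.
pub fn drain<S: Stream + Unpin>(mut st: S) -> (Vec<S::Item>, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    let mut pendings = 0;
    loop {
        match Pin::new(&mut st).poll_next(&mut cx) {
            Poll::Ready(Some(v)) => items.push(v),
            Poll::Ready(None) => break,
            // A real executor would park the task here until the waker fires.
            Poll::Pending => pendings += 1,
        }
    }
    (items, pendings)
}
```

Calling drain(Counter::new(3)) walks through all three Poll outcomes. The busy loop is only acceptable in a sketch; an executor would suspend the task instead of spinning.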

However, poll_next() is not convenient to call; we need to handle the Poll status ourselves. Therefore, StreamExt provides a next() method that returns a Next structure implementing the Future trait, so we can directly obtain the next value through stream.next().await. Let’s see the implementation of the next() method and the Next structure (source code):

pub trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self> where Self: Unpin {
        assert_future::<Option<Self::Item>, _>(Next::new(self))
    }
}

// next() returns the Next structure
pub struct Next<'a, St: ?Sized> {
    stream: &'a mut St,
}

// If Stream is Unpin then Next is also Unpin
impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}

impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
    pub(super) fn new(stream: &'a mut St) -> Self {
        Self { stream }
    }
}

// Next implements Future; each `poll()` actually calls the stream's `poll_next()`
impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
    type Output = Option<St::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_next_unpin(cx)
    }
}

Let’s look at a small example (code):

use futures::prelude::*;

#[tokio::main]
async fn main() {
    let mut st = stream::iter(1..10)
        .filter(|x| future::ready(x % 2 == 0))
        .map(|x| x * x);

    while let Some(x) = st.next().await {
        println!("Got item: {}", x);
    }
}

We use stream::iter to generate a Stream and apply filter/map operations to it. Finally, we traverse the entire stream and print each value. In day-to-day use, Stream feels very similar to Iterator, so you can learn one by analogy with the other.
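For comparison, here is the synchronous counterpart of the pipeline above, written against Iterator. The function name even_squares is hypothetical. Note one difference: the Iterator version of filter takes a closure that returns a plain bool, while the Stream version above takes a closure that returns a Future (hence future::ready).

```rust
/// Hypothetical helper: the Iterator version of the stream pipeline above.
/// Keeps the even numbers in 1..10 and squares them.
pub fn even_squares() -> Vec<i32> {
    (1..10)
        .filter(|x| x % 2 == 0) // returns a bool directly, no Future involved
        .map(|x| x * x)
        .collect()
}
```

The result is the vector [4, 16, 36, 64], the same values the stream version prints one by one.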

Creating Streams #

The futures library provides several basic functions for creating Streams. In addition to the iter function used above, there are:

  • empty(): Generates an empty Stream
  • once(): Generates a Stream containing only a single value
  • pending(): Generates a Stream that does not contain any values and only returns Poll::Pending
  • repeat(): Generates a Stream that keeps returning the same value
  • repeat_with(): Generates a Stream that indefinitely returns data through a closure function
  • poll_fn(): Generates a Stream through a closure that returns Poll<Option<T>>
  • unfold(): Generates a Stream through an initial value and a closure that returns a Future

The first few methods for generating a Stream are easy to understand; the last few methods introduce closures and are a bit more complex. Let’s use them individually to implement the Fibonacci sequence and compare the differences (code):

use futures::{prelude::*, stream::poll_fn};
use std::task::Poll;

#[tokio::main]
async fn main() {
    consume(fib().take(10)).await;
    consume(fib1(10)).await;
    // The Unfold stream generated by unfold does not implement Unpin,
    // so we Pin<Box<T>> it so it conforms to the interface of consume
    consume(fib2(10).boxed()).await;
}

async fn consume(mut st: impl Stream<Item = i32> + Unpin) {
    while let Some(v) = st.next().await {
        print!("{} ", v);
    }
    println!();
}

// Using repeat_with to create a stream, but you can't control when it ends
fn fib() -> impl Stream<Item = i32> {
    let mut a = 1;
    let mut b = 1;
    stream::repeat_with(move || {
        let c = a + b;
        a = b;
        b = c;
        b
    })
}

// Using poll_fn to create a stream, you can end it by returning Poll::Ready(None)
fn fib1(mut n: usize) -> impl Stream<Item = i32> {
    let mut a = 1;
    let mut b = 1;
    poll_fn(move |_cx| -> Poll<Option<i32>> {
        if n == 0 {
            return Poll::Ready(None);
        }
        n -= 1;
        let c = a + b;
        a = b;
        b = c;
        Poll::Ready(Some(b))
    })
}

fn fib2(n: usize) -> impl Stream<Item = i32> {
    stream::unfold((n, (1, 1)), |(mut n, (a, b))| async move {
        if n == 0 {
            None
        } else {
            n -= 1;
            let c = a + b;
            // c is the item yielded by poll_next(); (n, (b, c)) is the next state
            Some((c, (n, (b, c))))
        }
    })
}

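Notably, the closure we pass to unfold returns an async block, and the future generated for an async block does not implement Unpin. The Unfold stream stores that future, so it does not implement Unpin either. How do we deal with that?

Pin<Box<T>> is a simple solution: it pins the data on the heap, and Pin<Box<T>> itself always implements Unpin. We can use StreamExt’s boxed() method to produce such a pinned, boxed stream.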
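The effect of pinning on the heap can be sketched with the standard library alone. In this minimal example, NotUnpin, require_unpin, and pinned_on_heap are hypothetical names; NotUnpin uses PhantomPinned to opt out of Unpin, playing the role of the stream produced by unfold:

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// Hypothetical type that opts out of Unpin, much like the future
/// generated for an async block does.
pub struct NotUnpin {
    pub value: i32,
    _pin: PhantomPinned,
}

impl NotUnpin {
    pub fn new(value: i32) -> Self {
        Self { value, _pin: PhantomPinned }
    }
}

/// Accepts only types that implement Unpin (a compile-time check).
pub fn require_unpin<T: Unpin>(t: T) -> T {
    t
}

pub fn pinned_on_heap(value: i32) -> i32 {
    // require_unpin(NotUnpin::new(value)); // would not compile: NotUnpin is !Unpin

    // Box::pin moves the value to the heap and pins it there.
    let boxed: Pin<Box<NotUnpin>> = Box::pin(NotUnpin::new(value));

    // The Pin<Box<_>> handle is Unpin even though its contents are not,
    // because moving the handle never moves the heap allocation.
    let boxed = require_unpin(boxed);
    boxed.value
}
```

This is why wrapping the unfold stream with boxed() is enough to satisfy the Unpin bound on consume(): the bound applies to the handle, not to the data it points at.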

Besides the above methods, we can also implement the Stream trait for a data structure, making it support Stream. Here’s an example (code):

use futures::prelude::*;
use pin_project::pin_project;
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tokio::{
    fs,
    io::{AsyncBufReadExt, AsyncRead, BufReader, Lines},
};

/// LineStream internally uses tokio::io::Lines
#[pin_project]
struct LineStream<R> {
    #[pin]
    lines: Lines<BufReader<R>>,
}

impl<R: AsyncRead> LineStream<R> {
    /// Create a LineStream from BufReader
    pub fn new(reader: BufReader<R>) -> Self {
        Self {
            lines: reader.lines(),
        }
    }
}

/// Implement Stream trait for LineStream
impl<R: AsyncRead> Stream for LineStream<R> {
    type Item = std::io::Result<String>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project()
            .lines
            .poll_next_line(cx)
            .map(Result::transpose)
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let file = fs::File::open("Cargo.toml").await?;
    let reader = BufReader::new(file);
    let mut st = LineStream::new(reader);
    while let Some(Ok(line)) = st.next().await {
        println!("Got: {}", line);
    }

    Ok(())
}

This code wraps the Lines structure. A reader that implements the AsyncBufRead trait can be converted into Lines through the lines() method of AsyncBufReadExt.

You might have noticed that the code pulls in pin_project, which provides a convenient macro for handling fields that need to be pinned. In the data structure, #[pin] declares that a field should be exposed as a Pin when accessed. Then self.project().lines gives us a Pin<&mut Lines<BufReader<R>>>, which is exactly what poll_next_line() needs, since it takes Pin<&mut Self> as its first argument.

Inside the Lines structure, the asynchronous method next_line() reads the next line and basically wraps the lower-level poll_next_line() interface.

Although the Lines structure provides next_line(), it does not implement Stream, so we cannot call next() on it to obtain the next line as we can with other Streams. Therefore, we wrap it in our LineStream and implement the Stream trait for LineStream.

Note that poll_next_line() yields Poll<Result<Option<String>>>, whereas Stream’s poll_next() must yield Poll<Option<Result<String>>>, so we use Result’s transpose() method to swap the Result and the Option. transpose() is a basic but very practical method.
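The transpose step can be tried in isolation with only the standard library. In this small sketch, to_stream_item is a hypothetical helper that performs the same conversion as the .map(Result::transpose) call in poll_next() above:

```rust
use std::io;
use std::task::Poll;

/// Hypothetical helper: converts the shape returned by a line reader
/// (Poll<Result<Option<String>>>) into the shape Stream expects
/// (Poll<Option<Result<String>>>).
pub fn to_stream_item(
    polled: Poll<io::Result<Option<String>>>,
) -> Poll<Option<io::Result<String>>> {
    // Poll::map leaves Pending untouched and transforms the Ready payload.
    // Result::transpose maps:
    //   Ok(Some(line)) -> Some(Ok(line))   (an item)
    //   Ok(None)       -> None             (end of stream)
    //   Err(e)         -> Some(Err(e))     (an item carrying the error)
    polled.map(Result::transpose)
}
```

The three cases in the comment are the whole contract: a successful line becomes an item, a clean end of input ends the stream, and an IO error is delivered to the consumer as an item rather than silently ending the stream.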

Asynchronous IO Interfaces #

When we implemented LineStream, we encountered two asynchronous I/O interfaces: AsyncRead and AsyncBufRead. Returning to the chart at the beginning, I believe you already have a rough answer now: All synchronous Read/Write/Seek traits, with an Async added in front, form the corresponding asynchronous IO interfaces.

However, unlike Stream, the IO traits defined in futures and those defined in tokio are separate definitions that have not been unified, and they differ slightly:

Image

For example, the definition of AsyncRead under futures is:

pub trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>, 
        buf: &mut [u8]
    ) -> Poll<Result<usize, Error>>;

    unsafe fn initializer(&self) -> Initializer { ... }
    fn poll_read_vectored(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>, 
        bufs: &mut [IoSliceMut<'_>]
    ) -> Poll<Result<usize, Error>> { ... }
}

While the definition of AsyncRead under tokio is:

pub trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>, 
        buf: &mut ReadBuf<'_>
    ) -> Poll<Result<()>>;
}

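The key difference is the buffer: tokio’s poll_read() takes a &mut ReadBuf<'_> and returns Poll<Result<()>>, while futures’ poll_read() takes a &mut [u8] and returns the number of bytes read as Poll<Result<usize, Error>>. In addition, futures’ AsyncRead has two extra methods with default implementations.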
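To see why one signature returns a byte count and the other returns nothing, here is a deliberately simplified, synchronous sketch of the two buffer contracts. It is not either library’s code: MiniReadBuf is a hypothetical stand-in for a buffer that records how much of itself has been filled, and read_slice_style and read_buf_style are hypothetical functions; Pin, Context, and Poll are left out to keep the focus on the buffer.

```rust
/// Hypothetical, simplified stand-in for a ReadBuf-like type:
/// a byte slice plus a record of how much of it has been filled.
pub struct MiniReadBuf<'a> {
    buf: &'a mut [u8],
    filled: usize,
}

impl<'a> MiniReadBuf<'a> {
    pub fn new(buf: &'a mut [u8]) -> Self {
        Self { buf, filled: 0 }
    }

    /// Bytes of capacity still available.
    pub fn remaining(&self) -> usize {
        self.buf.len() - self.filled
    }

    /// The portion that has been filled so far.
    pub fn filled(&self) -> &[u8] {
        &self.buf[..self.filled]
    }

    /// Appends `src` and advances the filled marker.
    /// Panics if `src` is longer than `remaining()`.
    pub fn put_slice(&mut self, src: &[u8]) {
        let end = self.filled + src.len();
        self.buf[self.filled..end].copy_from_slice(src);
        self.filled = end;
    }
}

/// Slice-style contract: fill a plain slice and report the byte count.
pub fn read_slice_style(src: &[u8], dst: &mut [u8]) -> usize {
    let n = src.len().min(dst.len());
    dst[..n].copy_from_slice(&src[..n]);
    n
}

/// Buffer-style contract: append to the buffer and return nothing;
/// the caller asks the buffer how much was filled.
pub fn read_buf_style(src: &[u8], dst: &mut MiniReadBuf<'_>) {
    let n = src.len().min(dst.remaining());
    dst.put_slice(&src[..n]);
}
```

With the slice style, the caller must keep the returned count to know which bytes are valid. With the buffer style, that bookkeeping lives in the buffer itself, which is why the read call has nothing left to return on success.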

Now let’s look at AsyncWrite. The AsyncWrite interface under futures is:

pub trait AsyncWrite {
    fn poll_write(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>, 
        buf: &[u8]
    ) -> Poll<Result<usize, Error>>;
    fn poll_flush(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Error>>;
    fn poll_close(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Error>>;

    fn poll_write_vectored(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>, 
        bufs: &[IoSlice<'_>]
    ) -> Poll<Result<usize, Error>> { ... }
}

While under tokio, the definition of AsyncWrite is:

pub trait AsyncWrite {
    fn poll_write(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>, 
        buf: &[u8]
    ) -> Poll<Result<usize, Error>>;
    fn poll_flush(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Error>>;
    fn poll_shutdown(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Error>>;

    fn poll_write_vectored(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>, 
        bufs: &[IoSlice<'_>]
    ) -> Poll<Result<usize, Error>> { ... }
    fn is_write_vectored(&self) -> bool { ... }
}

As you can see, the main difference between the two AsyncWrite traits is a name: poll_close() in futures versus poll_shutdown() in tokio (tokio also adds is_write_vectored()). I won’t go through all the other asynchronous IO interfaces one by one; you can compare the code yourself.

Compatibility Handling of Asynchronous IO Interfaces #

Why are Rust’s asynchronous IO traits split like this? In the early days of the tokio and futures libraries, the community had not yet settled on a unified set of asynchronous IO traits, and because the two interfaces were designed with different considerations in mind, the split has persisted.

So, if you’re using tokio for asynchronous development, your code needs to use the asynchronous IO traits under tokio::io. Perhaps in the future, once the async IO traits are stabilized and incorporated into the standard library, tokio will update its traits.

Even with this split, you don’t need to worry too much. tokio-util provides a compat module that lets your data structures switch between the two sets of traits. Here’s an example using yamux for multiplexing, with detailed comments at key locations:

use anyhow::Result;
use futures::prelude::*;
use tokio::net::TcpListener;
use tokio_util::{
    codec::{Framed, LinesCodec},
    compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt},
};
use tracing::info;
use yam