41 | Stage Practical Exercise (6): Build a Simple KV Server with Asynchronous Processing

Hands-on Practice (6): Building a Simple KV Server - Asynchronous Processing #

Hello, I am Chen Tian.

So far, we have built a fairly complete KV server together. Do you remember how we built it step by step?

After finishing the basics, we built the core of the KV server ([Lecture 21], [Lecture 22]): we defined the protobuf messages for client-server interaction, and designed the CommandService trait and the Storage trait to handle client commands and storage, respectively.

After mastering the practical use of traits in the advanced section ([Lecture 26]), we went on to build the Service data structure. It receives a CommandRequest, dispatches it to the matching CommandService based on its type, sends the appropriate event notifications along the way, and finally returns a CommandResponse.

But all of this happened in a synchronous world: however the data was obtained, it was already there, and all we had to do was convert one data type into another.

Then we ventured into networking ([Lecture 36]). We designed our own frame format for the KV server: a 4-byte header that encodes the payload length and whether the payload is compressed, followed by the actual payload. We also designed FrameCoder to encode and decode frames, which laid a solid foundation for the network layer. For network security ([Lecture 37]), we added TLS support.

When building ProstStream, we started dealing with asynchronous operations: the stream inside ProstStream must implement AsyncRead + AsyncWrite, so ProstStream can work with any asynchronous network type that implements both traits, including TcpStream and TlsStream.

By now, we have worked through the whole path: a command arrives from the remote end over TCP and TLS, FrameCoder decodes it into a CommandRequest, and Service processes it. The structure that bridges the synchronous and asynchronous worlds is ProstServerStream.

The following diagram shows the complete processing flow and system structure, from receiving a packet to sending back the processed result:

[Figure: the KV server’s complete processing flow and system structure]

What are we doing today? #

Although we started writing asynchronous (or async-related) code fairly early, we have not yet written many of the low-level asynchronous functions such as poll(), poll_read(), and poll_next(). In the previous lecture, we created a DummyStream to test the asynchronous read_frame(), which gave us a first taste of these complex lower-level interfaces. However, DummyStream did nothing complicated:

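use bytes::BytesMut;
use tokio::io::AsyncRead;

struct DummyStream {
    buf: BytesMut,
}

impl AsyncRead for DummyStream {
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let this = self.get_mut();
        // See how much room ReadBuf has left, capped at the data we actually hold
        // (split_to() panics if asked for more bytes than the buffer contains)
        let len = buf.remaining().min(this.buf.len());
        // Split off data of this size
        let data = this.buf.split_to(len);
        // Copy it into ReadBuf
        buf.put_slice(&data);
        // Done immediately: the data is already in memory, so never Pending
        std::task::Poll::Ready(Ok(()))
    }
}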

In the last lecture, we learned about asynchronous IO. In this lecture, we put it into practice by refactoring the existing code so that the core ProstStream better follows the logic of Rust’s asynchronous IO interfaces. What exactly should we do?

Take a look at the existing process() function of ProstServerStream and compare it with how the async_prost library’s AsyncProst is used:

// Internal logic of the process() function
while let Ok(cmd) = self.recv().await {
    info!("Got a new command: {:?}", cmd);
    let res = self.service.execute(cmd);
    self.send(res).await?;
}

// Calling logic of async_prost library's AsyncProst
while let Some(Ok(cmd)) = stream.next().await {
    info!("Got a new command: {:?}", cmd);
    let res = svc.execute(cmd);
    stream.send(res).await.unwrap();
}

As you can see, because AsyncProst implements the Stream and Sink traits, it can naturally use next() from the StreamExt trait and send() from the SinkExt trait to receive and send data. ProstServerStream, on the other hand, has to implement its own recv() and send() functions.

Although the two snippets look almost identical, AsyncProst has the upper hand in future extensibility and in how well it fits into the wider asynchronous ecosystem.

So today we will construct a ProstStream structure, have it implement the Stream and Sink traits, and then have ProstServerStream and ProstClientStream use it.

Creating ProstStream #

Before starting the refactoring, let’s briefly review the Stream trait and the Sink trait:

// Can be compared to Iterator
pub trait Stream {
    // The data type read from the Stream
    type Item;

    // Read the next data from the stream
    fn poll_next(
        self: Pin<&mut Self>, cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;
}

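// The writing counterpart of Stream: asynchronously accepts values of type Item
pub trait Sink<Item> {
    // The error type returned when sending fails
    type Error;

    // Check whether the sink can accept another item (the hook for backpressure)
    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>>;

    // Begin sending one item; may only be called after poll_ready() returns Ready(Ok(()))
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;

    // Push everything buffered so far out to the underlying destination
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>>;

    // Flush any remaining output, then close the sink
    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>>;
}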
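To connect these traits with the next() call in the process() loops above, here is a minimal std-only sketch. It redeclares a local Stream trait with the same poll_next() signature as futures::Stream (an assumption made so the example builds without the futures crate), implements it for a hypothetical Countdown type, and builds a small next() future on top of poll_next(), roughly the way StreamExt::next() does. block_on() is a toy executor for illustration, not a replacement for tokio.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in with the same shape as futures::Stream (no futures crate needed).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical stream that yields n, n-1, ..., 1 and then ends.
pub struct Countdown {
    n: u32,
}

impl Stream for Countdown {
    type Item = u32;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let this = self.get_mut(); // allowed because Countdown is Unpin
        if this.n == 0 {
            Poll::Ready(None) // stream exhausted
        } else {
            this.n -= 1;
            Poll::Ready(Some(this.n + 1))
        }
    }
}

// A minimal next(): a future that forwards each poll() to poll_next().
pub struct Next<'a, S> {
    stream: &'a mut S,
}

impl<S: Stream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}

pub fn next<S: Stream + Unpin>(stream: &mut S) -> Next<'_, S> {
    Next { stream }
}

// Toy executor: polls one future to completion, parking the thread while Pending.
struct ThreadWaker(std::thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(std::thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => std::thread::park(),
        }
    }
}

// Mirrors the `while let Some(..) = stream.next().await` loop shown earlier.
pub fn collect_countdown(n: u32) -> Vec<u32> {
    block_on(async move {
        let mut stream = Countdown { n };
        let mut out = Vec::new();
        while let Some(v) = next(&mut stream).await {
            out.push(v);
        }
        out
    })
}
```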

So what specific types should ProstStream contain?

Since its main job is to read data from and send data to the underlying stream, it needs a generic parameter S that implements AsyncRead and AsyncWrite.

Additionally, both the Stream trait and the Sink trait require an Item type. For our system, Item is either CommandRequest or CommandResponse, but for flexibility, we can use two generic parameters In and Out to represent them.

Of course, to implement Stream and Sink, we also need a read buffer and a write buffer.

Summing up, our ProstStream structure would look like this:

pub struct ProstStream<S, In, Out> {
    // inner stream
    stream: S,
    // write buffer
    wbuf: BytesMut,
    // read buffer
    rbuf: BytesMut,
}

However, Rust does not allow a struct to declare generic parameters that none of its fields use (the compiler rejects it with error E0392). What can we do? Don’t worry; we can use PhantomData. As discussed before, it is a zero-sized marker type that lets a struct carry generic parameters it does not otherwise use.
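Here is a minimal std-only sketch of the idea. The struct carries In and Out only through PhantomData fields, and the assertions check that those markers add no size. Vec<u8> stands in for BytesMut so the example needs no external crate. The field names _in and _out are illustrative and not necessarily the ones used in the project.

```rust
use std::marker::PhantomData;

// Sketch of the ProstStream layout. Vec<u8> stands in for bytes::BytesMut
// so the example compiles with the standard library alone.
#[allow(dead_code)]
pub struct ProstStream<S, In, Out> {
    // inner stream
    stream: S,
    // write buffer
    wbuf: Vec<u8>,
    // read buffer
    rbuf: Vec<u8>,
    // zero-sized markers that "use" In and Out, which avoids error E0392
    _in: PhantomData<In>,
    _out: PhantomData<Out>,
}

impl<S, In, Out> ProstStream<S, In, Out> {
    pub fn new(stream: S) -> Self {
        Self {
            stream,
            wbuf: Vec::new(),
            rbuf: Vec::new(),
            _in: PhantomData,
            _out: PhantomData,
        }
    }
}

// The same fields without the markers, used only for a size comparison.
#[allow(dead_code)]
pub struct WithoutMarkers<S> {
    stream: S,
    wbuf: Vec<u8>,
    rbuf: Vec<u8>,
}
```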

Alright, now that we have a clear plan, let’s create src/network/stream.rs and add the following code (remember to declare the stream module in src/network/mod.rs):

// [Code omitted: imports, the ProstStream struct, and skeleton Stream and Sink implementations whose method bodies are todo!() placeholders.]

This is the skeleton of the Stream and Sink implementations for ProstStream; next, we’ll fill them in one by one. Note that In and Out are also bounded by FrameCoder, so inside the implementations we can call decode_frame() to decode an Item and encode_frame() to encode one.
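To illustrate why those bounds matter, here is a std-only sketch with a simplified, hypothetical FrameCoder. It uses Vec<u8> instead of BytesMut, String errors, and a plain 4-byte big-endian length header without the compression bit, so it is not the trait from the earlier lectures. Because In and Out are bounded by FrameCoder, the generic impl can call In::decode_frame() and item.encode_frame() without knowing the concrete types.

```rust
use std::marker::PhantomData;

// Hypothetical, simplified FrameCoder: Vec<u8> instead of BytesMut, String errors.
pub trait FrameCoder: Sized {
    fn encode_frame(&self, buf: &mut Vec<u8>) -> Result<(), String>;
    fn decode_frame(buf: &mut Vec<u8>) -> Result<Self, String>;
}

// Toy message framed as [4-byte big-endian length][UTF-8 payload], no compression bit.
#[derive(Debug, PartialEq)]
pub struct Text(pub String);

impl FrameCoder for Text {
    fn encode_frame(&self, buf: &mut Vec<u8>) -> Result<(), String> {
        if self.0.len() > u32::MAX as usize {
            return Err("frame too large".into());
        }
        buf.extend_from_slice(&(self.0.len() as u32).to_be_bytes());
        buf.extend_from_slice(self.0.as_bytes());
        Ok(())
    }

    fn decode_frame(buf: &mut Vec<u8>) -> Result<Self, String> {
        if buf.len() < 4 {
            return Err("incomplete header".into());
        }
        let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
        if buf.len() < 4 + len {
            return Err("incomplete payload".into());
        }
        // Take exactly one frame off the front of the buffer.
        let frame: Vec<u8> = buf.drain(..4 + len).collect();
        String::from_utf8(frame[4..].to_vec()).map(Text).map_err(|e| e.to_string())
    }
}

pub struct ProstStream<S, In, Out> {
    pub stream: S,
    wbuf: Vec<u8>,
    rbuf: Vec<u8>,
    _io: PhantomData<(In, Out)>,
}

// The FrameCoder bounds are what allow this generic code to call the coder methods.
impl<S, In: FrameCoder, Out: FrameCoder> ProstStream<S, In, Out> {
    pub fn new(stream: S) -> Self {
        Self { stream, wbuf: Vec::new(), rbuf: Vec::new(), _io: PhantomData }
    }

    // Roughly what start_send() does with an outgoing item.
    pub fn encode_out(&mut self, item: &Out) -> Result<(), String> {
        item.encode_frame(&mut self.wbuf)
    }

    // Roughly what poll_next() does once a whole frame is in rbuf.
    pub fn decode_in(&mut self) -> Result<In, String> {
        In::decode_frame(&mut self.rbuf)
    }

    // Hypothetical test helper: pretend the peer echoed our bytes back.
    pub fn loopback(&mut self) {
        self.rbuf.append(&mut self.wbuf);
    }
}
```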

Implementing Stream #

Let’s first implement the Stream’s poll_next() method.

poll_next() can directly use our previously written read_frame(), and then decode_frame() for unpacking:

// [Insert implementation of poll_next() using read_frame() and decode_frame() here.]

Most of this is straightforward, but the middle part needs a bit of explanation:

    // Using read_frame to get the data
    let fut = read_frame(&mut self.stream, &mut rest);
    ready!(Box::pin(fut).poll_unpin(cx))?;

Since poll_xxx() methods are the lower-level machinery that async/await is built on, we cannot .await an asynchronous function inside them. Instead, we treat the call as a future and call that future’s poll function ourselves. The future returned by read_frame() is not Unpin, but FutureExt’s poll_unpin() requires Unpin. Box::pin() moves the future onto the heap and returns a Pin<Box<_>>, which is Unpin, so we can call poll_unpin() on it.
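The same pattern can be sketched with the standard library alone. std has no FutureExt::poll_unpin(), so the code calls Pin::as_mut().poll(cx) on the boxed future, which amounts to the same call. read_frame_stub() and FrameReader are hypothetical stand-ins. Like the snippet above, the sketch creates a fresh future on every call. That is harmless here because the stub never returns Pending. With a real read_frame(), however, returning Pending partway through a frame would drop the future together with whatever it had already read.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

// Hypothetical stand-in for read_frame(): an async fn that appends `src` to `buf`.
async fn read_frame_stub(src: &[u8], buf: &mut Vec<u8>) -> Result<(), String> {
    if src.is_empty() {
        return Err("connection closed".to_string());
    }
    buf.extend_from_slice(src);
    Ok(())
}

// Hypothetical type with a hand-written poll function, in the spirit of poll_next().
pub struct FrameReader {
    src: Vec<u8>,
    rbuf: Vec<u8>,
}

impl FrameReader {
    pub fn new(src: &[u8]) -> Self {
        Self { src: src.to_vec(), rbuf: Vec::new() }
    }

    pub fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll<Result<Vec<u8>, String>> {
        let res = {
            // We cannot `.await` here, so we hold the async call as a plain future...
            let fut = read_frame_stub(&self.src, &mut self.rbuf);
            // ...and pin it on the heap, because an async fn's future is not Unpin.
            let mut fut = Box::pin(fut);
            // Pending returns early from poll_frame(); Ready(v) evaluates to v.
            ready!(fut.as_mut().poll(cx))
        }; // the future (and its borrow of self.rbuf) is dropped here
        Poll::Ready(res.map(|()| std::mem::take(&mut self.rbuf)))
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Poll the reader once with a waker that does nothing.
pub fn poll_once(reader: &mut FrameReader) -> Poll<Result<Vec<u8>, String>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    reader.poll_frame(&mut cx)
}
```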

As for the ready! macro: if the expression is Poll::Pending, it returns Pending from the enclosing function right away; if it is Poll::Ready(value), it evaluates to value:

// [Explanation of the ready! macro and its use]
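The macro is short enough to write out by hand. Below, my_ready! is a local copy with the same expansion as futures::ready! (the standard library also provides an equivalent std::task::ready! since Rust 1.64). poll_sum() and poll_parse() are hypothetical functions. They show the early return, and how ready!(...)? propagates an error, as in poll_next().

```rust
use std::task::Poll;

// Same expansion as futures::ready!: unwrap Ready(t) to t, or return Pending early.
macro_rules! my_ready {
    ($e:expr $(,)?) => {
        match $e {
            ::std::task::Poll::Ready(t) => t,
            ::std::task::Poll::Pending => return ::std::task::Poll::Pending,
        }
    };
}

// Hypothetical poll-style function: Ready only when both inputs are Ready.
pub fn poll_sum(a: Poll<i32>, b: Poll<i32>) -> Poll<i32> {
    let x = my_ready!(a); // returns Poll::Pending from poll_sum if a is Pending
    let y = my_ready!(b);
    Poll::Ready(x + y)
}

// With a Result in the return type, `?` after the macro propagates errors.
pub fn poll_parse(p: Poll<&str>) -> Poll<Result<i32, std::num::ParseIntError>> {
    let s = my_ready!(p);
    let n: i32 = s.parse()?;
    Poll::Ready(Ok(n * 2))
}
```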

We have now implemented the Stream. Isn’t it pretty straightforward?

Implementing Sink #

Next, let’s implement Sink. It looks like there are many methods to implement, but it isn’t complicated either. Let’s go through the four methods: poll_ready(), start_send(), poll_flush(), and poll_close().

poll_ready() exists for backpressure: depending on the load, you can decide whether to return Poll::Ready. At our network layer, we don’t need to worry about backpressure yet and can rely on the operating system’s TCP stack to provide it, so we simply return Poll::Ready(Ok(())), meaning the upper layer can write data at any time.

// [Insert poll_ready implementation here, which directly returns Poll::Ready(Ok(())).]

Once poll_ready() returns Ready, the caller can call start_send(). start_send() only prepares the data: here we encode the item into bytes and store them in wbuf:

// [Insert start_send implementation here, encoding the item into the wbuf.]

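Then, in poll_flush(), we actually write the data. Since poll_write() may accept only part of the buffer, and poll_flush() may return Pending and be called again later, we need to remember how far we have written. So we add a field called written to ProstStream to record how many bytes have been written: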

// [Insert the ProstStream structure with the added 'written' field.]

With the written field, you can loop and write:

// [Insert the loop logic for writing into stream here, using poll_write.]
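To see why written has to be a field, here is a std-only sketch of the write side. PollWrite is a hypothetical trait with the same poll_write() shape as tokio’s AsyncWrite. ChunkedWriter is a test double that accepts at most 3 bytes per call and returns Pending on every other call, so poll_flush() must resume where it stopped. The method names mirror Sink, but FrameSink does not implement the futures trait, and encoding is left out.

```rust
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

// Hypothetical trait with the same poll_write() shape as tokio::io::AsyncWrite.
pub trait PollWrite {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
        -> Poll<io::Result<usize>>;
}

// Test double: accepts at most 3 bytes per call and is Pending on every odd call.
#[derive(Default)]
pub struct ChunkedWriter {
    pub data: Vec<u8>,
    calls: usize,
}

impl PollWrite for ChunkedWriter {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
        -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        this.calls += 1;
        if this.calls % 2 == 1 {
            cx.waker().wake_by_ref(); // ask to be polled again
            return Poll::Pending;
        }
        let n = buf.len().min(3);
        this.data.extend_from_slice(&buf[..n]);
        Poll::Ready(Ok(n))
    }
}

// Sink-like sketch; not an implementation of futures::Sink.
pub struct FrameSink<S> {
    stream: S,
    wbuf: Vec<u8>,
    // How much of wbuf is already written; must survive across Pending returns.
    written: usize,
}

impl<S: PollWrite + Unpin> FrameSink<S> {
    pub fn new(stream: S) -> Self {
        Self { stream, wbuf: Vec::new(), written: 0 }
    }

    pub fn get_ref(&self) -> &S {
        &self.stream
    }

    // Like poll_ready(): no backpressure of our own, always ready.
    pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    // Like start_send(): only append the (already encoded) bytes to wbuf.
    pub fn start_send(&mut self, item: &[u8]) -> io::Result<()> {
        self.wbuf.extend_from_slice(item);
        Ok(())
    }

    // Like poll_flush(): keep writing from `written` until wbuf is drained.
    pub fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        while self.written < self.wbuf.len() {
            let n = ready!(Pin::new(&mut self.stream).poll_write(cx, &self.wbuf[self.written..]))?;
            if n == 0 {
                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
            }
            self.written += n;
        }
        self.wbuf.clear();
        self.written = 0;
        Poll::Ready(Ok(()))
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Busy-polls poll_flush() until Ready, returning the result and the poll count.
pub fn flush_all<S: PollWrite + Unpin>(sink: &mut FrameSink<S>) -> (io::Result<()>, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(res) = sink.poll_flush(&mut cx) {
            return (res, polls);
        }
    }
}
```

With 11 bytes to send, flush_all() needs five polls, and each poll continues from the offset stored in written. The WriteZero check is an added guard so that a writer reporting zero bytes cannot make the loop spin forever.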

Finally, poll_close() only needs to call the stream’s flush and shutdown methods to ensure the data is written and the stream is closed:

// [Insert poll_close implementation here, utilizing stream's poll_flush and poll_shutdown methods.]

Constructing ProstStream #

Now that ProstStream implements Stream and Sink, let’s add some helper methods to make it easier to use, such as new():

// [Insert new() implementation for ProstStream here.]

We have also implemented the Unpin trait for it, which makes it much easier for others to use. As a rule of thumb, if a data structure built for asynchronous use has generic parameters and holds no self-references, you should implement Unpin for it.
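The following std-only sketch shows what the explicit impl changes. Wrapper is a hypothetical struct shaped like ProstStream, and PhantomPinned merely stands in for some In type that is not Unpin. Auto-derived Unpin would depend on In and Out through PhantomData. The explicit impl makes it depend only on S, so Pin::new() and Pin::get_mut(), which both require Unpin, keep working.

```rust
use std::marker::{PhantomData, PhantomPinned};
use std::pin::Pin;

// Hypothetical struct shaped like ProstStream: only `stream` holds real data.
pub struct Wrapper<S, In, Out> {
    pub stream: S,
    _io: PhantomData<(In, Out)>,
}

// Without this impl, Wrapper<S, PhantomPinned, Out> would be !Unpin,
// because PhantomData<T> is Unpin only when T is.
impl<S: Unpin, In, Out> Unpin for Wrapper<S, In, Out> {}

impl<S, In, Out> Wrapper<S, In, Out> {
    pub fn new(stream: S) -> Self {
        Self { stream, _io: PhantomData }
    }
}

// Compile-time check: only callable for Unpin types.
pub fn assert_unpin<T: Unpin>() {}

// Pin::new() and Pin::get_mut() both require Unpin.
pub fn bump(w: &mut Wrapper<u32, PhantomPinned, ()>) -> u32 {
    let pinned = Pin::new(w);
    let inner = pinned.get_mut();
    inner.stream += 1;
    inner.stream
}
```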

Testing! #

Now comes the crucial part: testing. We need to write tests to make sure ProstStream works correctly. Since we already wrote a DummyStream that implements AsyncRead in src/network/frame.rs, we only need to extend it to implement AsyncWrite as well.

To make it reusable, we move it out of frame.rs into src/network/mod.rs and modify it as follows (remember to switch the tests in frame.rs over to the new DummyStream):

// [Add the adapted DummyStream to `src/network/mod.rs` with AsyncRead and AsyncWrite implementations.]

Now we can write a test under src/network/stream.rs:

// [Insert test code that ensures that `ProstStream` works as expected with DummyStream here.]

Run cargo test, and all tests should pass! (If you get compilation errors, you are probably missing some use statements; add them yourself or check the complete code on GitHub.)

Using ProstStream #

Next, we can have ProstServerStream and ProstClientStream use the newly defined ProstStream. You can refer to the comparison below to see the differences between the two:

// Old interface
// pub struct ProstServerStream<S> {
//     inner: S,
//     service: Service,
// }

// New implementation using ProstStream
pub struct ProstServerStream<S> {
    inner: ProstStream<S, CommandRequest, CommandResponse>,
    service: Service,
}

// Old interface
// pub struct ProstClientStream<S> {
//     inner: S,
// }

// New implementation using ProstStream
pub struct ProstClientStream<S> {
    inner: ProstStream<S, CommandResponse, CommandRequest>,
}

Then remove the send()/recv() functions and change process()/execute() to use next() and send(). The main changes are as follows:

// [Insert changes that leverage next() and send() on ProstStream within ProstServerStream and ProstClientStream here.]

Run cargo test again, and all tests should pass. As before, compilation errors most likely mean some use statements are missing.

We can also open a terminal and run: RUST_LOG=info cargo run --bin kvs --quiet. Then, in another terminal, run: RUST_LOG=info cargo run --bin kvc --quiet. You should see the server and the client receive and process each other’s requests and responses normally!

We have refactored ProstServerStream and ProstClientStream to use the new ProstStream, which follows the Stream/Sink traits from the futures library. The code changed substantially, but because the changes are confined to the internal implementation, the rest of the system is unaffected. That’s great!

Summary #

In real-world development, refactoring to improve the quality of existing code is indispensable. While developing the KV server, we kept making small refactorings along the way.

Today we carried out a somewhat larger refactoring, giving the existing code an interface that better follows Rust’s asynchronous IO conventions. Seen from the outside, it adds no features and meets no new requirements. In terms of code structure and quality, however, it makes ProstStream easier and more intuitive for other code to call, and easier to integrate with the existing Rust ecosystem.

You might wonder why the refactoring went so smoothly. It’s because we have sufficient unit test coverage as a foundation.

Much like biological evolution, good code keeps improving through healthy refactoring, and healthy refactoring means taking steps in the right direction under the watch of good unit tests. Here, unit tests play the role of the natural environment in evolution, weeding out the errors introduced during refactoring one by one.

Thought Questions #

  1. Why do we place wbuf/rbuf and written fields in the ProstStream data structure when creating it? Why can’t we use local variables?
  2. Read the documentation for Stream and Sink closely. Try to write code to construct simple data structures that implement Stream and Sink.

Feel free to share your thoughts and learning achievements in the comments section, and thank you for listening. You have completed the 41st check-in for Rust learning; see you in the next session.