
22 | Hands-On Practice (2): Building a Simple KV Server - Basic Process #

Hello, I’m Chen Tian.

In the last article, we just got started with our KV store and wrote the basic interfaces. Are you ready to start writing the implementation code? Don’t rush, after defining the interfaces, don’t rush to implement them immediately. Before writing more code, we can experience from a user’s perspective how to use the interface and whether it is easy to use, and then reflect on the design for potential improvements.

In the last article, we got our KV store started and defined its basic interfaces. Ready to dive into the implementation code? Not so fast: after defining interfaces, we shouldn’t rush to implement them immediately. Before writing more code, it’s worth experiencing the interfaces from a user’s perspective, seeing whether they are comfortable to use, and reflecting on the design for potential improvements.

We’ll work through them one by one, in the same order we defined them last time. First up: the protocol layer.

Implement and Verify the Protocol Layer #

First, create a project: cargo new kv --lib. Enter the project directory and add dependencies in Cargo.toml:

[package]
name = "kv"
version = "0.1.0"
edition = "2018"

[dependencies]
bytes = "1" # Library for efficient processing of network buffers
prost = "0.8" # Code for handling protobuf
tracing = "0.1" # Log processing

[dev-dependencies]
anyhow = "1" # Error handling
async-prost = "0.2.1" # Support wrapping protobuf into TCP frames
futures = "0.3" # Provides Stream trait
tokio = { version = "1", features = ["rt", "rt-multi-thread", "io-util", "macros", "net" ] } # Asynchronous network library
tracing-subscriber = "0.2" # Log processing

[build-dependencies]
prost-build = "0.8" # Compiling protobuf

Then, create abi.proto in the project root directory and put the protobuf codes from the previous article into it. In the root directory, create build.rs:

fn main() {
    let mut config = prost_build::Config::new();
    config.bytes(&["."]);
    config.type_attribute(".", "#[derive(PartialOrd)]");
    config
        .out_dir("src/pb")
        .compile_protos(&["abi.proto"], &["."])
        .unwrap();
}

We’ve seen this code before in [Lecture 5]; build.rs runs at compile time to do extra processing.

Here we add some attributes to the generated code. For instance, for protobuf’s bytes type we generate Bytes instead of the default Vec<u8>, and we add a PartialOrd derive to all types. For details on these and other prost-build options, check its documentation.
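
One optional convenience that is not part of the build.rs above: you can tell cargo to rerun the build script only when the proto file changes, rather than on any change in the package, by printing a rerun-if-changed directive at the top of main():

// Optional addition at the top of build.rs's main():
// only rerun this build script when abi.proto changes
println!("cargo:rerun-if-changed=abi.proto");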

Remember to create the src/pb directory, or the compilation will fail. Now, doing cargo build in the project root directory will generate the src/pb/abi.rs file containing Rust data structures for all messages defined in protobuf. We create src/pb/mod.rs, bring abi.rs in, and perform some basic type conversions:

pub mod abi;

use abi::{command_request::RequestData, *};

impl CommandRequest {
    /// Create an HSET command
    pub fn new_hset(table: impl Into<String>, key: impl Into<String>, value: Value) -> Self {
        Self {
            request_data: Some(RequestData::Hset(Hset {
                table: table.into(),
                pair: Some(Kvpair::new(key, value)),
            })),
        }
    }
}

impl Kvpair {
    /// Create a new kv pair
    pub fn new(key: impl Into<String>, value: Value) -> Self {
        Self {
            key: key.into(),
            value: Some(value),
        }
    }
}

/// Convert from String to Value
impl From<String> for Value {
    fn from(s: String) -> Self {
        Self {
            value: Some(value::Value::String(s)),
        }
    }
}

/// Convert from &str to Value
impl From<&str> for Value {
    fn from(s: &str) -> Self {
        Self {
            value: Some(value::Value::String(s.into())),
        }
    }
}
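
Conversions for other primitive types can follow the same pattern when we need them. For example, assuming the Value oneof from last lecture has an integer field, a From<i64> impl would look roughly like this sketch (not required for this lecture):

/// Convert from i64 to Value (a sketch; assumes the oneof has an integer variant)
impl From<i64> for Value {
    fn from(i: i64) -> Self {
        Self {
            value: Some(value::Value::Integer(i)),
        }
    }
}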

Finally, in src/lib.rs, bring in the pb module:

mod pb;
pub use pb::abi::*;

With this, the basic protobuf interface for our KV server is in place and ready to use.

Create an examples directory in the root so we can write some code to test the protocol between the client and the server. Let’s first create an examples/client.rs file and add the following code:

use anyhow::Result;
use async_prost::AsyncProstStream;
use futures::prelude::*;
use kv::{CommandRequest, CommandResponse};
use tokio::net::TcpStream;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();

    let addr = "127.0.0.1:9527";
    // Connect to the server
    let stream = TcpStream::connect(addr).await?;

    // Use AsyncProstStream to handle TCP Frames
    let mut client =
        AsyncProstStream::<_, CommandResponse, CommandRequest, _>::from(stream).for_async();

    // Generate an HSET command
    let cmd = CommandRequest::new_hset("table1", "hello", "world".into());

    // Send the HSET command
    client.send(cmd).await?;
    if let Some(Ok(data)) = client.next().await {
        info!("Got response {:?}", data);
    }

    Ok(())
}

This code connects to the server on port 9527, sends an HSET command, and waits for the response. Note the AsyncProstStream type parameters: the message type we receive (CommandResponse) comes first and the type we send (CommandRequest) second; the server below uses the reverse order.

Likewise, we create an examples/dummy_server.rs file and add code:

use anyhow::Result;
use async_prost::AsyncProstStream;
use futures::prelude::*;
use kv::{CommandRequest, CommandResponse};
use tokio::net::TcpListener;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();
    let addr = "127.0.0.1:9527";
    let listener = TcpListener::bind(addr).await?;
    info!("Start listening on {}", addr);
    loop {
        let (stream, addr) = listener.accept().await?;
        info!("Client {:?} connected", addr);
        tokio::spawn(async move {
            let mut stream =
                AsyncProstStream::<_, CommandRequest, CommandResponse, _>::from(stream).for_async();
            while let Some(Ok(msg)) = stream.next().await {
                info!("Got a new command: {:?}", msg);
                // Create a 404 response to return to the client
                let mut resp = CommandResponse::default();
                resp.status = 404;
                resp.message = "Not found".to_string();
                stream.send(resp).await.unwrap();
            }
            info!("Client {:?} disconnected", addr);
        });
    }
}

In this code, the server listens on port 9527 and returns a response with status = 404 and message “Not found” to any client request.

If the asynchronous networking part still looks confusing, don’t worry: copy the code and run it. Networking is not today’s topic, so focus on the processing flow; we will cover networking and asynchronous processing in later lectures.

You can open a command line window and run: RUST_LOG=info cargo run --example dummy_server --quiet. Then, in another command line window, run: RUST_LOG=info cargo run --example client --quiet.

At this point, the server and the client can each receive the other’s requests and responses, so the protocol layer appears to work. Once that’s verified, you can move on to the next step; the remaining protocol-layer code is routine work that can be filled in as it’s needed.
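
For example, when the remaining commands are needed, their constructors can follow the same pattern as new_hset. Here is a sketch for HGET, assuming the Hget message defined last time has table and key fields:

impl CommandRequest {
    /// Create an HGET command (a sketch following the new_hset pattern)
    pub fn new_hget(table: impl Into<String>, key: impl Into<String>) -> Self {
        Self {
            request_data: Some(RequestData::Hget(Hget {
                table: table.into(),
                key: key.into(),
            })),
        }
    }
}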

Implement and Verify the Storage trait #

Next, let’s build the Storage trait.

In the last lecture, we talked about implementing the Storage trait with nested, concurrent, in-memory HashMaps. A concurrent HashMap (something you would otherwise assemble yourself, for example as Arc<RwLock<HashMap<K, V>>>) is such a common need that Rust’s ecosystem has many crates providing one. Here we use dashmap to build a MemTable structure that implements the Storage trait.
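
If you haven’t used dashmap before: it behaves like a HashMap whose methods take &self, so it can be shared across threads and mutated without an explicit outer lock. A minimal sketch of how it’s used (not from the article):

use dashmap::DashMap;

fn main() {
    // insertion and lookup both work through a shared reference
    let map: DashMap<String, i64> = DashMap::new();
    map.insert("hello".into(), 1);
    assert_eq!(map.get("hello").map(|v| *v), Some(1));
}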

First, create a src/storage directory and a src/storage/mod.rs file inside it, put the Storage trait discussed earlier into that file, and declare mod storage; in src/lib.rs. At this point you’ll hit an error: KvError is not defined.

So let’s define KvError first. [Lecture 18] briefly showed how to define error types with thiserror’s derive macro; we’ll use it here. Create src/error.rs and fill it in:

use crate::Value;
use thiserror::Error;

#[derive(Error, Debug, PartialEq)]
pub enum KvError {
    #[error("Not found for table: {0}, key: {1}")]
    NotFound(String, String),

    #[error("Cannot parse command: `{0}`")]
    InvalidCommand(String),
    #[error("Cannot convert value {0} to {1}")]
    ConvertError(Value, &'static str),
    #[error("Cannot process command {0} with table: {1}, key: {2}. Error: {3}")]
    StorageError(&'static str, String, String, String),

    #[error("Failed to encode protobuf message")]
    EncodeError(#[from] prost::EncodeError),
    #[error("Failed to decode protobuf message")]
    DecodeError(#[from] prost::DecodeError),

    #[error("Internal error: {0}")]
    Internal(String),
}

In practice, these error variants get added one by one as the implementation grows; for the sake of the explanation they are all listed here at once. For the Storage implementation we only care about StorageError; the other variants will come into play later.
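
A side note on the #[from] attributes: thiserror generates the matching From impls, so prost’s encode/decode errors can be propagated into KvError with the ? operator. A minimal sketch (decode_request is a hypothetical helper, just to show the idea):

use crate::{CommandRequest, KvError};
use prost::Message;

/// Hypothetical helper: prost::DecodeError turns into KvError via #[from]
fn decode_request(buf: &[u8]) -> Result<CommandRequest, KvError> {
    let req = CommandRequest::decode(buf)?;
    Ok(req)
}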

Likewise, introduce mod error under src/lib.rs, which should now look like this:

mod error;
mod pb;
mod storage;

pub use error::KvError;
pub use pb::abi::*;
pub use storage::*;

src/storage/mod.rs should look like this:

use crate::{KvError, Kvpair, Value};

/// Abstract storage, we don't care where the data is, but we need to define how the outside world interacts with storage
pub trait Storage {
    /// Retrieve a key's value from a HashTable
    fn get(&self, table: &str, key: &str) -> Result<Option<Value>, KvError>;
    /// Set a key's value in a HashTable, returning the old value
    fn set(&self, table: &str, key: String, value: Value) -> Result<Option<Value>, KvError>;
    /// Check if HashTable has a key
    fn contains(&self, table: &str, key: &str) -> Result<bool, KvError>;
    /// Delete a key from HashTable
    fn del(&self, table: &str, key: &str) -> Result<Option<Value>, KvError>;
    /// Iterate over HashTable, returning all kv pairs (this interface is not good)
    fn get_all(&self, table: &str) -> Result<Vec<Kvpair>, KvError>;
    /// Iterate over HashTable, returning an Iterator for kv pairs
    fn get_iter(&self, table: &str) -> Result<Box<dyn Iterator<Item = Kvpair>>, KvError>;
}

The code shouldn’t have any compilation errors at the moment, so you can add test code at the end of this file to try using these interfaces. Of course, we haven’t built MemTable yet, but through the Storage trait we already have a general idea of how MemTable is used, so you can write a test to experience it:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn memtable_basic_interface_should_work() {
        let store = MemTable::new();
        test_basic_interface(store);
    }

    #[test]
    fn memtable_get_all_should_work() {
        let store = MemTable::new();
        test_get_all(store);
    }

    fn test_basic_interface(store: impl Storage) {
        // The first set will create a table, insert the key, and return None (previously had no value)
        let v = store.set("t1", "hello".into(), "world".into());
        assert!(v.unwrap().is_none());
        // Setting the same key again will update and return the previous value
        let v1 = store.set("t1", "hello".into(), "world1".into());
        assert_eq!(v1, Ok(Some("world".into())));

        // Getting an existing key will return the latest value
        let v = store.get("t1", "hello");
        assert_eq!(v, Ok(Some("world1".into())));

        // Getting a non-existent key or table will return None
        assert_eq!(Ok(None), store.get("t1", "hello1"));
        assert!(store.get("t2", "hello1").unwrap().is_none());

        // contains returns true for existing keys, otherwise false
        assert_eq!(store.contains("t1", "hello"), Ok(true));
        assert_eq!(store.contains("t1", "hello1"), Ok(false));
        assert_eq!(store.contains("t2", "hello"), Ok(false));

        // del a key returns the previous value
        let v = store.del("t1", "hello");
        assert_eq!(v, Ok(Some("world1".into())));

        // del a non-existent key or table returns None
        assert_eq!(Ok(None), store.del("t1", "hello1"));
        assert_eq!(Ok(None), store.del("t2", "hello"));
    }

    fn test_get_all(store: impl Storage) {
        store.set("t2", "k1".into(), "v1".into()).unwrap();
        store.set("t2", "k2".into(), "v2".into()).unwrap();
        let mut data = store.get_all("t2").unwrap();
        data.sort_by(|a, b| a.partial_cmp(b).unwrap());
        assert_eq!(
            data,
            vec![
                Kvpair::new("k1", "v1".into()),
                Kvpair::new("k2", "v2".into())
            ]
        )
    }

    fn test_get_iter(store: impl Storage) {
        store.set("t2", "k1".into(), "v1".into()).unwrap();
        store.set("t2", "k2".into(), "v2".into()).unwrap();
        let mut data: Vec<_> = store.get_iter("t2").unwrap().collect();
        data.sort_by(|a, b| a.partial_cmp(b).unwrap());
        assert_eq!(
            data,
            vec![
                Kvpair::new("k1", "v1".into()),
                Kvpair::new("k2", "v2".into())
            ]
        )
    }
}

Writing unit tests before the implementation exists is standard Test-Driven Development (TDD). I’m not a die-hard TDD advocate, but writing test code right after building a trait is worthwhile, because tests are a great way to check how easy the interface is to use. After all, we don’t want to discover flaws in the trait definition only after it has been implemented, when the cost of changing it is much higher.

So once the trait takes shape, start writing test code against it, and pay attention to how it feels. If the test cases are awkward to write, or using the trait requires a lot of tedious ceremony, you should reconsider its design.

If you look closely at the unit tests, you’ll notice that I stick to one principle: test against the trait. Although the tests need a concrete data structure to exercise the trait’s methods, the core test code lives in generic functions that only know about the trait.

This way, we can avoid interference from any particular trait implementation. Additionally, we can share test code in the future when we want to incorporate more trait implementations. For example, if we want to support DiskTable in the future, we can add a few test cases and call the existing generic functions.
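
For instance, a future disk-backed store would only need its own constructor call; the assertions are all shared. A sketch (DiskTable is hypothetical, it doesn’t exist yet):

#[test]
fn disktable_basic_interface_should_work() {
    // DiskTable is a hypothetical future implementation of Storage;
    // it reuses the existing generic test function test_basic_interface
    let store = DiskTable::new("/tmp/kv_test");
    test_basic_interface(store);
}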

Okay, after confirming the trait design is sound and finishing the tests, let’s write the actual implementation. Create src/storage/memory.rs to build MemTable (and remember to declare mod memory; plus pub use memory::MemTable; in src/storage/mod.rs so the tests above can see it):

use crate::{KvError, Kvpair, Storage, Value};
use dashmap::{mapref::one::Ref, DashMap};

/// MemTable built using DashMap, implementing the Storage trait
#[derive(Clone, Debug, Default)]
pub struct MemTable {
    tables: DashMap<String, DashMap<String, Value>>,
}

impl MemTable {
    /// Create a default MemTable
    pub fn new() -> Self {
        Self::default()
    }

    /// If a hash table named 'name' does not exist, it will be created, otherwise returned
    fn get_or_create_table(&self, name: &str) -> Ref<String, DashMap<String, Value>> {
        match self.tables.get(name) {
            Some(table) => table,
            None => {
                let entry = self.tables.entry(name.into()).or_default();
                entry.downgrade()
            }
        }
    }
}

impl Storage for MemTable {
    fn get(&self, table: &str, key: &str) -> Result<Option<Value>, KvError> {
        let table = self.get_or_create_table(table);
        Ok(table.get(key).map(|v| v.value().clone()))
    }

    fn set(&self, table: &str, key: String, value: Value) -> Result<Option<Value>, KvError> {
        let table = self.get_or_create_table(table);
        Ok(table.insert(key, value))
    }

    fn contains(&self, table: &str, key: &str) -> Result<bool, KvError> {
        let table = self.get_or_create_table(table);
        Ok(table.contains_key(key))
    }

    fn del(&self, table: &str, key: &str) -> Result<Option<Value>, KvError> {
        let table = self.get_or_create_table(table);
        Ok(table.remove(key).map(|(_k, v)| v))
    }

    fn get_all(&self, table: &str) -> Result<Vec<Kvpair>, KvError> {
        let table = self.get_or_create_table(table