21 Stage Practical Operation 1 Building a Simple Kv Server Basic Process

Practical Exercise (1): Building a Simple KV Server - Basic Process #

Hello, I am Chen Tian.

Since lesson seven, we have cleared many hurdles, wrestling with ownership and lifetimes, pulling and pushing with the type system and traits. What for? It is to be able to understand other people’s code and to write increasingly complex and elegant code ourselves.

Today, it is time to test our own strength. After all, talk is cheap. No matter how many knowledge points we master, if we can’t write code ourselves, it’s all for naught. So let’s use the knowledge we’ve learned so far to write a simple KV server together.

Unlike the emotionally charged code experience of getting hands dirty, I will guide you through a step-by-step real polish, in a more detailed fashion. Therefore, the content will be quite extensive. I have divided it into two articles, and I hope you can read through them patiently, carefully experiencing the embodiment of Rust best practices in architectural design and code implementation thinking.

Why choose a KV server for practical exercises? Because it’s a service that is simple enough, yet complex enough. Let’s review its requirements, referencing services used at work like Redis/Memcached.

  • The core function is to perform operations such as data storage, reading, listening, etc., based on different commands;
  • The client should be able to access the KV server via the network, sending requests containing commands and obtaining results;
  • Data should be able to be stored in memory or persisted to disk as needed.

Let’s Start with a Rough Implementation #

If it’s just about completing a task to build a KV server, the initial version could be done in two to three hundred lines of code. However, maintaining such code would be disastrous.

Let’s look at a spaghetti-like version that does without a lot of details. You can follow my comments to focus on the process:

use anyhow::Result;
use async_prost::AsyncProstStream;
use dashmap::DashMap;
use futures::prelude::*;
use kv::{
    command_request::RequestData, CommandRequest, CommandResponse, Hset, KvError, Kvpair, Value,
};
use std::sync::Arc;
use tokio::net::TcpListener;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging
    tracing_subscriber::fmt::init();

    let addr = "127.0.0.1:9527";
    let listener = TcpListener::bind(addr).await?;
    info!("Start listening on {}", addr);

    // Use DashMap to create an in-memory kv store
    let table: Arc<DashMap<String, Value>> = Arc::new(DashMap::new());

    loop {
        // Receive a client's request
        let (stream, addr) = listener.accept().await?;
        info!("Client {:?} connected", addr);

        // Clone db to use it in the tokio task
        let db = table.clone();

        // Create a tokio task to handle the client
        tokio::spawn(async move {
            // Use AsyncProstStream to handle TCP Frame
            // Frame: two-byte frame length, followed by protobuf binary
            let mut stream =
                AsyncProstStream::<_, CommandRequest, CommandResponse, _>::from(stream).for_async();

            // Get the next message from the stream (automatic decoding after retrieval)
            while let Some(Ok(msg)) = stream.next().await {
                info!("Got a new command: {:?}", msg);
                let resp: CommandResponse = match msg.request_data {
                    // For demonstration, we just handle HSET
                    Some(RequestData::Hset(cmd)) => hset(cmd, &db),
                    // Others not handled for now
                    _ => unimplemented!(),
                };

                info!("Got response: {:?}", resp);
                // Send the CommandResponse back to the client
                stream.send(resp).await.unwrap();
            }
        });
    }
}

// Handle the hset command
fn hset(cmd: Hset, db: &DashMap<String, Value>) -> CommandResponse {
    match cmd.pair {
        Some(Kvpair {
            key,
            value: Some(v),
        }) => {
            // Write to db
            let old = db.insert(key, v).unwrap_or_default();
            // Convert value to CommandResponse
            old.into()
        }
        v => KvError::InvalidCommand(format!("hset: {:?}", v)).into(),
    }
}

This code is very straightforward, from input to output, in one fell swoop. If we write in this way, the task can indeed be completed quickly, but it has a sense of “once completed, let the flood come.”

After copying the code, open two windows and run “cargo run –example naive_server” and “cargo run –example client”, respectively, to see the following output in the server window:

Sep 19 22:25:34.016  INFO naive_server: Start listening on 127.0.0.1:9527
Sep 19 22:25:38.401  INFO naive_server: Client 127.0.0.1:51650 connected
Sep 19 22:25:38.401  INFO naive_server: Got a new command: CommandRequest { request_data: Some(Hset(Hset { table: "table1", pair: Some(Kvpair { key: "hello", value: Some(Value { value: Some(String("world")) }) }) })) }
Sep 19 22:25:38.401  INFO naive_server: Got response: CommandResponse { status: 200, message: "", values: [Value { value: None }], pairs: [] }

Although the overall functionality is achieved, if you want to add new features to this KV server later, you’ll need to rewrite this section of the code over and over again.

Moreover, it is not conducive to unit testing, because all the logic is compacted together, with no “unit” to speak of. Although in the future, different logic can be separated into different functions to simplify the main process, they are still intertwined. Without major refactoring, the fundamental problems cannot be solved.

Therefore, regardless of the language used for development, we must rigorously avoid such code. Not only should we not write this way ourselves, but if we encounter others writing code like this during a code review, we must rigorously point it out.

Architecture and Design #

Then, what counts as a good implementation?

Good implementation should be, after a thorough analysis of the requirements, to first start from the system’s main process, to clarify the major steps from the client’s request to the final response received by the client; then according to these steps, think about what needs to be loosely coupled, and build the main interfaces and traits; after deep reflection on these matters, finally consider the implementation. That’s what we call “plan before action.”

The requirements for a KV server have been analyzed at the beginning. Now let’s sort out the main process. You can think about it first, and then refer to the diagram to see if there are any omissions:

There are some key issues in this process that need further exploration:

  1. What protocol do clients and servers use to communicate? TCP? gRPC? HTTP? Support one or multiple?
  2. How to define the application layer protocol between clients and servers? How to serialize/deserialize? Use Protobuf, JSON, or Redis RESP? Or could multiple be supported?
  3. What commands does the server support? Which are prioritized in the first version?
  4. In the specific processing logic, do we need to add hooks to publish some events during the process, allowing other processes to receive notifications and perform additional processing? Can these hooks preemptively terminate the entire process?
  5. For storage, should we support different storage engines? For instance, MemDB (memory-based), RocksDB (disk-based), SledDB (disk-based), etc. For MemDB, should we consider adding WAL (Write-Ahead Log) and snapshot support?
  6. Can the entire system be configured? For example, which port to use, which storage engine?

If you want to do good architecture, it’s vital to ask these questions and find the answers. It’s worth noting that many of these questions cannot be answered by product managers, or their responses might lead you astray. As architects, we need to be responsible for how the system will respond to changes in the future.

Here are my thoughts, which you can refer to:

  1. For a high-performance scenario like a KV server, communication should prioritize the TCP protocol. So for now, we only support TCP, but based on future needs, we could support more protocols, like HTTP2/gRPC. Also, future security requirements could mean TLS protocols need to be easily integrated. In short, the network layer needs to be flexible.

  2. For the application layer protocol, we can use protobuf definition. Protobuf directly resolves the protocol definition and how to serialize and deserialize. Redis’s RESP is certainly good, but its shortcomings are evident. Extra parsing is needed for commands, and a lot of \r\n to separate commands or data is somewhat wasteful. Using JSON is even more wasteful of bandwidth and has lower parsing efficiency, especially with large data volumes.

Protobuf is very suitable for a scenario like a KV server, with flexibility, backward-compatible upgrades, high parsing efficiency, and very bandwidth-efficient binaries. Its only drawback is requiring extra tools like protoc to compile into different languages. Although protobuf is preferred, to be compatible with Redis clients in the future, RESP support might still be necessary.

  1. The commands supported by the server can reference Redis’s command set. The first version could prioritize support for HXXX commands, such as HSET, HMSET, HGET, HMGET, etc. From command to command response, a trait can be used for abstraction.

  2. In the process, we plan to add these hooks: OnRequestReceived after receiving a client command, OnRequestExecuted after processing a client command, BeforeResponseSend before sending a response, and AfterResponseSend after sending a response. This way, the main steps of the processing are exposed as events, making our KV server very flexible and facilitating additional processing logic during service initialization.

  3. Storage must be flexible enough. We can make a trait to abstract its basic behavior — start by just implementing MemDB, and we will definitely need a persistent storage in the future.

  4. Configuration support is needed, but not as a high priority. Once the basic process is sorted out and enough pain points are discovered during usage, we can consider how to handle configuration files.

Once all these issues are settled, the system’s basic approach is clear. We can first define several important interfaces and then carefully examine them.

The most important interfaces are the interfaces for three main interactions: the protocol between the client and the server, the interface for the server and command processing, and the interface for the server and storage.

Protocols between Clients and Servers #

First is the protocol between the client and the server. Let’s try defining the client commands supported in our first version using protobuf:

syntax = "proto3";

package abi;

// Command request from client
message CommandRequest {
  oneof request_data {
    Hget hget = 1;
    Hgetall hgetall = 2;
    Hmget hmget = 3;
    Hset hset = 4;
    Hmset hmset = 5;
    Hdel hdel = 6;
    Hmdel hmdel = 7;
    Hexist hexist = 8;
    Hmexist hmexist = 9;
  }
}

// Server response
message CommandResponse {
  // Status code; reusing HTTP 2xx/4xx/5xx status codes
  uint32 status = 1;
  // If not 2xx, the message contains detailed information
  string message = 2;
  // Successful return of values
  repeated Value values = 3;
  // Successful return of kv pairs
  repeated Kvpair pairs = 4;
}

// Get a key’s value from a table
message Hget {
  string table = 1;
  string key = 2;
}

// Get all Kvpair from a table
message Hgetall { string table = 1; }

// Get a group of keys’ values from a table
message Hmget {
  string table = 1;
  repeated string keys = 2;
}

// Returned value
message Value {
  oneof value {
    string string = 1;
    bytes binary = 2;
    int64 integer = 3;
    double float = 4;
    bool bool = 5;
  }
}

// Returned kvpair
message Kvpair {
  string key = 1;
  Value value = 2;
}

// Store a kvpair into a table,
// creating the table if it doesn’t exist
message Hset {
  string table = 1;
  Kvpair pair = 2;
}

// Store a group of kvpairs into a table,
// creating the table if it doesn’t exist
message Hmset {
  string table = 1;
  repeated Kvpair pairs = 2;
}

// Delete a key from a table, returning its previous value
message Hdel {
  string table = 1;
  string key = 2;
}

// Delete a group of keys from a table, returning their previous values
message Hmdel {
  string table = 1;
  repeated string keys = 2;
}

// Check if a key exists
message Hexist {
  string table = 1;
  string key = 2;
}

// Check if a group of keys exists
message Hmexist {
  string table = 1;
  repeated string keys = 2;
}

Through prost, this protobuf file can be compiled into Rust code (mainly structs and enums) for our use. You may remember, as mentioned in [Lesson 5], seeing prost’s way of handling protobuf when talking about the development of thumbor.

CommandService Trait #

After the client-server protocol is determined, we should consider how to process requests and return responses.

We plan to support 9 commands now and may support more in the future. So it is best to define a trait to uniformly process all commands and return the results. When processing a command, it needs to interact with storage so that data can be read based on the parameters carried in the request or the data from the request can be stored in the storage system. So, the trait can be defined like this:

/// Abstraction for Command handling
pub trait CommandService {
    /// Process Command and return Response
    fn execute(self, store: &impl Storage) -> CommandResponse;
}

With this trait, and each command implementing this trait, the dispatch method could look something like this:

// Get Response from Request, currently dealing with HGET/HGETALL/HSET
pub fn dispatch(cmd: CommandRequest, store: &impl Storage) -> CommandResponse {
    match cmd.request_data {
        Some(RequestData::Hget(param)) => param.execute(store),
        Some(RequestData::Hgetall(param)) => param.execute(store),
        Some(RequestData::Hset(param)) => param.execute(store),
        None => KvError::InvalidCommand("Request has no data".into()).into(),
        _ => KvError::Internal("Not implemented".into()).into(),
    }
}

In this way, when we support new commands in the future, we only need to do two things: implement CommandService for the command and add support for the new command in the dispatch method.

Storage Trait #

Let’s consider the Storage trait which is designed for different storage. It provides the main interfaces of the KV store:

/// Storage abstraction, we don’t care where data resides, but we need to define how the outside world interacts with storage
pub trait Storage {
    /// Get a key’s value from a HashTable
    fn get(&self, table: &str, key: &str) -> Result<Option<Value>, KvError>;
    /// Set a key’s value in a HashTable, returning the old value
    fn set(&self, table: &str, key: String, value: Value) -> Result<Option<Value>, KvError>;
    /// Check if a HashTable contains a key
    fn contains(&self, table: &str, key: &str) -> Result<bool, KvError>;
    /// Delete a key from a HashTable
    fn del(&self, table: &str, key: &str) -> Result<Option<Value>, KvError>;
    /// Iterate over a HashTable, returning all kv pairs (this interface is not good)
    fn get_all(&self, table: &str) -> Result<Vec<Kvpair>, KvError>;
    /// Get an iterator returning kv pairs from a HashTable
    fn get_iter(&self, table: &str) -> Result<Box<dyn Iterator<Item = Kvpair>>, KvError>;
}

As seen in the CommandService trait, when handling client requests, the interface being interacted with is the Storage trait, not a specific store. The advantage of this approach is that different stores can be added in the future according to business needs by implementing the Storage trait without changing any CommandService related code.

For example, in the HGET command implementation, we use the Storage::get method to get data from the table, which is irrelevant to any particular storage solution:

impl CommandService for Hget {
    fn execute(self, store: &impl Storage) -> CommandResponse {
        match store.get(&self.table, &self.key) {
            Ok(Some(v)) => v.into(),
            Ok(None) => KvError::NotFound(self.table, self.key).into(),
            Err(e) => e.into(),
        }
    }
}

Most of the Storage trait methods should likely be straightforward for you to define, but the get_iter() interface might be confusing to you, as it returns a Box. Why is that?

This is previously discussed as a trait object (Lesson 13).

Here we want to return an iterator that the caller does not care what specific type it is; it only needs to be able to call next() to get the next value. Different implementations might return different iterators, and to use one interface to encompass them, we need to use a trait object. When using a trait object, because Iterator is a trait with an associated type, we need to specify what the associated type Item is so that the caller knows how to process this type.

You might be wondering, given that set/del are clearly methods that would cause self to be modified, why are their interfaces still using &self?

Let’s ponder their usage. The simplest implementation of the Storage