26 Practice Stage (3): Building a Simple KV Server - Advanced Trait Techniques #

Hello, I’m Chen Tian.

By now we have covered most of the basics of generics: how to use them and the design philosophy behind them. We also compared them to functions, to help you see that generics are, in a sense, functions over data structures.

If you find generics challenging, it's because they sit at a higher level of abstraction, and getting comfortable with them takes a lot of experience reading and writing code. At this stage, being able to read code that uses generics is enough; skill in using them only comes from practicing and reflecting in later exercises. And if they still feel hard to grasp, to some extent what you lack is not the ability to use generics, but experience in design and architecture.

Today, we’ll use the simple KV store version 1.0 we previously built to practice and see how to integrate the knowledge we’ve learned into the code.

In [Lecture 21] and [Lecture 22], we completed the basic functionality of the KV store but left two loose ends:

  1. The get_iter() method of the Storage trait has not been implemented;
  2. There are some TODOs in the execute() method of the Service, which involve event notification handling.

Let’s tackle these one by one. First, look at the get_iter() method.

Handling Iterators #

Before we start writing code, let’s add back the previously commented-out tests in src/storage/mod.rs:

#[test]
fn memtable_iter_should_work() {
    let store = MemTable::new();
    test_get_iter(store);
}

Then try implementing it in src/storage/memory.rs:

impl Storage for MemTable {
    ...
    fn get_iter(&self, table: &str) -> Result<Box<dyn Iterator<Item = Kvpair>>, KvError> {
        // Use clone() to get a snapshot of the table
        let table = self.get_or_create_table(table).clone();
        let iter = table.iter().map(|v| Kvpair::new(v.key(), v.value().clone()));
        Ok(Box::new(iter)) // <-- Compilation error
    }
}

Unfortunately, the compiler rejects Box::new(iter), saying "cannot return value referencing local variable table". This is frustrating, but the reason is clear: table.iter() borrows table, and the iter we return holds that borrow; table, however, is a local variable that is dropped when the function returns, so the code cannot compile.
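To see the problem in isolation, here is a minimal standalone sketch (hypothetical code, not part of our project) that hits the same error with a plain Vec:

fn numbers() -> Box<dyn Iterator<Item = i32>> {
    let v = vec![1, 2, 3];
    // v.iter() borrows v, and v is dropped when the function returns,
    // so the boxed iterator would point at freed data
    Box::new(v.iter().copied()) // error: cannot return value referencing local variable `v`
}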

At this point, we need an iterator that takes full ownership of table. Rust's standard library provides the IntoIterator trait, which transfers ownership of a data structure into an Iterator. Here is its declaration (source code):

pub trait IntoIterator {
    type Item;
    type IntoIter: Iterator<Item = Self::Item>;

    fn into_iter(self) -> Self::IntoIter;
}
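Back to that minimal sketch: switching from iter() to into_iter() makes it compile, because the returned iterator now owns the data instead of borrowing it:

fn numbers() -> Box<dyn Iterator<Item = i32>> {
    let v = vec![1, 2, 3];
    // into_iter() consumes v, so the iterator carries the data with it
    Box::new(v.into_iter())
}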

Most collection data structures implement it, and DashMap does too, so we can use table.into_iter() to transfer ownership of table into the iterator:

impl Storage for MemTable {
    ...
    fn get_iter(&self, table: &str) -> Result<Box<dyn Iterator<Item = Kvpair>>, KvError> {
        // Use clone() to get a snapshot of the table
        let table = self.get_or_create_table(table).clone();
        let iter = table.into_iter().map(|data| data.into());
        Ok(Box::new(iter))
    }
}

Here we run into another data conversion: the items yielded by DashMap's iterator are (String, Value) tuples, and they need to be converted into Kvpair. We keep using into() for this, so we implement a simple From trait for Kvpair:

impl From<(String, Value)> for Kvpair {
    fn from(data: (String, Value)) -> Self {
        Kvpair::new(data.0, data.1)
    }
}

Place these two code snippets in src/storage/memory.rs.

Bingo! This code can compile successfully. Now if you run cargo test to test, the tests for the get_iter() interface will also pass.
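As a quick sanity check of the new From impl, we could also exercise the conversion directly. This is a hedged sketch (the test name is made up; it assumes, as elsewhere in the project, that Value implements From<&str> and that Kvpair derives PartialEq):

#[test]
fn kvpair_from_tuple_should_work() {
    let pair = Kvpair::from(("hello".to_string(), Value::from("world")));
    assert_eq!(pair, Kvpair::new("hello", "world".into()));
}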

Although this code passes the tests and is very concise, we should still think about how we would handle the get_iter() method if we implement the Storage trait for more data stores in the future.

We would:

  1. Get an owning Iterator over the given table;
  2. Map over the Iterator;
  3. Convert each mapped item into a Kvpair.

The second step is the same for every implementation of the Storage trait's get_iter() method. Can we encapsulate it, so that implementers of the Storage trait only need to provide their own owning Iterator and make sure its Item type implements Into<Kvpair>?

Let’s try it out. In src/storage/mod.rs, build a StorageIter and implement the Iterator trait:

/// Provide an iterator for Storage, so that trait implementers only need
/// to hand their own iterator to StorageIter and make sure the items
/// yielded by next() implement Into<Kvpair>.
pub struct StorageIter<T> {
    data: T,
}

impl<T> StorageIter<T> {
    pub fn new(data: T) -> Self {
        Self { data }
    }
}

impl<T> Iterator for StorageIter<T>
where
    T: Iterator,
    T::Item: Into<Kvpair>,
{
    type Item = Kvpair;

    fn next(&mut self) -> Option<Self::Item> {
        self.data.next().map(|v| v.into())
    }
}
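Before wiring it into MemTable, here is a small hedged sketch (hypothetical test) of how StorageIter composes with any owning iterator whose items convert into Kvpair, for example a plain Vec of (String, Value) tuples, again assuming Kvpair derives PartialEq:

#[test]
fn storage_iter_should_convert_items() {
    let data = vec![("k1".to_string(), Value::default())];
    let mut iter = StorageIter::new(data.into_iter());
    assert_eq!(iter.next(), Some(Kvpair::new("k1", Value::default())));
    assert_eq!(iter.next(), None);
}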

This way, in src/storage/memory.rs, get_iter() can use StorageIter directly; the (String, Value) items that DashMap's iterator yields on each next() call just need to convert into Kvpair, which the From impl above already provides:

impl Storage for MemTable {
    ...
    fn get_iter(&self, table: &str) -> Result<Box<dyn Iterator<Item = Kvpair>>, KvError> {
        // Use clone() to get a snapshot of the table
        let table = self.get_or_create_table(table).clone();
        let iter = StorageIter::new(table.into_iter()); // This line has changed
        Ok(Box::new(iter))
    }
}

We can test again with cargo test, and it should also pass!

If you reflect on the code we just wrote, you might smile wryly: we painstakingly wrote 20 lines of code and created a new data structure, just to make one line of the get_iter() method look prettier. Why bother?

Indeed, in this KV server example the abstraction doesn't buy us much. But if the steps above were not 3 but 5 or 10, with most of them identical, that is, if we had to repeat the same logic every time we implemented a new store, then the abstraction would be essential.

Supporting Event Notification #

Now let's look at event notification. In src/service/mod.rs (unless stated otherwise, the code below lives in src/service/mod.rs), the current execute() method still has several TODOs to resolve:

pub fn execute(&self, cmd: CommandRequest) -> CommandResponse {
    debug!("Got request: {:?}", cmd);
    // TODO: send the on_received event
    let res = dispatch(cmd, &self.inner.store);
    debug!("Executed response: {:?}", res);
    // TODO: send the on_executed event

    res
}

To solve these TODOs, we need to provide a mechanism for event notification:

  1. When creating the Service, register the corresponding event handling functions;
  2. During the execution of the execute() method, perform the corresponding event notification so that the registered event handlers can be executed.

First, look at how event handling functions are registered.

To support registration, the Service/ServiceInner data structure needs somewhere to hold the event handler functions. Let's try adding them to the ServiceInner structure:

/// Service internal data structure
pub struct ServiceInner<Store> {
    store: Store,
    on_received: Vec<fn(&CommandRequest)>,
    on_executed: Vec<fn(&CommandResponse)>,
    on_before_send: Vec<fn(&mut CommandResponse)>,
    on_after_send: Vec<fn()>,
}

According to the design in Lecture 21, we provide four events:

  1. on_received: Triggered when the server receives a CommandRequest;
  2. on_executed: Triggered when the server processes CommandRequest and gets CommandResponse;
  3. on_before_send: Triggered before the server sends CommandResponse. Note that this interface provides &mut CommandResponse so that the event handler can modify CommandResponse before sending, if necessary.
  4. on_after_send: Triggered after the server sends CommandResponse.

Before writing the event registration code, let’s first write a test from the user’s perspective to consider how to register:

#[test]
fn event_registration_should_work() {
    fn b(cmd: &CommandRequest) {
        info!("Got {:?}", cmd);
    }
    fn c(res: &CommandResponse) {
        info!("{:?}", res);
    }
    fn d(res: &mut CommandResponse) {
        res.status = StatusCode::CREATED.as_u16() as _;
    }
    fn e() {
        info!("Data is sent");
    }

    let service: Service = ServiceInner::new(MemTable::default())
        .fn_received(|_: &CommandRequest| {})
        .fn_received(b)
        .fn_executed(c)
        .fn_before_send(d)
        .fn_after_send(e)
        .into();

    let res = service.execute(CommandRequest::new_hset("t1", "k1", "v1".into()));
    assert_eq!(res.status, StatusCode::CREATED.as_u16() as _);
    assert_eq!(res.message, "");
    assert_eq!(res.values, vec![Value::default()]);
}

From the test code we can see the intent: we keep calling methods like fn_received() on the ServiceInner structure to register the corresponding event handlers, and once registration is done we call into() to convert ServiceInner into Service. This is the classic Builder pattern, which shows up in a lot of Rust code.

So what's the magic behind methods like fn_received()? Why can they be chained like this? The answer is simple: each one takes ownership of self, modifies it, and returns self again. So we continue with the following code:

impl<Store: Storage> ServiceInner<Store> {
    pub fn new(store: Store) -> Self {
        Self {
            store,
            on_received: Vec::new(),
            on_executed: Vec::new(),
            on_before_send: Vec::new(),
            on_after_send: Vec::new(),
        }
    }

    pub fn fn_received(mut self, f: fn(&CommandRequest)) -> Self {
        self.on_received.push(f);
        self
    }

    pub fn fn_executed(mut self, f: fn(&CommandResponse)) -> Self {
        self.on_executed.push(f);
        self
    }

    pub fn fn_before_send(mut self, f: fn(&mut CommandResponse)) -> Self {
        self.on_before_send.push(f);
        self
    }

    pub fn fn_after_send(mut self, f: fn()) -> Self {
        self.on_after_send.push(f);
        self
    }
}

After these arrangements, there is no longer any need to keep the existing new() method on Service, so it can be deleted. At the same time, we need to implement From<ServiceInner<Store>> for Service:

impl<Store: Storage> From<ServiceInner<Store>> for Service<Store> {
    fn from(inner: ServiceInner<Store>) -> Self {
        Self {
            inner: Arc::new(inner),
        }
    }
}

The few places in the code where Service::new() was used now need to be changed to ServiceInner::new(), such as:

// We need a service structure to at least contain Storage
// let service = Service::new(MemTable::default());
let service: Service = ServiceInner::new(MemTable::default()).into();

After all changes are made, the code can compile successfully.

However, if you run cargo test, the newly added test will fail:

test service::tests::event_registration_should_work ... FAILED

This is because although we have registered the event handlers, we are not sending the event notifications yet. Also, since some of our events are immutable (like on_received) and some are mutable (like on_before_send), notification needs to distinguish between the two. Let's define two traits, Notify and NotifyMut:

/// Event notification (immutable events)
pub trait Notify<Arg> {
    fn notify(&self, arg: &Arg);
}

/// Event notification (mutable events)
pub trait NotifyMut<Arg> {
    fn notify(&self, arg: &mut Arg);
}

These two traits are generic traits, where Arg corresponds to the argument taken by the registered event handlers, for example:

fn(&CommandRequest);

Thus, we can implement event notification for Vec<fn(&Arg)> and Vec<fn(&mut Arg)>, covering all the events we currently support:

impl<Arg> Notify<Arg> for Vec<fn(&Arg)> {
    #[inline]
    fn notify(&self, arg: &Arg) {
        for f in self {
            f(arg)
        }
    }
}

impl<Arg> NotifyMut<Arg> for Vec<fn(&mut Arg)> {
    #[inline]
    fn notify(&self, arg: &mut Arg) {
        for f in self {
            f(arg)
        }
    }
}
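As a quick hedged illustration of how these generic impls behave (a made-up test; it assumes CommandRequest::new_hget from the earlier lectures):

#[test]
fn notify_should_call_every_handler() {
    // non-capturing closures coerce to fn pointers, just like in the
    // registration test earlier
    let handlers: Vec<fn(&CommandRequest)> = vec![|cmd: &CommandRequest| println!("got {:?}", cmd)];
    // notify() walks the vector and calls every registered handler in order
    handlers.notify(&CommandRequest::new_hget("t1", "k1"));
}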

With the Notify/NotifyMut traits implemented, we can modify the execute() method:

impl<Store: Storage> Service<Store> {
    pub fn execute(&self, cmd: CommandRequest) -> CommandResponse {
        debug!("Got request: {:?}", cmd);
        self.inner.on_received.notify(&cmd);
        let mut res = dispatch(cmd, &self.inner.store);
        debug!("Executed response: {:?}", res);
        self.inner.on_executed.notify(&res);
        self.inner.on_before_send.notify(&mut res);
        if !self.inner.on_before_send.is_empty() {
            debug!("Modified response: {:?}", res);
        }

        res
    }
}

Now the respective event handlers can be notified. This notification mechanism is currently still a synchronous function call; in the future, if needed, we can change it to message passing for asynchronous processing.
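If we do go asynchronous later, one hedged sketch, not part of today's code, is to push events into a channel and let a background worker drain them, so execute() never blocks on slow handlers (here a plain String stands in for the event payload):

use std::sync::mpsc;
use std::thread;

fn notify_async_sketch() {
    let (tx, rx) = mpsc::channel::<String>();
    let worker = thread::spawn(move || {
        // the worker receives events and runs the handlers out of band
        for event in rx {
            println!("handling event: {}", event);
        }
    });
    // execute() would only send the event and move on
    tx.send("on_received".to_string()).unwrap();
    drop(tx); // close the channel so the worker can exit
    worker.join().unwrap();
}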

Okay, now running cargo test should pass all the tests.

Implementing Storage Trait for Persistent Databases #

So far, our KV store has only been an in-memory KV store. Once the application terminates, all the user-stored keys/values will be lost. We want storage to be persistent.

One solution is to add WAL and disk snapshot support to MemTable, storing all update-relevant commands on disk in order, while regularly performing snapshots for quick data recovery. Another solution is to use existing KV stores like RocksDB or sled.

RocksDB is an embedded KV store written in C++, developed at Facebook on top of Google's LevelDB; sled is a standout KV store from the Rust community that positions itself as comparable to RocksDB. Their feature sets are similar, but sled is simpler and better suited for today's demonstration, while RocksDB, thoroughly tested in a variety of complex production environments, is the better fit for production use.

So, today we’ll try implementing the Storage trait for sled so that it can adapt to our KV server.

First, include sled in Cargo.toml:

sled = "0.34" # sled db

Then create src/storage/sleddb.rs, and add the following code:

use sled::{Db, IVec};
use std::{convert::TryInto, path::Path, str};

use crate::{KvError, Kvpair, Storage, StorageIter, Value};

#[derive(Debug)]
pub struct SledDb(Db);

impl SledDb {
    pub fn new(path: impl AsRef<Path>) -> Self {
        Self(sled::open(path).unwrap())
    }

    // In sled, since it supports scan_prefix, we use a prefix
    // to simulate a table. Of course, other schemes are possible too.
    fn get_full_key(table: &str, key: &str) -> String {
        format!("{}:{}", table, key)
    }

    // When iterating over a table's keys, we treat everything under the "table:" prefix as that table
    fn get_table_prefix(table: &str) -> String {
        format!("{}:", table)
    }
}
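// For example, with these helpers a value stored as ("t1", "hello") ends up
// under the sled key "t1:hello", and scanning the prefix "t1:" returns
// everything in table "t1".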

/// Flip Option<Result<T, E>> to Result<Option<T>, E>
/// Through this function, you can appreciate the elegance of functional programming
fn flip<T, E>(x: Option<Result<T, E>>) -> Result<Option<T>, E> {
    x.map_or(Ok(None), |v| v.map(Some))
}
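// For example:
//   flip(Some(Ok(v)))   becomes Ok(Some(v))
//   flip(Some(Err(e)))  becomes Err(e)
//   flip(None)          becomes Ok(None)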

impl Storage for SledDb {
    fn get(&self, table: &str, key: &str) -> Result<Option<Value>, KvError> {
        let name = SledDb::get_full_key(table, key);
        let result = self.0.get(name.as_bytes())?.map(|v| v.as_ref().try_into());
        flip(result)
    }

    fn set(&self, table: &str, key: String, value: Value) -> Result<Option<Value>, KvError> {
        let name = SledDb::get_full_key(table, &key);
        let data: Vec<u8> = value.try_into()?;

        let result = self.0.insert(name, data)?.map(|v| v.as_ref().try_into());
        flip(result)
    }

    fn contains(&self, table: &str, key: &str) -> Result<bool, KvError> {
        let name = SledDb::get_full_key(table, &key);

        Ok(self.0.contains_key(name)?)
    }

    fn del(&self, table: &str, key: &str) -> Result<Option<Value>, KvError> {
        let name = SledDb::get_full_key(table, &key);

        let result = self.0.remove(name)?.map(|v| v.as_ref().try_into());
        flip(result)
    }

    fn get_all(&self, table: &str) -> Result<Vec<Kvpair>, KvError> {
        let prefix = SledDb::get_table_prefix(table);
        let