
Practical Operation (7): Building a Simple KV Server - How to Undertake Major Refactoring? #

Hello, I’m Chen Tian.

During software development, no matter how well-designed the initial architecture is, it can still be challenged by unexpected changes in requirements. We should therefore design an architecture that can accommodate plausible future needs, while avoiding overdesign aimed at requirements that may never materialize. A good developer must be able to strike this balance.

So far, our KV server has gradually taken shape, and it is perfectly serviceable as a basic KV store.

At this point, the product manager suddenly has an idea: she wants you to add Redis-like Pub/Sub support to this server. You say: stop joking, those are essentially two different products. The product manager responds: but Redis supports Pub/Sub too. You retort: then why not just use Redis's Pub/Sub? The product manager laughs: right, Redis would do nicely, and we could even save your salary. Seeing where this is going, you have no choice but to give in: fine, fine, I'll do it, okay?

Although this story is fictional, drastic requirement changes like this are quite common in our daily work as developers. Let's take on this sizable challenge and see how to carry out a major refactoring of a system that has already taken shape.

Analysis of the Existing Architecture #

Let’s briefly recall Redis’ support for Pub/Sub: Clients can initiate SUBSCRIBE, PUBLISH, and UNSUBSCRIBE at any time. If clients A and B SUBSCRIBE to a topic called “lobby”, and client C sends “hello” to the lobby, both A and B will immediately receive this message.

The usage looks like this:

A: SUBSCRIBE "lobby"
A: SUBSCRIBE "King's Glory"
B: SUBSCRIBE "lobby"
C: PUBLISH "lobby" "hello"
// Both A and B receive "hello"
B: UNSUBSCRIBE "lobby"
B: SUBSCRIBE "King's Glory"
D: PUBLISH "lobby" "goodbye"
// Only A receives "goodbye"
C: PUBLISH "King's Glory" "good game"
// Both A and B receive "good game"

With this requirement, let’s review the current architecture:

[Figure: the current architecture of the KV server]

To support Pub/Sub, the existing architecture has two major problems.

First, CommandService processes commands synchronously: for each command, a value is computed and returned immediately. But when a SUBSCRIBE command arrives, what it expects is not a single value but a series of values that may arrive in the future. We've talked about how a Stream represents a series of potential future values, so here we need to return an asynchronous Stream.

Therefore, either we rework the CommandService trait to fit the new requirement, or we construct a new trait, parallel to CommandService, to handle the commands related to Pub/Sub.
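As a sketch of the second option, such a parallel trait might look like the following. Both the trait name and the StreamingResponse alias here are illustrative, not a final design; it also refers to the Topic trait we'll define later in this lecture:

use std::{pin::Pin, sync::Arc};
use futures::Stream;

/// A series of responses that may arrive in the future
pub type StreamingResponse = Pin<Box<dyn Stream<Item = Arc<CommandResponse>> + Send>>;

/// Illustrative sketch: a trait parallel to CommandService, for commands
/// (SUBSCRIBE / UNSUBSCRIBE / PUBLISH) whose result is a stream of values
pub trait TopicService {
    /// Process the command against the topic table, returning a response stream
    fn execute<T: Topic>(self, topic: T) -> StreamingResponse;
}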

Second, if we build Pub/Sub support directly on top of TCP/TLS, we need to introduce the concept of a "stream" between requests and responses. Why?

Previously, our protocol operated in a synchronous, one-request-one-response fashion:

[Figure: the synchronous request/response protocol interaction]

However, if we keep using this model, we run into head-of-line blocking at the application layer: since we don't know when the results of a SUBSCRIBE command will end, it would block every command that follows it. Therefore, within a single connection, we need to carve out many independent "streams", so that sending and receiving on one stream does not affect the others:

[Figure: multiple independent streams multiplexed over a single connection]

A typical protocol that handles streams this way is HTTP/2, with its multiplexing. So one option is to build the KV server on top of gRPC, which runs over HTTP/2. However, HTTP is an overly complex protocol; for a performance-critical service like a KV server, it brings unnecessary overhead, so it's not a good fit.

Another way is to use the Yamux protocol, which we’ve introduced before. It is a simple protocol very similar to the internal multiplexing mechanism of HTTP/2. Using it, the protocol interaction looks like this:

[Figure: protocol interaction using yamux multiplexing]

Yamux is suitable for scenarios where multiplexing is desired at the TCP layer without the cumbersome details of HTTP. Today we’ll use it to support the Pub/Sub we intend to implement.

Using yamux for Multiplexing #

In Rust, the rust-yamux crate provides yamux support. We'll also need tokio-util, which provides compatibility layers between tokio's I/O traits and those of futures. Add them to Cargo.toml:

[dependencies]
...
tokio-util = { version = "0.6", features = ["compat"]} # Compatibility library for tokio and futures
...
yamux = "0.9" # Support for yamux multiplexing
...

Then create src/network/multiplex.rs (remember to include it in mod.rs) and add the following code:

use futures::{future, Future, TryStreamExt};
use std::marker::PhantomData;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use yamux::{Config, Connection, ConnectionError, Control, Mode, WindowUpdateMode};

/// Yamux control structure
pub struct YamuxCtrl<S> {
    /// yamux control, used to create new streams
    ctrl: Control,
    _conn: PhantomData<S>,
}

impl<S> YamuxCtrl<S>
where
    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    /// Create a yamux client
    pub fn new_client(stream: S, config: Option<Config>) -> Self {
        Self::new(stream, config, true, |_stream| future::ready(Ok(())))
    }

    /// Create a yamux server, where we need to handle the stream
    pub fn new_server<F, Fut>(stream: S, config: Option<Config>, f: F) -> Self
    where
        F: FnMut(yamux::Stream) -> Fut,
        F: Send + 'static,
        Fut: Future<Output = Result<(), ConnectionError>> + Send + 'static,
    {
        Self::new(stream, config, false, f)
    }

    // Create YamuxCtrl
    fn new<F, Fut>(stream: S, config: Option<Config>, is_client: bool, f: F) -> Self
    where
        F: FnMut(yamux::Stream) -> Fut,
        F: Send + 'static,
        Fut: Future<Output = Result<(), ConnectionError>> + Send + 'static,
    {
        let mode = if is_client {
            Mode::Client
        } else {
            Mode::Server
        };

        // Create config
        let mut config = config.unwrap_or_default();
        config.set_window_update_mode(WindowUpdateMode::OnRead);

        // Create the yamux connection; yamux speaks futures' AsyncRead/AsyncWrite,
        // so we convert the tokio stream with compat()
        let conn = Connection::new(stream.compat(), config, mode);

        // Create yamux control
        let ctrl = conn.control();

        // Pull data from all streams
        tokio::spawn(yamux::into_stream(conn).try_for_each_concurrent(None, f));

        Self {
            ctrl,
            _conn: PhantomData::default(),
        }
    }

    /// Open a new stream
    pub async fn open_stream(&mut self) -> Result<Compat<yamux::Stream>, ConnectionError> {
        let stream = self.ctrl.open_stream().await?;
        Ok(stream.compat())
    }
}

This code provides the basic yamux handling. If parts of it are unclear, such as WindowUpdateMode or yamux::into_stream(), that's perfectly normal; reading through the yamux crate's documentation and examples will help.

There’s a complex interface here, let’s explain it a bit:

pub fn new_server<F, Fut>(stream: S, config: Option<Config>, f: F) -> Self
where
    F: FnMut(yamux::Stream) -> Fut,
    F: Send + 'static,
    Fut: Future<Output = Result<(), ConnectionError>> + Send + 'static,
{
    Self::new(stream, config, false, f)
}

It means that the parameter f is an FnMut closure that takes a yamux::Stream and returns a Future. We've seen structures like this before. The interface is this complex because Rust has not yet stabilized async closures, so the closest we can get to writing async || {} is this style: a regular closure that returns a Future.
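For example, a server-side caller can satisfy this bound by passing a closure that immediately returns an async block. A minimal sketch, where stream is the underlying TCP/TLS stream and handle_stream is a hypothetical per-stream handler:

// Minimal sketch: `handle_stream` is a hypothetical handler with the
// signature async fn(yamux::Stream) -> Result<(), ConnectionError>.
// The FnMut closure returns the Future produced by the async block.
YamuxCtrl::new_server(stream, None, move |yamux_stream| async move {
    handle_stream(yamux_stream).await
});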

Next, let’s write some tests to ensure it works properly (full code is not presented due to space constraints, you can refer to the GitHub repo and check the diff_yamux to see the modifications):

#[tokio::test]
async fn yamux_ctrl_client_server_should_work() -> Result<()> {
    // Create a TLS-enabled yamux server
    let acceptor = tls_acceptor(false)?;
    let addr = start_yamux_server("127.0.0.1:0", acceptor, MemTable::new()).await?;
    
    let connector = tls_connector(false)?;
    let stream = TcpStream::connect(addr).await?;
    let stream = connector.connect(stream).await?;
    // Create a TLS-enabled yamux client
    let mut ctrl = YamuxCtrl::new_client(stream, None);
    
    // Open a new yamux stream from the client ctrl
    let stream = ctrl.open_stream().await?;
    // Wrap it into ProstClientStream
    let mut client = ProstClientStream::new(stream);
    
    let cmd = CommandRequest::new_hset("t1", "k1", "v1".into());
    client.execute(cmd).await.unwrap();
    
    let cmd = CommandRequest::new_hget("t1", "k1");
    let res = client.execute(cmd).await.unwrap();
    assert_res_ok(res, &["v1".into()], &[]);
    
    Ok(())
}

You can see that after a simple wrapper, yamux integrates naturally into our existing architecture. Since open_stream() returns a stream that implements tokio's AsyncRead/AsyncWrite, it can be used directly with ProstClientStream. In other words, we tweaked the network layer a bit, but none of the downstream logic needed to change.

Run cargo test, and all tests should pass.

Supporting Pub/Sub #

Alright, now that the network layer supports yamux and the groundwork for multiplexing is in place, let's look at the concrete implementation of Pub/Sub.

First, modify abi.proto to add new commands:

// Command request from the client
message CommandRequest {
  oneof request_data {
    ...
    Subscribe subscribe = 10;
    Unsubscribe unsubscribe = 11;
    Publish publish = 12;
  }
}

// Subscribe to a topic, and any data published to this topic will be received
// Upon successful subscription, the first CommandResponse returned will include a unique subscription id
message Subscribe { string topic = 1; }

// Unsubscribe from a topic
message Unsubscribe {
  string topic = 1;
  uint32 id = 2;
}

// Publish data to a topic
message Publish {
  string topic = 1;
  repeated Value data = 2;
}

There’s no need to change the command response. When the client Subscribes, the first value in the stream that is returned includes the subscription ID, which is a globally unique ID. This way, the client can later use Unsubscribe to cancel it.

How to Design Pub/Sub? #

So, how should Pub/Sub be implemented?

We can use two tables: a topic table, which maps each topic to the set of its subscription IDs; and a subscription table, which maps each subscription ID to the sending end of a channel.

When a SUBSCRIBE comes in, we generate a subscription ID and insert it into the topic table, then create an mpsc channel and put the channel's sending end into the subscription table under that ID.

This way, when someone PUBLISHes, we can find the topic's list of subscription IDs in the topic table, loop through it, fetch each corresponding Sender from the subscription table, and write the data into it. The Receiver end of each channel then gets the data, which the yamux stream polls and sends on to the client.

The whole process is illustrated below:

[Figure: the Pub/Sub data flow through the topic table and subscription table]

With this basic design in mind, we can start building the interface and data structures:

/// The next subscription id
static NEXT_ID: AtomicU32 = AtomicU32::new(1);

/// Get the next subscription id
fn get_next_subscription_id() -> u32 {
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}

pub trait Topic: Send + Sync + 'static {
    /// Subscribe to a topic
    fn subscribe(self, name: String) -> mpsc::Receiver<Arc<CommandResponse>>;
    /// Unsubscribe from a topic
    fn unsubscribe(self, name: String, id: u32);
    /// Publish data to a topic
    fn publish(self, name: String, value: Arc<CommandResponse>);
}

/// Structure for publishing and subscribing to topics
#[derive(Default)]
pub struct Broadcaster {
    /// All topics list
    topics: DashMap<String, DashSet<u32>>,
    /// All subscriptions list
    subscriptions: DashMap<u32, mpsc::Sender<Arc<CommandResponse>>>,
}

Here, the subscription ID is generated from an AtomicU32 counter.

Many people like to use UUID v4 for globally unique IDs like this. If you do use UUIDs, be sure not to store their string representation: it wastes memory and incurs an extra heap allocation every time. Store the u128 representation instead.

However, even a u128 takes four times the space of a u32. Suppose topic M has 10,000 subscriptions: publishing a message to M means cloning the topic's entire DashSet, so with u32 IDs each publish copies about 40KB of memory, while with u128 it copies about 160KB. The performance difference is obvious.
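A quick back-of-the-envelope check of these numbers:

use std::mem::size_of;

// 10,000 subscription ids are copied on every publish to the topic
let n = 10_000;
assert_eq!(size_of::<u32>() * n, 40_000);   // ~40KB per publish with u32
assert_eq!(size_of::<u128>() * n, 160_000); // ~160KB per publish with u128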

In addition, we’ve encapsulated CommandResponse in an Arc. If a message needs to be sent to 10,000 clients, we don’t want it to be copied and then sent; instead, the same data should be directly sent.

For the Pub/Sub interface, we've defined a Topic trait. Although Broadcaster is currently the only type implementing Topic, we may want to switch to a different implementation in the future, so abstracting out the Topic trait makes sense.

Implementation of Pub/Sub #

Okay, let’s write some code. Create src/service/topic.rs (remember to include it in mod.rs) and add the following:

use dashmap::{DashMap, DashSet};
use std::sync::{
    atomic::{AtomicU32, Ordering},
    Arc,
};
use tokio::sync::mpsc;
use tracing::{debug, info, warn};

use crate::{CommandResponse, Value};

/// Capacity of each subscription's channel (max buffered messages)
const BROADCAST_CAPACITY: usize = 128;

/// Next subscription id
static NEXT_ID: AtomicU32 = AtomicU32::new(1);

/// Get the next subscription id
fn get_next_subscription_id() -> u32 {
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}

pub trait Topic: Send + Sync + 'static {
    /// Subscribe to a topic
    fn subscribe(self, name: String) -> mpsc::Receiver<Arc<CommandResponse>>;
    /// Unsubscribe from a topic
    fn unsubscribe(self, name: String, id: u32);
    /// Publish data to a topic
    fn publish(self, name: String, value: Arc<CommandResponse>);
}

/// Structure for topic publishing and subscribing
#[derive(Default)]
pub struct Broadcaster {
    /// List of all topics
    topics: DashMap<String, DashSet<u32>>,
    /// List of all subscriptions
    subscriptions: DashMap<u32, mpsc::Sender<Arc<CommandResponse>>>,
}

impl Topic for Arc<Broadcaster> {
    fn subscribe(self, name: String) -> mpsc::Receiver<Arc<CommandResponse>> {
        let id = {
            let entry = self.topics.entry(name).or_default();
            let id = get_next_subscription_id();
            entry.value().insert(id);
            id
        };

        // Generate an mpsc channel
        let (tx, rx) = mpsc::channel(BROADCAST_CAPACITY);

        let v: Value = (id as i64).into();

        // Immediately send the subscription id into the channel (the client reads it from rx)
        let tx1 = tx.clone();
        tokio::spawn(async move {
            if let Err(e) = tx1.send(Arc::new(v.into())).await {
                // TODO: this is very unlikely to happen, but we don't handle the failure yet
                warn!("Failed to send subscription id: {}. Error: {:?}", id, e);
            }
        });

        // Store tx in the subscription table
        self.subscriptions.insert(id, tx);
        debug!("Subscription {} is added", id);

        // Return rx to the network processing context
        rx
    }

    fn unsubscribe(self, name: String, id: u32) {
        if let Some(v) = self.topics.get_mut(&name) {
            // Find the subscription id of the topic in the topics table and delete
            v.remove(&id);

            // If this topic is empty, delete the topic as well
            if v.is_empty() {
                info!("Topic: {:?} is deleted", &name);
                drop(v);
                self.topics.remove(&name);
            }
        }

        debug!("Subscription {} is removed!", id);
        // Similarly, remove it from the subscription table
        self.subscriptions.remove(&id);
    }

    fn publish(self, name: String, value: Arc<CommandResponse>) {
        tokio::spawn(async move {
            match self.topics.get(&name) {
                Some(chan) => {
                    // Copy all subscription ids of the topic
                    // Each id here is u32, so if there are 10k subscriptions under one topic, the cost of copying
                    // is only 40k of heap memory (plus some control structures), so the efficiency is not bad
                    // This is also why we use NEXT_ID to control the generation of subscription ids
                    let chan = chan.value().clone();

                    // Loop through and send
                    for id in chan.into_iter() {
                        if let Some(tx) = self.subscriptions.get(&id) {
                            if let Err(e) = tx.send(value.clone()).await {
                                warn!("Publish to {} failed! error: {:?}", id, e);