Practical Operation (7): Building a Simple KV Server - How to Undertake Major Refactoring? #
Hello, I’m Chen Tian.
During software development, no matter how well designed the initial architecture is, unexpected requirement changes can still challenge it. So we should design the architecture to accommodate likely needs, while avoiding over-engineering for requirements that may never materialize. A good developer has to strike this balance.
So far, our KV server has taken shape and works well enough as a basic KV store.
At this point, the product manager suddenly has an idea and wants you to add support for Pub/Sub similar to Redis to this Server. You say: Stop joking, these are essentially two different products. The product manager responds: Redis also supports Pub/Sub. You retort: Then why not just use Redis’s Pub/Sub? The product manager laughs: Right, using Redis is good, and we can even save your salary. Now that it has come to this, you have no choice but to compromise: fine, sister, I’ll do it, isn’t that okay?
Although this story is fictional, major requirement changes like this are common in day-to-day development. Let’s take on this sizable challenge and see how to carry out a major refactoring of a system that has already taken shape.
Analysis of the Existing Architecture #
Let’s briefly recall Redis’ support for Pub/Sub: Clients can initiate SUBSCRIBE, PUBLISH, and UNSUBSCRIBE at any time. If clients A and B SUBSCRIBE to a topic called “lobby”, and client C sends “hello” to the lobby, both A and B will immediately receive this message.
The usage looks like this:
A: SUBSCRIBE "lobby"
A: SUBSCRIBE "King's Glory"
B: SUBSCRIBE "lobby"
C: PUBLISH "lobby" "hello"
// Both A and B receive "hello"
B: UNSUBSCRIBE "lobby"
B: SUBSCRIBE "King's Glory"
D: PUBLISH "lobby" "goodbye"
// Only A receives "goodbye"
C: PUBLISH "King's Glory" "good game"
// Both A and B receive "good game"
With this requirement, let’s review the current architecture:
To support Pub/Sub, there are two major issues with the existing architecture.
First, CommandService processes commands synchronously: each command immediately computes and returns a single value. A SUBSCRIBE command, however, expects not one value but an unknown number of future values. As we’ve discussed, a Stream represents a series of values that may arrive in the future, so here we need to return an asynchronous Stream.
Therefore, either we need to sacrifice the CommandService trait to adapt to new requirements, or we construct a new trait that is parallel to the CommandService trait to handle commands related to Pub/Sub.
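To make the difference concrete, here is a minimal sketch of the two shapes of interface. The names UnaryService, StreamingService, Response, and Echo are hypothetical and are not the project's real types, and a blocking std channel Receiver stands in for an async Stream so the example needs no external crates.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Simplified stand-in for the real response type (hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub struct Response(pub String);

/// Request/response style: one command in, exactly one value out.
pub trait UnaryService {
    fn execute(&self, cmd: &str) -> Response;
}

/// Subscription style: one command in, an open-ended sequence of values out.
/// A std `Receiver` plays the role that an async `Stream` would play.
pub trait StreamingService {
    fn execute_stream(&self, cmd: &str) -> Receiver<Response>;
}

/// Toy service implementing both shapes.
pub struct Echo;

impl UnaryService for Echo {
    fn execute(&self, cmd: &str) -> Response {
        Response(format!("ok: {}", cmd))
    }
}

impl StreamingService for Echo {
    fn execute_stream(&self, cmd: &str) -> Receiver<Response> {
        let (tx, rx) = channel();
        // Pretend three messages are published to the topic.
        for i in 1..=3 {
            tx.send(Response(format!("{} #{}", cmd, i))).unwrap();
        }
        // `tx` is dropped here, which ends the sequence,
        // much like a stream that completes.
        rx
    }
}
```

The caller of execute gets its answer right away, while the caller of execute_stream has to keep draining the receiver, which is exactly what does not fit the existing one-value-per-command trait.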
Second, if we build Pub/Sub support directly on top of TCP/TLS, we need to introduce the concept of a “stream” between Request and Response. Why?
Previously, our protocol was a synchronous, back-and-forth exchange:
However, if we continue to use this method, there will be a head-of-line blocking issue at the application layer. A SUBSCRIBE command, because we don’t know when the result will end, will block all subsequent commands. Therefore, within a connection, we need to divide many independent “streams” so that their sending and receiving do not affect each other:
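As a rough illustration of what dividing a connection into streams means, the sketch below tags every frame with a stream id and sorts interleaved frames into per-stream queues. The Frame type, demux, and demo_frames are made up for this example and do not reflect yamux's actual wire format.

```rust
use std::collections::BTreeMap;

/// A toy frame: each chunk of data carries the id of the logical stream
/// it belongs to. Illustration only, not a real protocol.
#[derive(Debug, Clone, PartialEq)]
pub struct Frame {
    pub stream_id: u32,
    pub payload: String,
}

/// Split the interleaved frames arriving on one connection
/// into one queue per stream, preserving order within each stream.
pub fn demux(frames: Vec<Frame>) -> BTreeMap<u32, Vec<String>> {
    let mut streams: BTreeMap<u32, Vec<String>> = BTreeMap::new();
    for frame in frames {
        streams.entry(frame.stream_id).or_default().push(frame.payload);
    }
    streams
}

/// Stream 1 is a long-lived SUBSCRIBE; stream 3 is an HGET
/// that is answered while the subscription is still open.
pub fn demo_frames() -> Vec<Frame> {
    vec![
        Frame { stream_id: 1, payload: "lobby: hello".into() },
        Frame { stream_id: 3, payload: "v1".into() },
        Frame { stream_id: 1, payload: "lobby: goodbye".into() },
    ]
}
```

Because the HGET response travels on its own stream, it does not have to wait for the subscription on stream 1 to finish.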
A typical protocol that uses stream processing is HTTP/2, which uses multiplexing. Therefore, one option is to build the KV server on gRPC using HTTP/2. However, HTTP is an overly complex protocol, with unnecessary overhead for a performance-critical service like a KV server, making it unsuitable.
Another way is to use the Yamux protocol, which we’ve introduced before. It is a simple protocol very similar to the internal multiplexing mechanism of HTTP/2. Using it, the protocol interaction looks like this:
Yamux is suitable for scenarios where multiplexing is desired at the TCP layer without the cumbersome details of HTTP. Today we’ll use it to support the Pub/Sub we intend to implement.
Using yamux for Multiplexing #
In Rust, the rust-yamux library provides yamux support. We’ll also need tokio-util, which provides a compatibility layer between tokio’s I/O traits and those of the futures crate. We add both to Cargo.toml:
[dependencies]
...
tokio-util = { version = "0.6", features = ["compat"]} # Compatibility library for tokio and futures
...
yamux = "0.9" # Support for yamux multiplexing
...
Then create src/network/multiplex.rs (remember to include it in mod.rs) and add the following code:
use futures::{future, Future, TryStreamExt};
use std::marker::PhantomData;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use yamux::{Config, Connection, ConnectionError, Control, Mode, WindowUpdateMode};
/// Yamux control structure
pub struct YamuxCtrl<S> {
    /// yamux control, used to create new streams
    ctrl: Control,
    _conn: PhantomData<S>,
}
impl<S> YamuxCtrl<S>
where
    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    /// Create a yamux client
    pub fn new_client(stream: S, config: Option<Config>) -> Self {
        Self::new(stream, config, true, |_stream| future::ready(Ok(())))
    }
    /// Create a yamux server, where we need to handle the stream
    pub fn new_server<F, Fut>(stream: S, config: Option<Config>, f: F) -> Self
    where
        F: FnMut(yamux::Stream) -> Fut,
        F: Send + 'static,
        Fut: Future<Output = Result<(), ConnectionError>> + Send + 'static,
    {
        Self::new(stream, config, false, f)
    }
    // Create YamuxCtrl
    fn new<F, Fut>(stream: S, config: Option<Config>, is_client: bool, f: F) -> Self
    where
        F: FnMut(yamux::Stream) -> Fut,
        F: Send + 'static,
        Fut: Future<Output = Result<(), ConnectionError>> + Send + 'static,
    {
        let mode = if is_client {
            Mode::Client
        } else {
            Mode::Server
        };
        // Create config
        let mut config = config.unwrap_or_default();
        config.set_window_update_mode(WindowUpdateMode::OnRead);
        // Create the connection; yamux works with futures' I/O traits, so compat() adapts the tokio stream to them
        let conn = Connection::new(stream.compat(), config, mode);
        // Create yamux control
        let ctrl = conn.control();
        // Pull data from all streams
        tokio::spawn(yamux::into_stream(conn).try_for_each_concurrent(None, f));
        Self {
            ctrl,
            _conn: PhantomData::default(),
        }
    }
    /// Open a new stream
    pub async fn open_stream(&mut self) -> Result<Compat<yamux::Stream>, ConnectionError> {
        let stream = self.ctrl.open_stream().await?;
        // yamux::Stream implements futures' traits; compat() converts it back to tokio's traits
        Ok(stream.compat())
    }
}
This code provides basic handling for yamux. If some parts are unclear, such as WindowUpdateMode or yamux::into_stream(), that’s perfectly normal; reading through the yamux crate’s documentation and examples will help.
One interface here is fairly complex, so let’s explain it a bit:
pub fn new_server<F, Fut>(stream: S, config: Option<Config>, f: F) -> Self
where
    F: FnMut(yamux::Stream) -> Fut,
    F: Send + 'static,
    Fut: Future<Output = Result<(), ConnectionError>> + Send + 'static,
{
    Self::new(stream, config, false, f)
}
This means the parameter f is an FnMut closure that takes a yamux::Stream and returns a Future. We’ve seen this kind of structure before. The interface is this complex because, when this was written, Rust had not yet stabilized async closures, so this style was the best way to express what async || {} would.
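The same pattern can be tried in isolation. The sketch below is a std-only illustration: for_each_item, block_on, and collect_seen are hypothetical helpers written for this example (they are not part of yamux or tokio), and the tiny executor simply polls in a loop, which is only adequate for futures that never actually wait.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Calls `f` on every item and awaits the future it returns.
/// `F` is the closure type, `Fut` is the future type the closure produces.
pub async fn for_each_item<T, F, Fut>(items: Vec<T>, mut f: F) -> Result<(), String>
where
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Result<(), String>>,
{
    for item in items {
        f(item).await?;
    }
    Ok(())
}

/// A waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls the future in a loop until it is ready.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Records every item seen by a closure written as `|n| async move { .. }`.
pub fn collect_seen(items: Vec<u32>) -> Vec<u32> {
    let seen = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&seen);
    let result = block_on(for_each_item(items, move |n| {
        // Each call hands its own handle to the returned future.
        let sink = Rc::clone(&sink);
        async move {
            sink.borrow_mut().push(n);
            Ok(())
        }
    }));
    result.expect("the closure never fails");
    let out = seen.borrow().clone();
    out
}
```

A closure that returns an async block satisfies the F and Fut bounds in the same way the handler passed to new_server does.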
Next, let’s write a test to make sure it works properly (the full code is omitted for space; you can refer to the GitHub repo and check diff_yamux to see the modifications):
#[tokio::test]
async fn yamux_ctrl_client_server_should_work() -> Result<()> {
    // Create a TLS-enabled yamux server
    let acceptor = tls_acceptor(false)?;
    let addr = start_yamux_server("127.0.0.1:0", acceptor, MemTable::new()).await?;
    let connector = tls_connector(false)?;
    let stream = TcpStream::connect(addr).await?;
    let stream = connector.connect(stream).await?;
    // Create a TLS-enabled yamux client
    let mut ctrl = YamuxCtrl::new_client(stream, None);
    // Open a new yamux stream from the client ctrl
    let stream = ctrl.open_stream().await?;
    // Wrap it into ProstClientStream
    let mut client = ProstClientStream::new(stream);
    let cmd = CommandRequest::new_hset("t1", "k1", "v1".into());
    client.execute(cmd).await.unwrap();
    let cmd = CommandRequest::new_hget("t1", "k1");
    let res = client.execute(cmd).await.unwrap();
    assert_res_ok(res, &["v1".into()], &[]);
    Ok(())
}
As you can see, after a thin layer of encapsulation, yamux fits naturally into our existing architecture. Because open_stream() returns a stream that implements tokio’s AsyncRead/AsyncWrite, it can be used directly with ProstClientStream. In other words, we changed the network layer a little, but the logic built on top of it did not need to change.
Run cargo test, and all tests should pass.
Supporting pub/sub #
Alright, now that the network layer supports yamux and lays the groundwork for multiplexing, let’s look at the specific implementation of pub/sub.
First, modify abi.proto to add the new commands:
// Command request from the client
message CommandRequest {
  oneof request_data {
    ...
    Subscribe subscribe = 10;
    Unsubscribe unsubscribe = 11;
    Publish publish = 12;
  }
}
// Subscribe to a topic, and any data published to this topic will be received
// Upon successful subscription, the first CommandResponse returned will include a unique subscription id
message Subscribe { string topic = 1; }
// Unsubscribe from a topic
message Unsubscribe {
  string topic = 1;
  uint32 id = 2;
}
// Publish data to a topic
message Publish {
  string topic = 1;
  repeated Value data = 2;
}
There’s no need to change the command response. When the client Subscribes, the first value in the stream that is returned includes the subscription ID, which is a globally unique ID. This way, the client can later use Unsubscribe to cancel it.
How to Design Pub/Sub? #
So, how should Pub/Sub be implemented?
We can use two tables: a Topic Table, which stores the topics and their corresponding subscription lists; and a Subscription Table, which stores the subscription ID and the sending side of the channel.
When SUBSCRIBE comes in, we obtain a subscription ID, insert it into the Topic Table, then create an MPSC channel, pushing the sending side of the channel and the subscription ID into the subscription table.
This way, when someone PUBLISHes, we look up the list of subscription IDs in the Topic Table, then loop through them, find each ID’s Sender in the Subscription Table, and write the data into it. The Receiver end of the channel then receives the data, which is polled by the yamux stream and sent to the client.
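Before the concurrent version, the two-table idea can be tried out in a single-threaded sketch. MiniBroker and its fields are hypothetical names used only for this illustration; it uses std's HashMap and a blocking std channel in place of DashMap and tokio's mpsc, and plain strings in place of CommandResponse.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::mpsc::{channel, Receiver, Sender};

/// Single-threaded sketch of the two tables.
#[derive(Default)]
pub struct MiniBroker {
    next_id: u32,
    /// Topic Table: topic name -> ids subscribed to it
    topics: HashMap<String, HashSet<u32>>,
    /// Subscription Table: subscription id -> sending half of its channel
    subscriptions: HashMap<u32, Sender<String>>,
}

impl MiniBroker {
    /// Register a subscription and hand back its id and receiving half.
    pub fn subscribe(&mut self, topic: &str) -> (u32, Receiver<String>) {
        self.next_id += 1;
        let id = self.next_id;
        self.topics.entry(topic.to_string()).or_default().insert(id);
        let (tx, rx) = channel();
        self.subscriptions.insert(id, tx);
        (id, rx)
    }

    /// Remove the id from both tables; drop the topic once it is empty.
    pub fn unsubscribe(&mut self, topic: &str, id: u32) {
        let now_empty = match self.topics.get_mut(topic) {
            Some(ids) => {
                ids.remove(&id);
                ids.is_empty()
            }
            None => false,
        };
        if now_empty {
            self.topics.remove(topic);
        }
        self.subscriptions.remove(&id);
    }

    /// Look up the ids for the topic, then the sender for each id.
    pub fn publish(&self, topic: &str, msg: &str) {
        if let Some(ids) = self.topics.get(topic) {
            for id in ids {
                if let Some(tx) = self.subscriptions.get(id) {
                    // A send error only means the receiver is gone; ignore it here.
                    let _ = tx.send(msg.to_string());
                }
            }
        }
    }
}
```

Replaying the lobby scenario from the beginning of this section against MiniBroker gives the same delivery pattern: after B unsubscribes from "lobby", only A's receiver sees "goodbye".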
The whole process is illustrated below:
With this basic design in mind, we can start building the interface and data structures:
/// The next subscription id
static NEXT_ID: AtomicU32 = AtomicU32::new(1);
/// Get the next subscription id
fn get_next_subscription_id() -> u32 {
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}
pub trait Topic: Send + Sync + 'static {
    /// Subscribe to a topic
    fn subscribe(self, name: String) -> mpsc::Receiver<Arc<CommandResponse>>;
    /// Unsubscribe from a topic
    fn unsubscribe(self, name: String, id: u32);
    /// Publish data to a topic
    fn publish(self, name: String, value: Arc<CommandResponse>);
}
/// Structure for publishing and subscribing to topics
#[derive(Default)]
pub struct Broadcaster {
    /// All topics list
    topics: DashMap<String, DashSet<u32>>,
    /// All subscriptions list
    subscriptions: DashMap<u32, mpsc::Sender<Arc<CommandResponse>>>,
}
Here, the subscription ID is a u32 generated from an AtomicU32 counter.
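As a quick check that such a counter hands out distinct IDs even under concurrency, here is a small std-only sketch. The helpers next_id, draw_ids, and all_unique are hypothetical names for this example.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;

static NEXT_ID: AtomicU32 = AtomicU32::new(1);

/// fetch_add atomically increments the counter and returns the previous value,
/// so no two callers can observe the same id.
fn next_id() -> u32 {
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}

/// Spawns `threads` threads that each draw `per_thread` ids; returns all of them.
pub fn draw_ids(threads: usize, per_thread: usize) -> Vec<u32> {
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            thread::spawn(move || (0..per_thread).map(|_| next_id()).collect::<Vec<u32>>())
        })
        .collect();
    handles
        .into_iter()
        .flat_map(|h| h.join().expect("worker panicked"))
        .collect()
}

/// True if no id appears twice.
pub fn all_unique(ids: &[u32]) -> bool {
    let set: HashSet<u32> = ids.iter().copied().collect();
    set.len() == ids.len()
}
```

Relaxed ordering is sufficient here because only the uniqueness of each returned value matters, not its ordering relative to other memory operations.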
Many people like to use UUID4 to represent such a globally unique ID. If you use UUID, be sure not to store its string representation, which is a waste of memory and has an extra heap allocation every time. Instead, use its u128 representation.
However, even u128 takes far more space than u32. Suppose topic M has 10,000 subscriptions. Publishing a message to M means copying the whole DashSet of subscription IDs: with u32 IDs that is roughly 10,000 × 4 bytes = 40 KB of memory to copy, while with u128 it is 10,000 × 16 bytes = 160 KB. The performance difference is noticeable.
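The arithmetic can be checked directly. This sketch counts only the bytes of the IDs themselves and ignores the set's own bookkeeping overhead; id_payload_bytes and UUID_TEXT_LEN are names introduced for this example.

```rust
use std::mem::size_of;

/// Bytes needed for the ids alone when copying `n` subscription ids of type `T`.
pub fn id_payload_bytes<T>(n: usize) -> usize {
    n * size_of::<T>()
}

/// Length of the canonical hyphenated text form of a UUID
/// (8-4-4-4-12 hex digits plus four hyphens).
pub const UUID_TEXT_LEN: usize = 36;

/// Bytes of text alone for `n` UUID strings, not counting each String's
/// pointer, length, and capacity fields or allocator overhead.
pub fn uuid_text_bytes(n: usize) -> usize {
    n * UUID_TEXT_LEN
}
```

For 10,000 subscriptions this gives 40,000 bytes with u32, 160,000 bytes with u128, and 360,000 bytes of text alone if the IDs were stored as UUID strings.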
In addition, we’ve encapsulated CommandResponse in an Arc. If a message needs to be sent to 10,000 clients, we don’t want it to be copied and then sent; instead, the same data should be directly sent.
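A small sketch shows what the Arc buys us. The fan_out helper is hypothetical, and a Vec<u8> stands in for CommandResponse: every subscriber receives a handle to the same allocation, and only a reference count changes.

```rust
use std::sync::Arc;

/// Hands out `n` handles to the same message without copying its bytes.
pub fn fan_out(msg: Arc<Vec<u8>>, n: usize) -> Vec<Arc<Vec<u8>>> {
    // Arc::clone only bumps the reference count; the Vec itself is not duplicated.
    (0..n).map(|_| Arc::clone(&msg)).collect()
}

/// True if every handle points at the same allocation as `original`.
pub fn all_shared(original: &Arc<Vec<u8>>, handles: &[Arc<Vec<u8>>]) -> bool {
    handles.iter().all(|h| Arc::ptr_eq(h, original))
}
```

With a 1 KB message and 10,000 subscribers, a deep copy per subscriber would move about 10 MB, while cloning the Arc moves none of the payload.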
For the Pub/Sub interface, we’ve defined a Topic trait. Although Broadcaster is currently the only type that implements it, we may switch to a different implementation in the future, so abstracting it into a trait makes sense.
Implementation of Pub/Sub #
Okay, let’s write some code. Create src/service/topic.rs (remember to include it in mod.rs) and add the following:
use dashmap::{DashMap, DashSet};
use std::sync::{
    atomic::{AtomicU32, Ordering},
    Arc,
};
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use crate::{CommandResponse, Value};
/// Capacity of each subscription's channel buffer
const BROADCAST_CAPACITY: usize = 128;
/// Next subscription id
static NEXT_ID: AtomicU32 = AtomicU32::new(1);
/// Get the next subscription id
fn get_next_subscription_id() -> u32 {
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}
pub trait Topic: Send + Sync + 'static {
    /// Subscribe to a topic
    fn subscribe(self, name: String) -> mpsc::Receiver<Arc<CommandResponse>>;
    /// Unsubscribe from a topic
    fn unsubscribe(self, name: String, id: u32);
    /// Publish data to a topic
    fn publish(self, name: String, value: Arc<CommandResponse>);
}
/// Structure for topic publishing and subscribing
#[derive(Default)]
pub struct Broadcaster {
    /// List of all topics
    topics: DashMap<String, DashSet<u32>>,
    /// List of all subscriptions
    subscriptions: DashMap<u32, mpsc::Sender<Arc<CommandResponse>>>,
}
impl Topic for Arc<Broadcaster> {
    fn subscribe(self, name: String) -> mpsc::Receiver<Arc<CommandResponse>> {
        let id = {
            let entry = self.topics.entry(name).or_default();
            let id = get_next_subscription_id();
            entry.value().insert(id);
            id
        };
        // Generate an mpsc channel
        let (tx, rx) = mpsc::channel(BROADCAST_CAPACITY);
        let v: Value = (id as i64).into();
        // Immediately send the subscription id to rx
        let tx1 = tx.clone();
        tokio::spawn(async move {
            if let Err(e) = tx1.send(Arc::new(v.into())).await {
                // TODO: This is very unlikely, but we currently have no way to handle it
                warn!("Failed to send subscription id: {}. Error: {:?}", id, e);
            }
        });
        // Store tx in the subscription table
        self.subscriptions.insert(id, tx);
        debug!("Subscription {} is added", id);
        // Return rx to the network processing context
        rx
    }
    fn unsubscribe(self, name: String, id: u32) {
        if let Some(v) = self.topics.get_mut(&name) {
            // Find the subscription id of the topic in the topics table and delete
            v.remove(&id);
            // If this topic is empty, delete the topic as well
            if v.is_empty() {
                info!("Topic: {:?} is deleted", &name);
                drop(v);
                self.topics.remove(&name);
            }
        }
        debug!("Subscription {} is removed!", id);
        // Similarly, remove it from the subscription table
        self.subscriptions.remove(&id);
    }
    fn publish(self, name: String, value: Arc<CommandResponse>) {
        tokio::spawn(async move {
            match self.topics.get(&name) {
                Some(chan) => {
                    // Copy all subscription ids of the topic
                    // Each id here is u32, so if there are 10k subscriptions under one topic, the cost of copying
                    // is only about 40 KB of heap memory (plus some control structures), so the efficiency is not bad
                    // This is also why we use NEXT_ID to control the generation of subscription ids
                    let chan = chan.value().clone();
                    // Loop through and send
                    for id in chan.into_iter() {
                        if let Some(tx) = self.subscriptions.get(&id) {
                            if let Err(e) = tx.send(value.clone()).await {
                                warn!("Publish to {} failed! error: {:?}", id, e);