34 Concurrency Handling Tools Provided by Rust From Atomics to Channels

34 Concurrent Processing - Part 2: From atomics to Channel, What Tools Does Rust Provide? #

Hello, I’m Chen Tian.

Three working patterns are common in concurrent code: free competition mode, map/reduce mode, and DAG mode. In all of them, the challenge is how to synchronize the concurrent tasks. Atomics and Mutex solve synchronization in free competition mode, and they also handle map/reduce mode well, because there synchronization only happens between the map and reduce stages.

However, they don’t address a higher-level issue, that is, DAG mode: How should one proceed if access needs to occur in a certain order or if there are dependencies?

A typical scenario for this problem is the producer-consumer pattern: after the producer creates content, there needs to be a mechanism to notify the consumer that it’s available for consumption. For example, when there is data on a socket, notify the handling thread to process the data, after which, notify the socket’s send/receive thread to transfer data.

Condvar #

For this reason, operating systems also provide Condvar (Condition Variable). A Condvar supports two operations:

  • Wait: The thread waits in the queue until a condition is met.
  • Notify: When the condvar’s condition is satisfied, the current thread notifies other waiting threads that they can be awakened. Notification can be for a single thread or multiple threads, or even broadcast (notify everyone).

In practice, Condvar is often used with a Mutex: Mutex is used to ensure mutual exclusion when reading and writing conditions, while Condvar controls the waiting and waking of threads. Let’s look at an example:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

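    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        {
            // Keep the guard in its own scope: the lock must be released before
            // the infinite loop below, or the main thread can never reacquire it
            let mut started = lock.lock().unwrap();
            *started = true;
        }
        eprintln!("I'm a happy worker!");
        // Notify the main thread
        cvar.notify_one();
        loop {
            thread::sleep(Duration::from_secs(1));
            println!("working...");
        }
    });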

    // Wait for the worker thread's notification
    let (lock, cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        started = cvar.wait(started).unwrap();
    }
    eprintln!("Worker started!");
}

With this condvar code, we have implemented a mechanism where the worker thread notifies the main thread at a certain stage of execution, and then the main thread does some tasks.

Here, a Mutex protects the condition, and we pass its guard to cvar.wait(). The interface takes a MutexGuard so that it can atomically release that Mutex while the thread is blocked and reacquire it before returning:

pub fn wait<'a, T>(
    &self,
    guard: MutexGuard<'a, T>
) -> LockResult<MutexGuard<'a, T>>
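
As a complement to the signature above, here is a minimal sketch of the same handshake written with Condvar::wait_while from the standard library, which folds the `while !*started` loop into a predicate. The function name wait_for_worker is made up for this illustration, and the worker exits right after notifying so that the example terminates.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Spawns a worker that sets a shared flag, blocks until the flag is set,
/// and returns the flag's final value. (Hypothetical helper for illustration.)
fn wait_for_worker() -> bool {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    let worker = thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        // The guard is a temporary, so the lock is released at the end of this statement.
        *lock.lock().unwrap() = true;
        cvar.notify_one();
    });

    let (lock, cvar) = &*pair;
    // wait_while releases the lock while blocked and re-checks the predicate
    // after every wakeup, which also covers spurious wakeups.
    let started = cvar
        .wait_while(lock.lock().unwrap(), |started| !*started)
        .unwrap();
    let result = *started;
    drop(started);
    worker.join().unwrap();
    result
}

fn main() {
    assert!(wait_for_worker());
    println!("Worker started!");
}
```

Because the predicate is checked before the first wait, this works whether the worker notifies before or after the main thread starts waiting.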

Channel #

However, using Mutex and Condvar to handle complex DAG concurrent patterns can be quite burdensome. So, Rust also provides various kinds of Channels for communication between concurrent tasks.

Thanks to the vigorous promotion by the Go language, Channels may be the most well-known means of concurrency. Compared to Mutex, a Channel is a higher level of abstraction with the most intuitive interface and less mental burden. When using Mutex, you need to carefully avoid deadlocks, control the size of critical sections, and prevent all possible mishaps.

Although Rust gives us “fearless concurrency”, where most concurrency bugs are ruled out once the code compiles, performance problems and logical deadlocks still require the developer’s attention.

A Channel confines its locks to the small regions where the queue is written and read, and separates the reader from the writer completely. So for developers, apart from possible context switches, reading and writing data involves locks no more than accessing a local queue would. Therefore, Channels or similar concepts (like the actor model) can be applied to most concurrency problems.

Different use cases call for different kinds of Channel. Between the standard library and community crates, Rust offers the following four kinds:

  • oneshot: This is probably the simplest kind of Channel: the writer sends data only once and the reader reads only once. One-time synchronization between threads can be accomplished with a oneshot channel. Because of its special purpose, the implementation can use an atomic swap directly.
  • rendezvous: Often we only need a Channel to control synchronization between threads, without sending any data. A rendezvous channel is the special case where the channel size is zero. In such cases Mutex + Condvar is enough, and in practice a rendezvous channel is just a wrapper around Mutex + Condvar.
  • bounded: A bounded channel has a queue with an upper limit. Once the queue is full, writers are suspended and must wait. When a writer is blocked and a reader takes data out, the channel’s internal Condvar calls notify_one to wake a writer so that it can continue writing. Implementations therefore generally use Mutex + Condvar + VecDeque. Without Condvar, one can use thread::park + Thread::unpark directly (the approach taken by flume); without VecDeque, one can use a doubly linked list or another ring buffer implementation.
  • unbounded: The queue has no upper limit and expands automatically when it fills up. Many of Rust’s data structures, such as Vec and VecDeque, grow automatically in this way. Apart from never blocking the writer, the implementation is much the same as a bounded channel.
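
To make the Mutex + Condvar + VecDeque combination concrete, here is a minimal sketch of a bounded queue. BoundedQueue and run_pipeline are hypothetical names for this illustration, not the implementation of any particular crate, and the sketch leaves out closing the channel, timeouts, and lock poisoning recovery.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical bounded queue for illustration only.
struct BoundedQueue<T> {
    queue: Mutex<VecDeque<T>>,
    not_full: Condvar,
    not_empty: Condvar,
    capacity: usize,
}

impl<T> BoundedQueue<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "a zero-capacity (rendezvous) queue needs a different design");
        Self {
            queue: Mutex::new(VecDeque::with_capacity(capacity)),
            not_full: Condvar::new(),
            not_empty: Condvar::new(),
            capacity,
        }
    }

    /// Blocks while the queue is full, then enqueues the item.
    fn send(&self, item: T) {
        let mut q = self
            .not_full
            .wait_while(self.queue.lock().unwrap(), |q| q.len() >= self.capacity)
            .unwrap();
        q.push_back(item);
        drop(q); // release the lock before waking a reader
        self.not_empty.notify_one();
    }

    /// Blocks while the queue is empty, then dequeues the oldest item.
    fn recv(&self) -> T {
        let mut q = self
            .not_empty
            .wait_while(self.queue.lock().unwrap(), |q| q.is_empty())
            .unwrap();
        let item = q.pop_front().expect("queue is non-empty after wait_while");
        drop(q); // release the lock before waking a writer
        self.not_full.notify_one();
        item
    }
}

/// Sends 0..n through a queue of the given capacity and collects what arrives.
fn run_pipeline(n: u32, capacity: usize) -> Vec<u32> {
    let queue = Arc::new(BoundedQueue::new(capacity));
    let producer = {
        let queue = Arc::clone(&queue);
        thread::spawn(move || (0..n).for_each(|i| queue.send(i)))
    };
    let received: Vec<u32> = (0..n).map(|_| queue.recv()).collect();
    producer.join().unwrap();
    received
}

fn main() {
    println!("{:?}", run_pipeline(10, 2));
}
```

The lock is held only while the VecDeque is touched, which is the "small regions" idea: callers of send and recv never see a guard.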

All of these channel types share the same implementation concept, whether sync or async; the main difference is the entity that gets suspended and woken up. In the synchronous world that entity is a thread, while in the asynchronous world it is a fine-grained task.
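
In the synchronous world, three of these flavors can be tried with nothing but std::sync::mpsc: channel() is unbounded, sync_channel(n) is bounded, and sync_channel(0) is a rendezvous channel. The sketch below shows each one; the three function names are invented for this illustration.

```rust
use std::sync::mpsc::{self, TrySendError};
use std::thread;

/// Unbounded: `send` never blocks, the queue just grows.
fn unbounded_len(n: u32) -> usize {
    let (tx, rx) = mpsc::channel();
    for i in 0..n {
        tx.send(i).unwrap();
    }
    drop(tx); // close the channel so the iterator below terminates
    rx.iter().count()
}

/// Bounded: once `capacity` items are queued, a non-blocking send reports `Full`.
fn bounded_is_full(capacity: usize) -> bool {
    // `_rx` must stay alive, otherwise sends fail with `Disconnected`.
    let (tx, _rx) = mpsc::sync_channel(capacity);
    for i in 0..capacity {
        tx.try_send(i).unwrap();
    }
    matches!(tx.try_send(capacity), Err(TrySendError::Full(_)))
}

/// Rendezvous: with capacity 0, `send` returns only once a `recv` is paired with it.
fn rendezvous(value: &'static str) -> &'static str {
    let (tx, rx) = mpsc::sync_channel(0);
    let sender = thread::spawn(move || tx.send(value).unwrap());
    let got = rx.recv().unwrap();
    sender.join().unwrap();
    got
}

fn main() {
    println!("unbounded held {} items", unbounded_len(1000));
    println!("bounded full: {}", bounded_is_full(2));
    println!("rendezvous delivered: {}", rendezvous("ping"));
}
```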

Based on the number of Channel readers and writers, Channels can be divided into:

  • SPSC: Single-Producer Single-Consumer. The simplest form, it can be implemented without Mutex, using only atomics.
  • SPMC: Single-Producer Multi-Consumer. Requires a lock when consumers read.
  • MPSC: Multi-Producer Single-Consumer. Requires a lock when producers write.
  • MPMC: Multi-Producer Multi-Consumer. Requires a lock when producers write or consumers read.

Among these, the most widely used is the MPSC channel (multi-producer, single-consumer), because we often want a single consumer so that it has exclusive write access to the data structure that handles the messages.

For example, in the implementation of xunmi, the index writer is multi-threaded internally, but using it requires a mutable reference.

If we want to use the index writer in various contexts, we have to wrap it in Arc<Mutex<_>>, but this approach is too inefficient when indexing a lot of data. Instead, we can use an MPSC channel to send data from the various contexts to a single thread that owns the index writer. This way, we avoid locking:

pub struct IndexInner {
    index: Index,
    reader: IndexReader,
    config: IndexConfig,
    updater: Sender<Input>,
}

pub struct IndexUpdater {
    sender: Sender<Input>,
    t2s: bool,
    schema: Schema,
}

impl Indexer {
    // Open or create an index
    pub fn open_or_create(config: IndexConfig) -> Result<Self> {
        let schema = config.schema.clone();
        let index = if let Some(dir) = &config.path {
            fs::create_dir_all(dir)?;
            let dir = MmapDirectory::open(dir)?;
            Index::open_or_create(dir, schema.clone())?
        } else {
            Index::create_in_ram(schema.clone())
        };

        Self::set_tokenizer(&index, &config);

        let mut writer = index.writer(config.writer_memory)?;
    
        // Create an unbounded MPSC channel
        let (s, r) = unbounded::<Input>();
    
        // Start a thread to read data from the channel's reader
        thread::spawn(move || {
            for input in r {
                // Then process this input using the index writer
                if let Err(e) = input.process(&mut writer, &schema) {
                    warn!("Failed to process input. Error: {:?}", e);
                }
            }
        });
    
        // Store the channel's sender part in the IndexInner structure
        Self::new(index, config, s)
    }

    pub fn get_updater(&self) -> IndexUpdater {
        let t2s = TextLanguage::Chinese(true) == self.config.text_lang;
        // The IndexUpdater internally contains the sender part of the channel
        // Since it's an MPSC channel, here we can simply clone the sender
        // This also means that we can create any number of IndexUpdaters to send data in different contexts
        // And the data will ultimately be given to the thread created above, processed by the index writer
        IndexUpdater::new(self.updater.clone(), self.index.schema(), t2s)
    }
}
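
The xunmi excerpt above depends on types that are not shown here, so below is a self-contained sketch of the same single-writer pattern using only std::sync::mpsc. WordIndex, start_writer, and index_from_many_threads are hypothetical stand-ins: WordIndex plays the role of a writer that needs `&mut self`, and it simply counts words.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a writer that needs `&mut self` to update.
#[derive(Default)]
struct WordIndex {
    counts: HashMap<String, usize>,
}

impl WordIndex {
    fn add(&mut self, doc: &str) {
        for word in doc.split_whitespace() {
            *self.counts.entry(word.to_string()).or_insert(0) += 1;
        }
    }
}

/// Starts one writer thread that owns the index. Returns a cloneable sender and
/// a handle that yields the finished index once all senders are dropped.
fn start_writer() -> (mpsc::Sender<String>, thread::JoinHandle<WordIndex>) {
    let (tx, rx) = mpsc::channel::<String>();
    let handle = thread::spawn(move || {
        let mut index = WordIndex::default();
        // Only this thread touches `index`, so no Mutex is needed.
        // The loop ends when every Sender has been dropped.
        for doc in rx {
            index.add(&doc);
        }
        index
    });
    (tx, handle)
}

/// Feeds documents to the writer from several producer threads.
fn index_from_many_threads() -> WordIndex {
    let (tx, writer) = start_writer();
    let producers: Vec<_> = vec!["hello world", "hello rust", "hello channel"]
        .into_iter()
        .map(|doc| {
            let tx = tx.clone(); // each producer gets its own sender
            thread::spawn(move || tx.send(doc.to_string()).unwrap())
        })
        .collect();
    for p in producers {
        p.join().unwrap();
    }
    drop(tx); // drop the last sender so the writer's loop can finish
    writer.join().unwrap()
}

fn main() {
    let index = index_from_many_threads();
    println!("hello appears {} times", index.counts["hello"]);
}
```

The design choice is the same as in the excerpt: producers clone the sender freely, while exclusive mutable access stays inside one thread.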

Actor #

Lastly, we’ll briefly introduce the actor model, which is best known from the Erlang VM and akka.

An actor is a kind of stackful coroutine. Each actor has its own independent, lightweight call stack and a message queue (its mailbox) for receiving messages. The only way the outside world can interact with an actor is by sending it messages.

Rust’s standard library does not include an implementation of the actor model, but the community has mature tools like actix (the famous actix-web is based on actix), and bastion.

The following code uses actix to implement a simple DummyActor, which can receive an InMsg and return an OutMsg:

use actix::prelude::*;
use anyhow::Result;

// Messages that actor can handle
#[derive(Message, Debug, Clone, PartialEq)]
#[rtype(result = "OutMsg")]
enum InMsg {
    Add((usize, usize)),
    Concat((String, String)),
}

#[derive(MessageResponse, Debug, Clone, PartialEq)]
enum OutMsg {
    Num(usize),
    Str(String),
}

// Actor
struct DummyActor;

impl Actor for DummyActor {
    type Context = Context<Self>;
}

// Implementing the Handler trait for InMsg processing
impl Handler<InMsg> for DummyActor {
    type Result = OutMsg; // <- The message to return

    fn handle(&mut self, msg: InMsg, _ctx: &mut Self::Context) -> Self::Result {
        match msg {
            InMsg::Add((a, b)) => OutMsg::Num(a + b),
            InMsg::Concat((mut s1, s2)) => {
                s1.push_str(&s2);
                OutMsg::Str(s1)
            }
        }
    }
}

#[actix::main]
async fn main() -> Result<()> {
    let addr = DummyActor.start();
    let res = addr.send(InMsg::Add((21, 21))).await?;
    let res1 = addr
        .send(InMsg::Concat(("hello, ".into(), "world".into())))
        .await?;

    println!("res: {:?}, res1: {:?}", res, res1);

    Ok(())
}

As can be seen, for DummyActor, we only need to implement the Actor trait and Handler trait.

A Brief Summary #

After completing these two lessons, let’s summarize the use scenarios for various concurrency primitives: Atomic, Mutex, RwLock, Semaphore, Condvar, Channel, Actor.

  • Atomic is very useful for handling simple primitive types. If you can synchronize with AtomicXXX structures, then they are the best choice.
  • When your data structures cannot be simply synchronized by AtomicXXX, and yet you genuinely need to share data across multiple threads, Mutex/RwLock can be a choice. However, you need to consider the granularity of the lock. Locks with too wide granularity are inefficient.
  • If you have N resources that can be used by multiple concurrent tasks, Semaphore is a good choice. For example, if you want to create a DB connection pool.
  • When concurrent tasks need to notify and cooperate with each other, Condvar provides the most basic notification mechanism, and Channel extends that mechanism considerably. You can use Condvar for point-to-point synchronization and Channel for one-to-many, many-to-one, and many-to-many synchronization.
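
As a small illustration of the first two points in this list, the sketch below counts events with an AtomicUsize and guards a Vec with a Mutex, keeping the critical section to a single push. The function name count_and_collect is made up for this example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Counts events with an atomic and collects thread ids under a Mutex.
/// (Hypothetical helper for illustration.)
fn count_and_collect(threads: usize, per_thread: usize) -> (usize, Vec<usize>) {
    let counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..threads)
        .map(|id| {
            let counter = Arc::clone(&counter);
            let log = Arc::clone(&log);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // A plain counter needs no lock.
                    counter.fetch_add(1, Ordering::Relaxed);
                }
                // A Vec cannot be updated atomically, so guard it with a Mutex
                // and hold the lock only for this one push.
                log.lock().unwrap().push(id);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // Threads finish in arbitrary order, so sort the ids before returning.
    let mut ids = log.lock().unwrap().clone();
    ids.sort_unstable();
    (counter.load(Ordering::Relaxed), ids)
}

fn main() {
    let (total, ids) = count_and_collect(4, 1000);
    println!("total = {total}, ids = {ids:?}");
}
```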

So, when designing most complex systems, Channel is often the most powerful weapon: it not only lets data travel between threads and asynchronous tasks, but its interface also adapts elegantly to streams.

When designing the architecture of an entire backend system, the focus is on what services exist, how they communicate, how data flows, and how the services synchronize with each other. Likewise, when designing the architecture of a single service, the focus is on what functional threads (asynchronous tasks) exist, what their interfaces look like, how data flows between them, and how they synchronize.

Here, Channel combines three functions in one: interface, synchronization, and data flow. That is why I call it the most powerful weapon.

However, it shouldn’t be the only weapon. The real-world concurrency problems we face are varied, and so should the solutions. Over the past decades, computer scientists have incessantly explored and built a series of concurrency primitives, also revealing that there’s hardly a silver bullet to solve all problems.

Even Mutex itself makes different compromises depending on the scenario (such as trade-offs around fairness), because you can’t have the best of both worlds: there is no perfect solution, only compromises. So Channels are not a silver bullet, the actor model is not a silver bullet, and locks are not a silver bullet.

A good programming language can provide best practices in most scenarios (like Erlang/Go), but it should not foster an atmosphere where only a particular best practice is the sole solution. I enjoy Erlang’s actor model and Go’s Channels, but unfortunately, they rely too heavily on a specific, sole concurrency solution, causing developers to see every problem as a nail for their hammer.

By contrast, Rust provides almost all the solutions you need and doesn’t advertise their merits or flaws, leaving it up to you to choose as needed. When I write multi-threaded applications in Rust, Channel is still my first choice, but I also use Mutex, RwLock, Semaphore, Condvar, Atomic, and other tools where appropriate, rather than clumsily stacking Channels upon Channels to face every scenario.

Thinking Question #

  1. Please carefully read the standard library documentation for std::sync, std::sync::atomic, and std::sync::mpsc. Try using mpsc::channel to send messages back and forth between two threads. For example, thread A sends to thread B: “hello world!”, and thread B replies with “goodbye!” after receiving it.
  2. Think about it, if you had to implement the actor model using existing concurrency primitives, how would you do it?

Feel free to share your thoughts in the comments section, and thank you for reading. You’ve already completed the 34th check-in for learning Rust. If you feel like you’ve gained something, feel free to share it with friends and invite them to join the discussion. See you in the next lesson.