33 Concurrent Processing From Atomics to Channel What Tools Rust Provides

33 Concurrent Processing (Part 1): From Atomics to Channels, What Tools Does Rust Provide? #

Hello, I am Chen Tian.

Unnoticed, we have already battled side by side through more than thirty lectures, and I hope that through this period of learning, you’d feel something like “I have become a better programmer!” This is what I wanted to convey to you through introducing Rust’s philosophy, approaches to problem-solving, concepts in designing interfaces, and so on. Now, we have finally reached the long-anticipated chapter on concurrency and async programming.

Many people can’t distinguish between concurrency and parallelism. Rob Pike, one of the founders of Golang, has a very incisive and intuitive explanation:

Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.

Concurrency is the ability to handle many things simultaneously, while parallelism is the means of performing many things at the same time.

We handle tasks in multiple threads or asynchronous operations concurrently. Executing these threads or asynchronous tasks simultaneously on multi-core and multi-CPU machines is parallelism. It can be said that concurrency empowers parallelism. Once we possess the ability to concurrent computing, parallelism becomes a matter of course.

Actually, we have already touched on many aspects of concurrency before. For example, using std::thread to create threads, dealing with synchronization in concurrency using concurrency primitives (like Mutex) from std::sync, and ensuring safety in concurrency with the Send/Sync trait, and so on.

In the process of dealing with concurrency, the difficulty lies not in how to create multiple threads to allocate work, but in how to synchronize tasks in concurrent conditions. Let’s look at a few common work patterns in concurrent states: free competition mode, map/reduce mode, DAG mode: -

In free competition mode, multiple concurrent tasks will compete for access to the same critical section. The timing and manner of how tasks access the critical section are uncertain, or the most flexible, and it’s sufficient to have exclusive access before entering the critical section.

On top of the free competition, we can restrict synchronization modes in concurrency, typically including map/reduce and DAG modes. In the map/reduce mode, tasks are dispersed, processed identically, and then organized in a certain sequence after completion; In the DAG mode, tasks are sliced into non-intersecting, dependent sub-tasks, which are then executed concurrently according to dependency relations.

Combining these three basic modes, we can handle highly complex concurrent scenarios. Therefore, when we deal with complex problems, we should first clarify the context, use the idea of divide and conquer to break down the problem into orthogonal sub-problems, and then combine appropriate concurrency modes to address these sub-problems.

What concurrency primitives are behind these concurrency modes, and what are available to us? These two lectures will focus on explaining and delving into five concepts, including Atomic, Mutex, Condvar, Channel, and the Actor model. Today, we will talk about the first two: Atomic and Mutex.

Atomic #

Atomic is the foundation of all concurrency primitives and lays a solid base for synchronizing concurrent tasks.

Speaking of synchronization, you probably first think of locks, so before we get into atomics, let’s start with how a basic lock is implemented. In free competition mode, we need to use a mutex to protect a critical section to ensure that the task entering it has exclusive access.

For simplicity, when acquiring this lock, if it’s not obtainable, we loop endlessly until we get the lock (code):

use std::{cell::RefCell, fmt, sync::Arc, thread};

struct Lock<T> {
    locked: RefCell<bool>,
    data: RefCell<T>,
}

impl<T> fmt::Debug for Lock<T>
where
    T: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Lock<{:?}>", self.data.borrow())
    }
}

// SAFETY: We are confident that Lock<T> is safe and can be shared across multiple threads
unsafe impl<T> Sync for Lock<T> {}

impl<T> Lock<T> {
    pub fn new(data: T) -> Self {
        Self {
            data: RefCell::new(data),
            locked: RefCell::new(false),
        }
    }

    pub fn lock(&self, op: impl FnOnce(&mut T)) {
        // Keep spinning if the lock was not obtained
        while *self.locked.borrow() != false {} // **1

        // Got it, lock immediately
        *self.locked.borrow_mut() = true; // **2

        // Start working
        op(&mut self.data.borrow_mut()); // **3

        // Unlock
        *self.locked.borrow_mut() = false; // **4
    }
}

fn main() {
    let data = Arc::new(Lock::new(0));

    let data1 = data.clone();
    let t1 = thread::spawn(move || {
        data1.lock(|v| *v += 10);
    });

    let data2 = data.clone();
    let t2 = thread::spawn(move || {
        data2.lock(|v| *v *= 10);
    });
    t1.join().unwrap();
    t2.join().unwrap();

    println!("data: {:?}", data);
}

This code simulates the implementation of a Mutex, with the lock() method being the core part.

Previously mentioned, after calling lock() on a Mutex, you get a MutexGuard as an RAII structure. Here, for simplicity’s sake, we asked for the caller to pass a closure to handle transactions after locking. In the lock() method, a concurrent task that didn’t get the lock will continuously spin; tasks that get the lock can work, and after completing the work, they unlock, enabling previously spinning tasks to compete for the lock and enter the critical section.

This implementation seems to have no big problems, but if you think it through, it has several issues:

  1. In multi-core situations, between **1 and **2, other threads may also happen to stop spinning and modify locked to true. Thus, multiple threads could get the lock, violating the guarantee of exclusive access for any one thread.
  2. Even in a single-core situation, due to preemptive scheduling by the operating system, between **1 and **2, problem 1 could occur.
  3. Nowadays, compilers optimize generated instructions to the maximum. If there are no dependencies between operations, they might generate out-of-order machine code, such as moving **3 before **1, thereby breaking the lock’s guarantee.
  4. Even if the compiler does not perform out-of-order processing, the CPU will reorder instructions to maximize pipeline efficiency. This could also lead to problem 3.

Therefore, the behavior of our lock implementation is undefined. It may work as we wish most of the time, but randomly exhibit strange behavior. When such a thing occurs, bugs could appear in every corner of the system in various forms. Moreover, such bugs are almost impossible to solve because they are challenging to reproduce consistently, behave inconsistally, and possibly only occur on certain CPUs.

It’s worth re-emphasizing here that unsafe code needs to be rigorous and should be reviewed by highly experienced engineers. The reason why the concurrency safety of this code snippet was violated is that we mistakenly thought that implementing Sync for Lock was safe.

To solve the problems in the above code, we must provide some guarantees at the CPU level, making certain operations atomic.

The most basic guarantee is: It is possible to read a memory address with a single instruction, judge if the value equals a predetermined one, and if so, modify it to a new value. This is the Compare-and-swap operation, abbreviated as CAS. It is the cornerstone of nearly all concurrency primitives provided by the operating system, enabling the implementation of a lock that works properly.

Therefore, we can change the initial loop in the code to:

while self
    .locked
    .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
    .is_err() 
{}

This statement means: if the current value of locked is false, change it to true. This entire operation is completed in a single instruction and cannot be interrupted or modified by other threads; if the current value of locked is not false, then an error is returned and we will keep spinning until the precondition is met. Here, compare_exchange is Rust’s CAS operation, which will be compiled to the corresponding CPU CAS instruction.

When this statement succeeds, locked will be changed to true, and we will have successfully obtained the lock, while all other threads will spin on this line.

Similarly, to release the lock, we need to use an atomic version, rather than directly assigning false:

self.locked.store(false, Ordering::Release);

Of course, to accommodate such changes, we need to change locked from bool to AtomicBool. In Rust, std::sync::atomic has many atomic data structures, corresponding to various basic structures. Let’s see the new implementation using AtomicBool (code):

use std::{
    cell::RefCell,
    fmt,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
};

struct Lock<T> {
    locked: AtomicBool,
    data: RefCell<T>,
}

impl<T> fmt::Debug for Lock<T>
where
    T: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Lock<{:?}>", self.data.borrow())
    }
}

// SAFETY: We are confident that Lock<T> is safe and can be shared across multiple threads
unsafe impl<T> Sync for Lock<T> {}

impl<T> Lock<T> {
    pub fn new(data: T) -> Self {
        Self {
            data: RefCell::new(data),
            locked: AtomicBool::new(false),
        }
    }

    pub fn lock(&self, op: impl FnOnce(&mut T)) {
        // Keep spinning if the lock was not obtained
        while self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {} // **1

        // We’ve now acquired the lock; let's get to work
        op(&mut self.data.borrow_mut()); // **3

        // Unlock
        self.locked.store(false, Ordering::Release);
    }
}

fn main() {
    let data = Arc::new(Lock::new(0));

    let data1 = data.clone();
    let t1 = thread::spawn(move || {
        data1.lock(|v| *v += 10);
    });

    let data2 = data.clone();
    let t2 = thread::spawn(move || {
        data2.lock(|v| *v *= 10);
    });
    t1.join().unwrap();
    t2.join().unwrap();

    println!("data: {:?}", data);
}

As you can see, by using compare_exchange, we avoid the problems faced by 1 and 2; however, for items 3 and 4 related to automatic optimization by compilers/CPUs, we still need additional measures. This is the purpose behind the extra two strange parameters related to Ordering in the function.

If you look at the atomic documentation, you will find Ordering is an enum:

pub enum Ordering {
    Relaxed,
    Release,
    Acquire,
    AcqRel,
    SeqCst,
}

The documentation explains the purposes of the various Orderings. Let me expand a little bit on this.

The first one, Relaxed, is the most permissive rule, imposing no limits on the compiler or CPU, allowing them to execute out-of-order.

Release, when we write data (like the store in the code above), if Release order is used, then:

  • For the current thread, no read or write operation may be reordered after this store. That is, in the example above, the CPU or compiler cannot reorder **3 to execute after **4.
  • For other threads, if Acquire is used to read this atomic data, they will see the updated result. We used Acquire in the compare_exchange above, ensuring that the latest value is read.

And Acquire, when we read data, if Acquire order is used then:

  • For the current thread, no read or write operation can be reordered before this read. In the example above, CPU or compiler cannot reorder **3 to execute before **1.
  • For other threads, if Release is used to modify data, the updated value will be visible to the current thread.

The fourth, AcqRel, is a combination of Acquire and Release, simultaneously possessing the guarantees of both Acquire and Release. This is generally used on fetch_xxx operations, for instance, if you wanted to increment an atomic by 1, and you hope that operations before and after this action are not out-of-order, and the result of the operation is visible to other threads.

The last option, SeqCst, is the strictest ordering, in addition to the guarantees of AcqRel, it also ensures that all threads see a consistent order of all SeqCst operations.

Since CAS and ordering are system-level operations, what is described here regarding Ordering’s purposes are more or less similar across various languages. In the case of Rust, its atomic primitive is derived from C++. If you find Rust’s documentation confusing, C++’s documentation about ordering is much clearer.

In fact, the spinning process while acquiring the lock above is not very efficient; a better way is to handle it like this:

while self
    .locked
    .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
    .is_err()
{
    // Performance optimization: compare_exchange requires exclusive access. When we cannot get the lock,
    // we continuously check the state of locked until it's unlocked and then try to get the lock.
    while self.locked.load(Ordering::Relaxed) == true {}
}

Notice that inside the while loop, another loop is nested. This is because CAS is a relatively costly operation, which requires exclusive access to the corresponding memory. We want to just read the atomic’s state when failing and only go for exclusive access to perform the CAS when conditions are met. So, while there seems to be an additional layer of looping, the efficiency of the actual code is higher.

Below is the synchronization process between two threads. Initially, t1 obtains the lock, then t2 spins. Afterwards, t1 releases the lock, t2 proceeds to execute in the critical section: -

Having covered that, I believe you now have a preliminary understanding of atomic and its underlying CAS. So, aside from being used for other concurrency primitives, what else can atomic do?

Personally, what I use the most is creating various lock-free data structures. For instance, needing a global ID generator. Of course, you could use a module like UUID to generate unique IDs, but if you also require the IDs to be sequential, AtomicUsize is the best choice.

You could use fetch_add to increment this ID, and the result of fetch_add could be used for the current ID. Hence, without needing locks, you get an ID generator that’s safe to use in a multi-threaded environment.

Additionally, atomic can also be used to record various metrics of the system. For example, a simple in-memory Metrics module:

use std::{
    collections::HashMap,
    sync::atomic::{AtomicUsize, Ordering},
};

// server statistics
pub struct Metrics(HashMap<&'static str, AtomicUsize>);

impl Metrics {
    pub fn new(names: &[&'static str]) -> Self {
        let mut metrics: HashMap<&'static str, AtomicUsize> = HashMap::new();
        for name in names.iter() {
            metrics.insert(name, AtomicUsize::new(0));
        }
        Self(metrics)
    }

    pub fn inc(&self, name: &'static str) {
        if let Some(m) = self.0.get(name) {
            m.fetch_add(1, Ordering::Relaxed);
        }
    }

    pub fn add(&self, name: &'static str, val: usize) {
        if let Some(m) = self.0.get(name) {
            m.fetch_add(val, Ordering::Relaxed);
        }
    }

    pub fn dec(&self, name: &'static str) {
        if let Some(m) = self.0.get(name) {
            m.fetch_sub(1, Ordering::Relaxed);
        }
    }

    pub fn snapshot(&self) -> Vec<(&'static str, usize)> {
        self.0
            .iter()
            .map(|(k, v)| (*k, v.load(Ordering::Relaxed)))
            .collect()
    }
}

It allows you to initialize a global metrics table and then operate the corresponding metrics anywhere in the program without locks:

lazy_static! {
    pub(crate) static ref METRICS: Metrics = Metrics::new(&[
        "topics",
        "clients",
        "peers",
        "broadcasts",
        "servers",
        "states",
        "subscribers"
    ]);
}

fn main() {
    METRICS.inc("topics");
    METRICS.inc("subscribers");

    println!("{:?}", METRICS.snapshot());
}

Complete code can be found at GitHub repo or playground.

Mutex #

Although Atomic can handle locking needs in free competition mode, it is still not so convenient to use. We need higher-level concurrency primitives to ensure that software systems control multiple threads accessing the same shared resource so that each thread can access the shared resource exclusively or mutually exclusively.

We know that for a shared resource, if all threads only perform read operations, then there is no need for mutual exclusion, and everyone can access it at any point, many immutable languages (such as Erlang/Elixir) make language-level read-only guarantees, ensuring lock-free operations in concurrent