Asynchronous Multi-Thread Design Pattern with C++

Xiahua Liu October 19, 2024 #Design Pattern #C++

This post does NOT talk about existing async frameworks in C++.

We will talk about the basic concept of the asynchronous multi-thread design pattern in C++ and provide a simple example program.

C++ has no built-in language support for something like the asynchronous channels in Rust, and C++ beginners often find themselves lost when planning a multithreaded program.

Asynchronous Design Pattern

In traditional synchronous programming, the parent and child threads are usually interlocked with some kind of synchronization mechanism, such as a state machine. The parent and the child threads share access to the same memory for communication.

The problem with the synchronous pattern is that you need to program the synchronization steps very carefully, otherwise you will run into data races and deadlocks. Moreover, as the software grows, the synchronization complexity can grow exponentially with every new step that is added. This is a serious scalability issue, so most modern systems have already moved away from this design pattern.

The asynchronous pattern requires the parent not to interrupt the child thread at all, and to interact with the child threads only through the given I/Os. The typical working process is:

You may wonder where the synchronization steps are. Synchronization is done by the parent thread only. You can add states to the child thread as well, but this is generally bad practice because those steps add unnecessary complexity.

Typically parent and child threads communicate through FIFO queues so the data can be temporarily stored in the queue while the reader is busy doing something else.

Example C++ Program

Here is an example program in which the main thread wants to distribute math calculations to a group of worker threads (assume the calculation is just a square() function).

We can first define the FIFO queue as the following structure:

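#include <condition_variable>
#include <mutex>
#include <queue>

template <class T>
struct WorkerChannel_T {
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    // Thread safe. Does not wake up waiting readers; call notify() afterwards.
    void push(T data) {
        std::unique_lock lk(mutex_);
        queue_.push(data);
    }
    // Thread safe. Blocks until the queue is not empty.
    T pop() {
        std::unique_lock lk(mutex_);
        cv_.wait(lk, [this] { return !this->queue_.empty(); });
        T result = queue_.front();
        queue_.pop();
        return result;
    }
    // Wake up every thread blocked in pop()
    void notify() { cv_.notify_all(); }
};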

Notice that push() and pop() are both protected by mutex_, so the channel can be used safely by both parent and child threads.

Now let's define the worker thread as:

#include <chrono>
#include <cmath>
#include <thread>

void square_worker(WorkerChannel_T<float>& in_ch,
                   WorkerChannel_T<float>& out_ch) {
    while (true) {
        float in_num = in_ch.pop();
        if (std::isnan(in_num)) {
            break;  // NaN is the stop signal: end thread
        } else {
            in_num = std::pow(in_num, 2);
            // Simulate 100ms time consumption
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            out_ch.push(in_num);
            out_ch.notify();
        }
    }
}

Now the main thread can simply be:

#include <cmath>       // NAN
#include <functional>  // std::ref
#include <iostream>
#include <thread>
#include <vector>

int main() {
    WorkerChannel_T<float> in_ch;
    WorkerChannel_T<float> out_ch;
    std::vector<std::thread> worker_vec;
    // Create 8 worker threads that share both channels
    for (int i = 0; i < 8; i++) {
        worker_vec.emplace_back(square_worker, std::ref(in_ch),
                                std::ref(out_ch));
    }
    // Post tasks
    for (int i = 0; i < 100; i++) {
        std::cout << "Post number: " << float(i) << std::endl;
        in_ch.push(float(i));
    }
    // Wake up the workers waiting on the input channel
    in_ch.notify();
    // Collect one result per task (arrival order is not guaranteed)
    for (int i = 0; i < 100; i++) {
        float j = out_ch.pop();
        std::cout << "Get result: " << j << std::endl;
    }
    // Stop all worker threads: one NaN per worker
    for (std::size_t i = 0; i < worker_vec.size(); i++) {
        in_ch.push(NAN);
    }
    in_ch.notify();
    // Join all threads when we are done
    for (std::size_t i = 0; i < worker_vec.size(); i++) {
        std::cout << "Join worker[" << i << "]" << std::endl;
        worker_vec[i].join();
    }
    return 0;
}

We can share the same WorkerChannel_T<float> among more than one worker, since only one of them at a time can get the lock and pop() a task from the queue. The output channel is shared between them as well.
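
To make the sharing concrete, here is a minimal sketch in which several workers pull from one input channel. The names counting_worker, ShareStats and share_demo are made up for this illustration, and the channel is repeated (with the lock type spelled out) so the snippet compiles on its own. It only checks that every task is handled exactly once; it says nothing about the order in which results come back.

```cpp
#include <cassert>
#include <cmath>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Same shape as the channel above, repeated so this sketch is self-contained.
template <class T>
struct WorkerChannel_T {
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    void push(T data) {
        std::unique_lock<std::mutex> lk(mutex_);
        queue_.push(data);
    }
    T pop() {
        std::unique_lock<std::mutex> lk(mutex_);
        cv_.wait(lk, [this] { return !queue_.empty(); });
        T result = queue_.front();
        queue_.pop();
        return result;
    }
    void notify() { cv_.notify_all(); }
};

// Hypothetical worker: squares inputs until it pops NaN, counting its tasks.
void counting_worker(WorkerChannel_T<float>& in_ch,
                     WorkerChannel_T<float>& out_ch, int& handled) {
    while (true) {
        float x = in_ch.pop();
        if (std::isnan(x)) break;  // stop signal
        ++handled;
        out_ch.push(x * x);
        out_ch.notify();
    }
}

struct ShareStats {
    int total_handled;  // tasks handled, summed over all workers
    float result_sum;   // sum of all squared results
};

// Parent: posts 1..n_tasks to one shared channel and collects the results.
ShareStats share_demo(int n_workers, int n_tasks) {
    WorkerChannel_T<float> in_ch, out_ch;
    std::vector<int> handled(n_workers, 0);  // one counter per worker
    std::vector<std::thread> workers;
    for (int i = 0; i < n_workers; i++) {
        workers.emplace_back(counting_worker, std::ref(in_ch),
                             std::ref(out_ch), std::ref(handled[i]));
    }
    for (int i = 1; i <= n_tasks; i++) in_ch.push(float(i));
    in_ch.notify();
    float sum = 0.0f;
    for (int i = 0; i < n_tasks; i++) sum += out_ch.pop();
    for (int i = 0; i < n_workers; i++) in_ch.push(NAN);  // one stop per worker
    in_ch.notify();
    for (auto& w : workers) w.join();
    int total = 0;
    for (int h : handled) total += h;  // safe to read after join()
    return {total, sum};
}
```

Calling share_demo(4, 10) should report 10 handled tasks and a result sum of 385, no matter how the tasks were split among the four workers.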

You can find the whole example C++ code on Godbolt.org.

If you are familiar with other async frameworks, you will notice that:

We just created a tiny framework in a few lines with std::thread! However, modern async frameworks are usually more complicated.

The executor in such a framework could be a thread pool or coroutines instead of a system thread.

One Step Forward

As shown in the example above, you can share a FIFO channel among multiple workers. Because we use a mutex to protect the queue, it is safe for all workers to block on one channel.

It is relatively easy to design the worker thread, since it typically only needs to work on a single I/O pair.

However, you may also notice that the parent thread can only block on a single channel. What if there are multiple channels that the parent thread needs to watch simultaneously?

Serialized Wait

This is the most straightforward answer: the parent just waits for the results one by one.

It may sound stupid, but it is actually very useful. At most, the parent only needs to wait until the slowest child finishes its task.

result1 = out1_ch.pop();
result2 = out2_ch.pop();
result3 = out3_ch.pop();
result4 = out4_ch.pop();

Once a serialized wait like the one shown above has finished, the parent is guaranteed to have all the data ready for the next step.

This approach is usually used when the parent thread needs to have a complete set of data ready before moving on.
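
Here is a minimal, self-contained sketch of a serialized wait. The child function slow_square, the wrapper serialized_wait and the delay values are assumptions made for illustration; the channel is the same one as in the example program, repeated with the lock type spelled out so the snippet compiles on its own.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Same shape as the channel in the example program.
template <class T>
struct WorkerChannel_T {
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    void push(T data) {
        std::unique_lock<std::mutex> lk(mutex_);
        queue_.push(data);
    }
    T pop() {
        std::unique_lock<std::mutex> lk(mutex_);
        cv_.wait(lk, [this] { return !queue_.empty(); });
        T result = queue_.front();
        queue_.pop();
        return result;
    }
    void notify() { cv_.notify_all(); }
};

// Hypothetical child: sleeps for delay_ms, then reports value * value.
void slow_square(float value, int delay_ms, WorkerChannel_T<float>& out_ch) {
    std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
    out_ch.push(value * value);
    out_ch.notify();
}

// Parent: one child per output channel, then pop the channels in a fixed
// order. The children run concurrently while the parent waits.
std::vector<float> serialized_wait() {
    WorkerChannel_T<float> out1_ch, out2_ch, out3_ch, out4_ch;
    std::thread t1(slow_square, 1.0f, 40, std::ref(out1_ch));
    std::thread t2(slow_square, 2.0f, 10, std::ref(out2_ch));
    std::thread t3(slow_square, 3.0f, 30, std::ref(out3_ch));
    std::thread t4(slow_square, 4.0f, 20, std::ref(out4_ch));
    float result1 = out1_ch.pop();  // slowest child: blocks the longest
    float result2 = out2_ch.pop();  // very likely already finished
    float result3 = out3_ch.pop();
    float result4 = out4_ch.pop();
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    return {result1, result2, result3, result4};
}
```

With these delays, the parent should spend roughly as long as the slowest child (about 40 ms here) rather than the sum of all delays, because the other children keep working while it is blocked on out1_ch.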

Wait Until the First Result Comes

In another scenario, we want the parent thread to wait only until the first result is available.

The easiest solution is to share the same channel, with std::variant as its storage type, among the different child threads. In the parent thread, we can use std::visit to process each result further.

However, this solution is static: the channel and the set of result types it can carry are fixed at compile time, so you cannot change the channel at runtime.
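
The std::variant approach could look roughly like the sketch below. The Result alias, the two child functions, the Describe visitor and first_come_demo are all hypothetical names chosen for this illustration, and the channel is repeated so the snippet compiles on its own (C++17 is required for std::variant).

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <variant>
#include <vector>

// Same shape as the channel in the example program.
template <class T>
struct WorkerChannel_T {
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    void push(T data) {
        std::unique_lock<std::mutex> lk(mutex_);
        queue_.push(data);
    }
    T pop() {
        std::unique_lock<std::mutex> lk(mutex_);
        cv_.wait(lk, [this] { return !queue_.empty(); });
        T result = queue_.front();
        queue_.pop();
        return result;
    }
    void notify() { cv_.notify_all(); }
};

// One shared channel carries either kind of result.
using Result = std::variant<int, std::string>;

// Hypothetical children, each producing a different result type.
void int_child(WorkerChannel_T<Result>& out_ch) {
    out_ch.push(Result{42});
    out_ch.notify();
}
void string_child(WorkerChannel_T<Result>& out_ch) {
    out_ch.push(Result{std::string("done")});
    out_ch.notify();
}

// Visitor: one overload per alternative stored in the variant.
struct Describe {
    std::string operator()(int v) const { return "int: " + std::to_string(v); }
    std::string operator()(const std::string& v) const {
        return "string: " + v;
    }
};

// Parent: handles results in whatever order the children deliver them.
std::vector<std::string> first_come_demo() {
    WorkerChannel_T<Result> out_ch;
    std::thread t1(int_child, std::ref(out_ch));
    std::thread t2(string_child, std::ref(out_ch));
    std::vector<std::string> seen;
    for (int i = 0; i < 2; i++) {
        Result r = out_ch.pop();  // returns as soon as any child has pushed
        seen.push_back(std::visit(Describe{}, r));
    }
    t1.join();
    t2.join();
    return seen;
}
```

The two entries returned by first_come_demo() can arrive in either order, which is exactly the point: the parent reacts to whichever child finishes first.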

Alternatively, you can have a dedicated notification channel. When a child pushes a new result to its own channel, it also uses the dedicated channel to report which channel number is ready for the parent thread to read.
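
One possible shape for the dedicated channel is sketched below. The names ready_ch, reporting_child and wait_any_demo are assumptions for this illustration, and the channel is repeated so the snippet compiles on its own. Each child pushes its result first and only then reports its channel number, so the parent's pop() on the reported channel does not have to wait.

```cpp
#include <array>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Same shape as the channel in the example program.
template <class T>
struct WorkerChannel_T {
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    void push(T data) {
        std::unique_lock<std::mutex> lk(mutex_);
        queue_.push(data);
    }
    T pop() {
        std::unique_lock<std::mutex> lk(mutex_);
        cv_.wait(lk, [this] { return !queue_.empty(); });
        T result = queue_.front();
        queue_.pop();
        return result;
    }
    void notify() { cv_.notify_all(); }
};

// Hypothetical child: publishes its result, then reports its channel number.
void reporting_child(int id, float value, WorkerChannel_T<float>& out_ch,
                     WorkerChannel_T<int>& ready_ch) {
    out_ch.push(value * value);  // 1. result goes to the child's own channel
    out_ch.notify();
    ready_ch.push(id);           // 2. then announce which channel is ready
    ready_ch.notify();
}

// Parent: blocks only on ready_ch and reads whichever channel is reported.
std::array<float, 3> wait_any_demo() {
    std::array<WorkerChannel_T<float>, 3> out_chs;
    WorkerChannel_T<int> ready_ch;
    std::vector<std::thread> children;
    for (int i = 0; i < 3; i++) {
        children.emplace_back(reporting_child, i, float(i + 1),
                              std::ref(out_chs[i]), std::ref(ready_ch));
    }
    std::array<float, 3> results{};
    for (int n = 0; n < 3; n++) {
        int id = ready_ch.pop();          // first child to finish, whoever it is
        results[id] = out_chs[id].pop();  // result is already in the queue
    }
    for (auto& t : children) t.join();
    return results;
}
```

Because the parent looks up the output channel by number at runtime, the set of channels it watches is just data, not something baked into a type.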