Pull to refresh

Idiomatic Event Loop in C++

Reading time12 min
Views16K

Introduction

Today I want to show you a simple but at the same time a very efficient implementation of a well-known concurrency pattern called Event Loop. There are very good libraries out there implementing this pattern but there are lots of cases when using them is an overcomplication. Sometimes it’s enough to have something small and idiomatic, made of C++11 standard library elements, rather than a universal multitool.

Event Loop as a Tool

As a first case, just imagine you’ve got a bunch of classes that are not designed to work in a multi-threaded environment. Maybe because they are inherited from a legacy part of the system, or maybe you are designing new classes right now and you don’t want to overengineer them with a bunch of mutexes inside. But you need them to get accessed from different threads keeping things as simple as possible.

The second case when the pattern can be really helpful is when you have a global object which is either impractical or even impossible to guard with a mutex. As an example, let's take the OpenGL context. Actually, there is no such thing as a “context” class/structure/type in OpenGL like ID3D11Device in DirectX 11 or VkInstance in Vulkan. The context is inherently global, but for the current thread. It is exclusively owned by the thread through Thread Local Storage. So each time you try to change your OpenGL state it really does matter which thread you make OpenGL function calls from. In this case, things get problematic if you have a dedicated thread that loads assets (images, geometry, etc) and you want to transfer the loaded data from the loading thread to the context-owning thread.

In both cases, it would be useful to let other threads “see” a shareable object. However, the real access to that object would go through a dedicated thread.

To summarize, Event Loop can be considered as an alternative for the mutex. Both of them serialize accesses to the guarded object, but in a slightly different manner.

TL;DR; Show Me The Code!

An example of how to use it.

#include <iostream>

int main()
{
	{
		EventLoop eventLoop;
		
		eventLoop.enqueue([]
		{
			std::cout << "message from a different thread\n";
		});
		
		std::cout << "prints before or after the message above\n";
	}
	
	std::cout << "guaranteed to be printed the last\n";
}

And the implementation itself.

#include <condition_variable>
#include <functional>
#include <future>
#include <thread>
#include <vector>

class EventLoop
{
public:
	using callable_t = std::function<void()>;
	
	EventLoop() = default;
	EventLoop(const EventLoop&) = delete;
	EventLoop(EventLoop&&) noexcept = delete;
	~EventLoop() noexcept
	{
		enqueue([this]
		{
			m_running = false;
		});
		m_thread.join();
	}
	
	EventLoop& operator= (const EventLoop&) = delete;
	EventLoop& operator= (EventLoop&&) noexcept = delete;
	
	void enqueue(callable_t&& callable) noexcept
	{
		{
			std::lock_guard<std::mutex> guard(m_mutex);
			m_writeBuffer.emplace_back(std::move(callable));
		}
		m_condVar.notify_one();
	}
	
private:
	std::vector<callable_t> m_writeBuffer;
	std::mutex m_mutex;
	std::condition_variable m_condVar;
	bool m_running{ true };
	std::thread m_thread{ &EventLoop::threadFunc, this };
	
	void threadFunc() noexcept
	{
		std::vector<callable_t> readBuffer;
		
		while (m_running)
		{
			{
				std::unique_lock<std::mutex> lock(m_mutex);
				m_condVar.wait(lock, [this]
				{
					return !m_writeBuffer.empty();
				});
				std::swap(readBuffer, m_writeBuffer);
			}
			
			for (callable_t& func : readBuffer)
			{
				func();
			}
			
			readBuffer.clear();
		}
	}
};

The Basic Implementation

The implementation is quite compact. Feels idiomatic, doesn’t it? But what is so special about it and what makes it efficient?

The Power of std::function

std::function<R(Args…)> is a really interesting thing. It uses two very important idioms that make it so useful for us – Type Erasure and Small-Object Optimization.

Type Erasure idiom allows us to store anything that we can apply the call operator to. I will refer to it as callable_t. It can be a C-like function, it can be a functor. It can also be a lambda, including a generic lambda.

Since our callable_t can have internal data, such as the functor’s members or the lambda’s capture, std::function also has to store all this data inside. In order to avoid or at least minimize heap allocations, std::function can store callable_t in place if it’s small enough. What is “small enough” depends on the implementation. If it doesn’t fit into the internal storage, then heap allocation happens as a fallback. This is what Small-Object Optimization essentially is.

Knowing all of that, you can use a std::vector of std::function to keep both data and pointers to vtables in a single chunk of memory for most cases. And this is the first member of our class – std::vector<callable_t> m_writeBuffer;

The Power of std::condition_variable

A condition variable is a synchronization primitive that makes one thread postpone its execution via wait() until another thread wakes it up via notify_one(). But what if the second thread has called notify_one() just before the first thread calls wait()? If you used a simpler synchronization primitive, such as Win32 Event Object, the first thread would not see the notification at all. In this case, it would fall asleep ending you up having your program deadlocked. Fortunately, std::condition_variable is a bit smarter. It deliberately wants you to lock the mutex which guards some state. While you’re holding the locked mutex std::condition_variable wants you to check the state if the first thread has to fall asleep or the condition has already been satisfied and the thread just needs to keep going. If it turns out that the thread has to be postponed something interesting happens next. Have you noticed that the wait() function accepts that locked mutex? This is because the wait() function asks your operating system to do the following:

  1. Atomically unlock the mutex and postpone the execution.

  2. Atomically lock the mutex and resume the execution when notify_one() / notify_all() call occurs.

I’m saying “atomically” here, but maybe it’s not what you’re thinking about. It’s a feature of the operating system thread scheduler, not the processors’ hardware. You cannot emulate that behavior using atomic variables.

Sometimes the OS may wake the waiting thread up spontaneously. It’s called “spurious wakeups”. There are reasons for that, but I won't cover them here. But you should be prepared for them. So if the thread has been woken up, you need to check your condition again. That’s why I’m passing the predicate to the wait() function here. That version of wait() tests the condition before falling asleep and immediately after. You can read more about it here.

As you can see we have to have a protected state guarded by the mutex. Therefore, the second notifying thread should do the following:

  1. Lock the mutex, change the shared state, and unlock the mutex.

  2. Notify the first thread.

This is essentially what happens in the enqueue() function. Some developers call notify_one() while holding the mutex. It’s not wrong, but it makes the scheme inefficient. In order to avoid extra synchronizations, just make sure you call the notify_one() after you release the mutex. You can read the Notes section to learn more about this problem.

The Power of Double Buffering

What really makes this implementation especially efficient is that we have two buffers here. You may have noticed that we are swapping readBuffer and m_writeBuffer. And we are doing this while the mutex is being locked. std::swap simply swaps the pointers inside those two vectors, which is an extremely fast operation. So we are leaving m_writeBuffer empty, ready to be filled again. Next to the std::swap the scope ends unlocking the mutex.

Now we have a situation where the write buffer is getting filled while the read buffer is being processed. Now, these two processes can go simultaneously without any intersection! When the processing is over, we clear the read buffer. Clearing of std::vector does not cause the underlying storage deallocation. So when we quickly swap those two buffers again this underlying storage is going to be filled up again as the write buffer.

When you’re calling enqueue() and your callable_t is small enough you won’t even touch the heap while constructing it. Inserting this callable_t into the vector that has already got some storage left after the processing step takes almost nothing! What about locking the mutex? It turns out that modern mutexes use atomic spin-locks as the first step and after several iterations, they ask the operating system to postpone the thread. So if you do something really quick between lock() and unlock() you won’t even disturb the operating system. notify_one() is also designed to be fast in case you call it while the mutex is unlocked. So you shouldn’t be bothered with it here. It makes enqueue() extremely fast on average. It also makes wait() fast as well, because we simply check if m_writeBuffer is not empty and swap it.

A Couple of Remarks Regarding "noexcept"

You may be wondering, why am I using noexcept keywords all over the place. And there is a really good explanation in ISO C++ Core Guidelines why you should at least consider using it. Let’s take the enqueue() function. Even though we do have vector insertion here, which may throw std::bad_alloc, this scenario is actually disastrous. Your system either is lacking memory, which can cause a crash somewhere else, or you pushed too many tasks that your working thread cannot handle, which is in fact a poor application design. The lack of the system memory may also prevent throwing the exception about well… the lack of the system memory since throw uses heap allocation. The same logic is applied to the destructor. I’m also enforcing callable_t to be passed by r-value reference to the enqueue() function. In this case, if the callable_t constructor fails it happens outside the Event Loop class.

The situation with the thread function is different. Unfortunately, we cannot declare callable_t as std::function<void()noexcept>() enforcing the client to catch all the exceptions. So if the user’s code throws, we cannot properly handle it. I’m not sure that catching all exceptions by the event loop is a suitable strategy. I’d prefer to just automatically std::terminate() here. But it’s up to you to decide.

Just a Bit Extra

If having the enqueue() function is not enough for you and you want to wait for the result I’ve got a couple of solutions for you.

enqueueSync()

Just an example of what it does and how to use enqueueSync():

std::cout << eventLoop.enqueueSync([](const int& x, int&& y, int z)
{
	return x + y + z;
}, 1, 2, 3);

And the implementation:

template<typename Func, typename... Args>
auto enqueueSync(Func&& callable, Args&& ...args)
{
	if (std::this_thread::get_id() == m_thread.get_id())
	{
		return std::invoke(
			std::forward<Func>(callable),
			std::forward<Args>(args)...);
	}
	
	using return_type = std::invoke_result_t<Func, Args...>;
	using packaged_task_type =
		std::packaged_task<return_type(Args&&...)>;
	
	packaged_task_type task(std::forward<Func>(callable));
	
	enqueue([&]
	{
		task(std::forward<Args>(args)...);
	});
	
	return task.get_future().get();
}

The first if-condition is a protection from a deadlock. Sometimes you may discover a situation when some synchronous task is trying to schedule another synchronous task, leading to a deadlock.

I’m also using here std::packaged_task in conjunction with the std::future. This is a nice way to transfer the function invocation result across the threads via std::future, including all of the exceptions that occurred. Please note, that the enqueueSync() function is not declared as noexcept for this purpose.

enqueueAsync()

The previous example uses std::future. Sometimes you may find it useful to obtain it for further usage instead of waiting for it immediately. This is an example of how to use enqueueAsync().

std::future<int> result = eventLoop.enqueueAsync([](int x, int y)
{
  return x + y;
}, 1, 2);
//
//do some heavy work here
//
std::cout << result.get();

And this is the implementation.

template<typename Func, typename... Args>
[[nodiscard]] auto enqueueAsync(Func&& callable, Args&& ...args)
{
	using return_type = std::invoke_result_t<Func, Args...>;
	using packaged_task_type = std::packaged_task<return_type()>;
	
	auto taskPtr = std::make_shared<packaged_task_type>(std::bind(
		std::forward<Func>(callable), std::forward<Args>(args)...));
	
	enqueue(std::bind(&packaged_task_type::operator(), taskPtr));
	
	return taskPtr->get_future();
}

Several remarks here.

Firstly, as you can see, there is no deadlock protection here, since it’s impossible to detect when and where the result is going to be used. So it’s up to the user to call the method properly.

Secondly, we are using std::shared_ptr here. This is because we are bypassing the limitation of std::packaged_task, which is movable only. However, std::function requires the underlying object to be copyable.

And finally, we’re using here std::bind to copy or move all the arguments, because we are not aware of their lifetime. It’s a protection from dangling references. If you really want to pass an object by reference to enqueueAsync(), you can either capture it as [&] while defining lambda or using std::ref() or std::cref().

Examples

Access Serialization

Let’s just imagine you have a bank account that is not thread-safe.

struct IBankAccount
{
	virtual ~IBankAccount() = default;
	virtual void pay(unsigned amount) noexcept = 0;
	virtual void acquire(unsigned amount) noexcept = 0;
	virtual long long balance() const noexcept = 0;
};

class ThreadUnsafeAccount : public IBankAccount
{
public:
	ThreadUnsafeAccount(long long balance) : m_balance(balance)
	{
	}
	void pay(unsigned amount) noexcept override
	{
		m_balance -= amount;
	}
	void acquire(unsigned amount) noexcept override
	{
		m_balance += amount;
	}
	long long balance() const noexcept override
	{
		return m_balance;
	}
private:
	long long m_balance;
};

If we wrap it around a proxy object like as follows we can start using it in a multithreaded environment.

class ThreadSafeAccount : public IBankAccount
{
public:
	ThreadSafeAccount(
		std::shared_ptr<EventLoop> eventLoop,
		std::shared_ptr<IBankAccount> unknownBankAccount) : 
		m_eventLoop(std::move(eventLoop)),
		m_unknownBankAccount(std::move(unknownBankAccount))
	{
	}
	
	void pay(unsigned amount) noexcept override
	{
		//don't use this alternative because [=] or [&] captures this,
		//but not std::shared_ptr.
		//m_eventLoop->enqueue([=]()
		//{
		//	m_unknownBankAccount->pay(amount);
		//});
		
		//use this alternative instead
		m_eventLoop->enqueue(std::bind(
			&IBankAccount::pay, m_unknownBankAccount, amount));
	}
	void acquire(unsigned amount) noexcept override
	{
		m_eventLoop->enqueue(std::bind(
			&IBankAccount::acquire, m_unknownBankAccount, amount));
	}
	long long balance() const noexcept override
	{
		//capturing via [&] is perfectly valid here
		return m_eventLoop->enqueueSync([&]
		{
			return m_unknownBankAccount->balance();
		});
		
		//or you can use this variant for consistency
		//return m_eventLoop->enqueueSync(
		//	&IBankAccount::balance, m_unknownBankAccount);
	}
private:
	std::shared_ptr<EventLoop> m_eventLoop;
	std::shared_ptr<IBankAccount> m_unknownBankAccount;
};

Now you can start using your initially thread-unsafe bank account from various threads via proxies without any risk of a race condition.

int main()
{
	auto eventLoop = std::make_shared<EventLoop>();
	auto bankAccount = std::make_shared<ThreadUnsafeAccount>(100'000);

	std::thread buy = std::thread([](std::unique_ptr<IBankAccount> account)
	{
		for (int i = 1; i <= 10; ++i)
		{
			account->pay(i);
		}
	}, std::make_unique<ThreadSafeAccount>(eventLoop, bankAccount));

	std::thread sell = std::thread([](std::unique_ptr<IBankAccount> account)
	{
		for (int i = 1; i <= 10; ++i)
		{
			account->acquire(i);
		}
	}, std::make_unique<ThreadSafeAccount>(eventLoop, bankAccount));

	buy.join();
	sell.join();
	
	std::cout << bankAccount->balance() << '\n';
}

Interestingly, the proxy object itself is inherently thread-safe, so you can safely share it between multiple threads.

int main()
{
	ThreadSafeAccount safeAccount(
		std::make_shared<EventLoop>(),
		std::make_shared<ThreadUnsafeAccount>(100'000));
		
		
	std::thread buy = std::thread([&]()
	{
		for (int i = 1; i <= 10; ++i)
		{
			safeAccount.pay(i);
		}
	});
	
	std::thread sell = std::thread([&]
	{
		for (int i = 1; i <= 10; ++i)
		{
			safeAccount.acquire(i);
		}
	});
	
	buy.join();
	sell.join();
	
	std::cout << safeAccount.balance() << '\n';
}

If you’re old enough and it all seems familiar to you, you’re right. This is a manually-crafted Single-Threaded Apartment. Or oversimplified QEventLoop.

Event Handlers

Let’s imagine you need to build a message based-system that supports dedicated handlers for each message type. Each type of message should be processed by a specific handler. However, it is possible that a message can be sent from an arbitrary thread. A canonical example is GUI – mouse clicks, button clicks, and all that jazz. This is how you can implement it with the basic event loop.

First, you declare the event and its trigger.

std::function<void(std::vector<char>)> OnNetworkEvent;

void emitNetworkEvent(EventLoop& loop, std::vector<char> data)
{
	if (!OnNetworkEvent) return;
	
	loop.enqueue(std::bind(std::ref(OnNetworkEvent), std::move(data)));
}

And then you just register your handler and start triggering it from various threads.

int main()
{
	//registering event handler
	OnNetworkEvent = [](std::vector<char>& message)
	{
		std::cout << message.size() << ' ';
	};
	
	EventLoop loop;
	
	//let's trigger the event from different threads
	std::thread t1 = std::thread([](EventLoop& loop)
	{
		for (std::size_t i = 0; i < 10; ++i)
		{
			emitNetworkEvent(loop, std::vector<char>(i));
		}
	}, std::ref(loop));
	
	std::thread t2 = std::thread([](EventLoop& loop)
	{
		for (int i = 10; i < 20; ++i)
		{
			emitNetworkEvent(loop, std::vector<char>(i));
		}
	}, std::ref(loop));
	
	for (int i = 20; i < 30; ++i)
	{
		emitNetworkEvent(loop, std::vector<char>(i));
	}
	
	t1.join();
	t2.join();
	
	loop.enqueue([]
	{
		std::cout << std::endl;
	});
}

Conclusion

I hope I managed to give you a new tool that could make your life easier by adding just a few lines of code. Of course, it’s not a universal tool, but in most cases, you simply just don’t need anything more than enqueue() or enqueueSync().

For more complicated use-cases such as adding priority to messages maybe it’s better to consider a different implementation. You could actually just replace std::vector with std::priority_queue but it would also mean that you should keep only the enqueue() function.

Thanks for reading.

Tags:
Hubs:
+4
Comments2

Articles

Change theme settings