Simple job system using standard C++

After experimenting with the entity-component system this fall (sorry, there is no blog post for that yet), I wanted to see how difficult it would be to put my other, unused CPU cores to good use. I never really got into CPU multithreading seriously, so this is something new for me. The idea behind the entity-component system is both to make more efficient use of a single CPU by having similar data laid out linearly in memory (thus benefiting from cache prefetching when iterating), and to make it easier to write parallelized code (because data dependencies are more apparent). In this post I want to talk about the job system I came up with. It not only makes sense in the entity-component system, but should generally perform well for any large data processing task. I wanted to stay within standard C++ (~11), which I found to be entirely possible. However, it can be extended with platform specific extensions if wanted. Let’s break this blog post up into multiple parts:

  1. Interface – how do we want to use it (I hope you’re familiar with lambdas)
  2. Implementation – let’s see the code
  3. Performance – I guess this would be pointless without it
  4. Windows platform specific extension – optional, if you can still keep going
  5. Closure – all must come to an end

Interface
I think the best way to get my point across would be to just post the interface code here, because it is very short and make a few comments:

#include <cstdint>    // uint32_t
#include <functional>

// A Dispatched job will receive this as function argument:
struct JobDispatchArgs
{
	uint32_t jobIndex;
	uint32_t groupIndex;
};

namespace JobSystem
{
	// Create the internal resources such as worker threads, etc. Call it once when initializing the application.
	void Initialize();

	// Add a job to execute asynchronously. Any idle thread will execute this job.
	void Execute(const std::function<void()>& job);

	// Divide a job into multiple jobs and execute them in parallel.
	//	jobCount	: how many jobs to generate for this task.
	//	groupSize	: how many jobs to execute per thread. Jobs inside a group execute serially. It might be worth increasing for small jobs
	//	job		: receives a JobDispatchArgs as parameter
	void Dispatch(uint32_t jobCount, uint32_t groupSize, const std::function<void(JobDispatchArgs)>& job);

	// Check if any threads are working currently or not
	bool IsBusy();

	// Wait until all threads become idle
	void Wait();
}

How would this work? First, we would call the JobSystem::Initialize() function once somewhere in our application (before we want to use the job system for the first time). Probably the best place is when we initialize the application, before doing any resource loading, because the job system can also handle resource loading efficiently.
Then the simplest way we can make a job is by calling JobSystem::Execute(myTask); where myTask is just a simple function without parameters. I use it for example when initializing engine systems, like this:

JobSystem::Execute([] { /*do stuff 1*/ InitGraphics(); });
JobSystem::Execute([] { /*do stuff 2 which is not related to stuff 1*/ InitPhysics(); });
JobSystem::Execute([] { /*do stuff 3 which is not related to the previous stuff*/ InitInput(); });
JobSystem::Execute([] { /*do stuff 4 which is not related to the previous stuff*/ InitNetwork(); });
JobSystem::Execute([] { /*you probably already get the idea*/ InitSound(); });

The above code creates 5 jobs, which are probably already utilizing 5 worker threads (if Initialize() created that many workers, but more on that later).
Then we can call JobSystem::Wait(); to simply wait for all the jobs to finish, before doing anything that depends on their completion. For example this code would utilize 3 threads:

JobSystem::Execute([] { /*do stuff 1*/ });
JobSystem::Execute([] { /*do stuff 2*/ });
JobSystem::Wait();

Here the main thread is responsible for notifying two worker threads that there are some jobs, and then it waits. It also means that overall the main thread is doing extra bookkeeping work instead of just doing the jobs one by one. But this should be fine when looking at the big picture, because some work is a good fit to be executed in parallel (if there is no dependency between the jobs). We should only run jobs in parallel if they are a good fit; a lot of the time it is still better to just do the work on the main thread. Oh, and by dependency I mean this: if any variable is written by one job and read by another, then there is a dependency.

The IsBusy() function is a way for the main thread to just see if any worker threads are busy. It can be used if we don’t actually want to wait for work to finish, and the main thread has other meaningful work to do. For instance, drawing a loading screen is a good example.

The last one is the Dispatch function. This is the most interesting and most useful one in my opinion. I was inspired by how compute shaders work on the GPU, and really wanted to use something like that on the CPU. You can think of it like the Execute function, but it actually creates multiple jobs instead of one (jobCount argument). Let’s skip the groupSize argument for now and return to it shortly. The third argument is the job itself, which will receive a JobDispatchArgs argument that identifies which instance of the job is currently running, and in which group (hang in there). To demonstrate how it will be used, look at this loop iterating over some large dataset:

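const uint32_t dataSize = 1000000;
std::vector<Data> dataSet(dataSize);    // heap allocated (needs #include <vector>), a million elements would not fit on a typical stack
for (uint32_t i = 0; i < dataSize; ++i)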
{
	dataSet[i].Compute(i);
}

And let’s rewrite the same thing but this time using Dispatch, to offload it to multiple threads:

const uint32_t groupSize = 1;
JobSystem::Dispatch(dataSize, groupSize, [&dataSet] (JobDispatchArgs args) {
	dataSet[args.jobIndex].Compute(args.jobIndex);
});

The above code does the same thing as the loop, but now several worker threads each operate on a single element of dataSet. This is where the groupSize argument comes into play. We don’t actually want each thread to work on only a single element at a time. Not only because of caching, but also because 1000000 jobs would need to be generated, which would put a significant burden on the main thread that is creating them. Instead, we should specify, let’s say, const uint32_t groupSize = 1000; or something like that. That means one thread will operate on a batch of 1000 elements of the dataSet. Whenever we put a Dispatch in our application, the groupSize must be considered, just like when we are writing compute shaders, which are really similar to this and have the same purpose: performing work over a large dataset, where data is mostly independent of other data in the set. If our dataSet has elements with dependencies, then it might not be a good candidate, unless we can think of a clever way to utilize the groupSize argument. The thing with groupSize is that with groupSize = 1000, 1000 sub-jobs will run serially on the same thread, so there can be dependencies inside the group. But this is getting off topic, so I recommend getting experienced with compute shaders and groupshared memory, because the same principles can be applied here. I won’t go into handling groupshared memory in this post.
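To make the "dependencies inside a group" idea a bit more concrete, here is a minimal sketch. SerialDispatch is a hypothetical single-threaded stand-in that visits indices in the same group order as Dispatch (it is not part of the job system), and GroupRunningSums is an invented example in which each element depends on the previous one, but only within its own group:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

struct JobDispatchArgs
{
	uint32_t jobIndex;
	uint32_t groupIndex;
};

// Hypothetical stand-in for JobSystem::Dispatch: same index/group mapping,
// but every group runs on the calling thread, one after the other.
void SerialDispatch(uint32_t jobCount, uint32_t groupSize, const std::function<void(JobDispatchArgs)>& job)
{
	if (jobCount == 0 || groupSize == 0)
	{
		return;
	}
	const uint32_t groupCount = (jobCount + groupSize - 1) / groupSize;
	for (uint32_t groupIndex = 0; groupIndex < groupCount; ++groupIndex)
	{
		const uint32_t begin = groupIndex * groupSize;
		const uint32_t end = std::min(begin + groupSize, jobCount);
		for (uint32_t i = begin; i < end; ++i)
		{
			JobDispatchArgs args;
			args.jobIndex = i;
			args.groupIndex = groupIndex;
			job(args);
		}
	}
}

// Running sum that restarts at every group boundary. Element i reads element i - 1,
// which is only safe because both belong to the same group and run in order.
std::vector<uint32_t> GroupRunningSums(const std::vector<uint32_t>& input, uint32_t groupSize)
{
	assert(groupSize > 0);
	std::vector<uint32_t> output(input.size());
	SerialDispatch(static_cast<uint32_t>(input.size()), groupSize, [&](JobDispatchArgs args) {
		const bool firstInGroup = (args.jobIndex % groupSize) == 0;
		const uint32_t previous = firstInGroup ? 0u : output[args.jobIndex - 1];
		output[args.jobIndex] = input[args.jobIndex] + previous;
	});
	return output;
}
```

With a groupSize of 2 and five inputs, indices 0 and 1 form the first group, 2 and 3 the second, and index 4 is alone in the last one. The same lambda would only be safe with a parallel dispatch as long as sub-jobs inside a group keep running serially and in ascending order on one thread.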

Now that we see how we want to use the job system, let’s implement the internal workings of it, using standard C++ constructs.

Implementation
I will go over this mostly in the order I described the interface. But before getting into the first function, which is Initialize(), let’s define the internal state, the include files, and so on:

#include "JobSystem.h"    // include our interface

#include <algorithm>    // std::max, std::min
#include <atomic>    // to use std::atomic<uint64_t>
#include <thread>    // to use std::thread
#include <mutex>    // to use std::mutex and std::unique_lock
#include <condition_variable>    // to use std::condition_variable

namespace JobSystem
{
	uint32_t numThreads = 0;    // number of worker threads, it will be initialized in the Initialize() function
	ThreadSafeRingBuffer<std::function<void()>, 256> jobPool;    // a thread safe queue to put pending jobs onto the end (with a capacity of 256 jobs). A worker thread can grab a job from the beginning
	std::condition_variable wakeCondition;    // used in conjunction with the wakeMutex below. Worker threads just sleep when there is no job, and the main thread can wake them up
	std::mutex wakeMutex;    // used in conjunction with the wakeCondition above
	uint64_t currentLabel = 0;    // tracks the state of execution of the main thread
	std::atomic<uint64_t> finishedLabel;    // track the state of execution across background worker threads

	// ...And here will go all of the functions that we will implement
}

Almost everything here is standard C++, except the jobPool. It could also be a std::deque, but then it must be made thread safe with a mutex or some other synchronization primitive. Or, if you are interested, here is my very simple thread safe queue (or ring buffer) implementation:
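For reference, the std::deque route could look roughly like the sketch below. ThreadSafeDeque is a hypothetical name; it keeps the same push_back/pop_front shape as the ring buffer so it could be swapped in, but unlike the ring buffer it grows on demand, so push_back never reports a full queue:

```cpp
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical mutex-guarded queue built on std::deque.
template <typename T>
class ThreadSafeDeque
{
public:
	// Always succeeds, because std::deque allocates more memory when needed
	bool push_back(const T& item)
	{
		std::lock_guard<std::mutex> guard(lock);
		data.push_back(item);
		return true;
	}

	// Returns false if there are no items
	bool pop_front(T& item)
	{
		std::lock_guard<std::mutex> guard(lock);
		if (data.empty())
		{
			return false;
		}
		item = std::move(data.front());
		data.pop_front();
		return true;
	}

private:
	std::deque<T> data;
	std::mutex lock;
};
```

The trade-off is that allocation now happens inside the critical section from time to time, which is exactly what the fixed size buffer avoids.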

// Fixed size very simple thread safe ring buffer
template <typename T, size_t capacity>
class ThreadSafeRingBuffer
{
public:
	// Push an item to the end if there is free space
	//	Returns true if successful
	//	Returns false if there is not enough space
	inline bool push_back(const T& item)
	{
		bool result = false;
		lock.lock();
		size_t next = (head + 1) % capacity;
		if (next != tail)
		{
			data[head] = item;
			head = next;
			result = true;
		}
		lock.unlock();
		return result;
	}

	// Get an item if there are any
	//	Returns true if successful
	//	Returns false if there are no items
	inline bool pop_front(T& item)
	{
		bool result = false;
		lock.lock();
		if (tail != head)
		{
			item = data[tail];
			tail = (tail + 1) % capacity;
			result = true;
		}
		lock.unlock();
		return result;
	}

private:
	T data[capacity];
	size_t head = 0;
	size_t tail = 0;
	std::mutex lock; // this just works better than a spinlock here (on windows)
};

It is templated, so you can use it for any type of data. The nice thing about it is that it is very simple and only uses a fixed size memory allocation. It can’t reallocate memory and doesn’t do anything clever, so performance might be more predictable than with std::deque. Also, when an element doesn’t fit because there is no more space, push_back returns false, and when the buffer is empty and you want to remove an element, pop_front returns false. If you got false, you can just try again, and maybe that time it will be successful, because another thread grabbed a job from it or put one onto it.
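One detail that follows from the full check in push_back: the buffer counts as full when advancing head would make it equal to tail, so one slot always stays unused and a capacity of 256 holds at most 255 pending jobs. The sketch below shows this with a variant of the same ring buffer that uses std::lock_guard instead of explicit lock()/unlock() calls. GuardedRingBuffer and CountSuccessfulPushes are hypothetical names used only for this illustration:

```cpp
#include <cstddef>
#include <mutex>

// Same logic as the ring buffer above, with RAII locking so every return path unlocks.
template <typename T, std::size_t capacity>
class GuardedRingBuffer
{
public:
	bool push_back(const T& item)
	{
		std::lock_guard<std::mutex> guard(lock);
		const std::size_t next = (head + 1) % capacity;
		if (next == tail)
		{
			return false; // full: one slot is always left empty
		}
		data[head] = item;
		head = next;
		return true;
	}

	bool pop_front(T& item)
	{
		std::lock_guard<std::mutex> guard(lock);
		if (tail == head)
		{
			return false; // empty
		}
		item = data[tail];
		tail = (tail + 1) % capacity;
		return true;
	}

private:
	T data[capacity];
	std::size_t head = 0;
	std::size_t tail = 0;
	std::mutex lock;
};

// Pushes into an empty buffer until it reports full and returns how many pushes succeeded.
template <std::size_t capacity>
std::size_t CountSuccessfulPushes()
{
	GuardedRingBuffer<int, capacity> buffer;
	std::size_t count = 0;
	while (buffer.push_back(0))
	{
		++count;
	}
	return count;
}
```

Keeping one slot free is a common way to tell "full" apart from "empty" without storing a separate element count.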

Now we have everything we need, except the actual functions. Here is what Initialize() looks like:

void Initialize()
{
	// Initialize the worker execution state to 0:
	finishedLabel.store(0);

	// Retrieve the number of hardware threads in this system:
	auto numCores = std::thread::hardware_concurrency();

	// Calculate the actual number of worker threads we want:
	numThreads = std::max(1u, numCores);

	// Create all our worker threads while immediately starting them:
	for (uint32_t threadID = 0; threadID < numThreads; ++threadID)
	{
		std::thread worker([] {

			std::function<void()> job; // the current job for the thread, it's empty at start.

			// This is the infinite loop that a worker thread will do 
			while (true)
			{
				if (jobPool.pop_front(job)) // try to grab a job from the jobPool queue
				{
					// It found a job, execute it:
					job(); // execute job
					finishedLabel.fetch_add(1); // update worker label state
				}
				else
				{
					// no job, put thread to sleep
					std::unique_lock<std::mutex> lock(wakeMutex);
					wakeCondition.wait(lock);
				}
			}

		});
		
		// *****Here we could do platform specific thread setup...
		
		worker.detach(); // forget about this thread, let it do its job in the infinite loop that we created above
	}
}

So, in case the comments didn’t do it justice, let me recap: First, we initialize the worker-side label state, but don’t worry about it yet, it will make sense later (hopefully). Then, C++ has the neat hardware_concurrency function that tells us how many cores (including hyperthreaded virtual cores) our CPU has, so we create that many worker threads in the for loop. Each thread is created with an infinite loop that it will keep running until our application exits. But most of the time, each thread will just sleep (until the main thread wakes it up), so it does not spin the CPU wastefully.
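A small note on the thread count: hardware_concurrency() is allowed to return 0 when the value cannot be determined, which is why Initialize() clamps the result with std::max. If the main thread is also expected to do real work, one option is to leave a hardware thread free for it. A minimal sketch, assuming a hypothetical helper named ChooseWorkerCount that is not part of the job system above:

```cpp
#include <cstdint>
#include <thread>

// Hypothetical helper: how many workers to create when `reservedThreads`
// hardware threads should stay free (for example for the main thread).
// Always returns at least 1, also when the hardware thread count is unknown (0).
inline uint32_t ChooseWorkerCount(uint32_t hardwareThreads, uint32_t reservedThreads = 1)
{
	if (hardwareThreads <= reservedThreads)
	{
		return 1;
	}
	return hardwareThreads - reservedThreads;
}

// Possible use inside Initialize(), instead of std::max(1u, numCores):
inline uint32_t DefaultWorkerCount()
{
	return ChooseWorkerCount(std::thread::hardware_concurrency());
}
```

Whether reserving a thread actually helps depends on the workload, so it is worth measuring both variants.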

Let’s see the Execute function next:

// This little helper function will not let the system deadlock while the main thread is waiting for something
inline void poll()
{
	wakeCondition.notify_one(); // wake one worker thread
	std::this_thread::yield(); // allow this thread to be rescheduled
}

void Execute(const std::function<void()>& job)
{
	// The main thread label state is updated:
	currentLabel += 1;

	// Try to push a new job until it is pushed successfully:
	while (!jobPool.push_back(job)) { poll(); }

	wakeCondition.notify_one(); // wake one thread
}

Sorry if you got distracted by the poll function above; I made a function for it because it will be used in more places. I will explain it, but first, let’s digest the Execute function: The main thread’s label is incremented. This means that one job is being submitted, so the JobSystem only becomes idle when the worker label has reached the same value. In the previous Initialize function, you can see that in the infinite loop, after a job is executed, the worker label state is also incremented by one. But there it is an atomic operation, because all the workers will be updating that single label.
Then we just push the new job onto the end of the jobPool. As I said, my ring buffer could be full, so we keep trying to push the job until it actually succeeds. Each time it doesn’t succeed, we invoke the poll function, which wakes up one worker and yields the main thread. Doing this, we can be sure that the jobPool will eventually be drained by the workers, and the main thread can then schedule the current job.
But maybe the job was pushed fine the first time around, and poll() never executed. In that case, we just wake up any one worker thread, which will start working on the job immediately.

I hope I am not putting anyone to sleep yet. Let’s follow with the IsBusy and Wait functions:

bool IsBusy()
{
	// Whenever the main thread label is not reached by the workers, it indicates that some worker is still busy
	return finishedLabel.load() < currentLabel;
}

void Wait()
{
	while (IsBusy()) { poll(); }
}

You can see how we are checking the state of the main thread and the state of the workers in the IsBusy function. This is also used by the Wait function if we just want to put the main thread to idle and drain all the pending jobs.
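One thing worth noting about the worker loop: it checks the queue first and only then takes wakeMutex and waits, so a notify_one() that arrives in between is missed, and the repeated notifications in poll() are what keep things moving. A common alternative, sketched here as an assumption and not as what the job system above does, is to guard the queue with the same mutex as the condition variable and to wait with a predicate. TinyPool, SumWithPool and their members are hypothetical names for this illustration:

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool: one mutex protects the queue and the counters,
// so a worker can never miss a job that was pushed just before it went to sleep.
class TinyPool
{
public:
	explicit TinyPool(uint32_t workerCount)
	{
		for (uint32_t i = 0; i < workerCount; ++i)
		{
			workers.emplace_back([this] { WorkerLoop(); });
		}
	}

	~TinyPool()
	{
		{
			std::lock_guard<std::mutex> guard(mutex);
			stopping = true;
		}
		wake.notify_all();
		for (auto& worker : workers) { worker.join(); }
	}

	void Execute(std::function<void()> job)
	{
		{
			std::lock_guard<std::mutex> guard(mutex);
			jobs.push_back(std::move(job));
			++pending;
		}
		wake.notify_one();
	}

	// Blocks (without spinning) until every submitted job has finished
	void Wait()
	{
		std::unique_lock<std::mutex> guard(mutex);
		idle.wait(guard, [this] { return pending == 0; });
	}

private:
	void WorkerLoop()
	{
		for (;;)
		{
			std::function<void()> job;
			{
				std::unique_lock<std::mutex> guard(mutex);
				// The predicate is re-checked under the lock, also after spurious wakeups
				wake.wait(guard, [this] { return stopping || !jobs.empty(); });
				if (jobs.empty()) { return; } // stopping and nothing left to do
				job = std::move(jobs.front());
				jobs.pop_front();
			}
			job(); // run outside the lock so other workers can grab jobs
			std::lock_guard<std::mutex> guard(mutex);
			if (--pending == 0) { idle.notify_all(); }
		}
	}

	std::mutex mutex;
	std::condition_variable wake; // signals workers: new job or shutdown
	std::condition_variable idle; // signals Wait(): pending reached zero
	std::deque<std::function<void()>> jobs;
	uint32_t pending = 0;
	bool stopping = false;
	std::vector<std::thread> workers;
};

// Usage example: adds 1..jobCount with one job per number.
inline int SumWithPool(int jobCount)
{
	std::atomic<int> sum{0};
	TinyPool pool(4);
	for (int i = 1; i <= jobCount; ++i)
	{
		pool.Execute([&sum, i] { sum.fetch_add(i); });
	}
	pool.Wait();
	return sum.load();
}
```

The price is that every push and pop goes through a single mutex, and Wait() puts the main thread to sleep instead of yielding, so it is a different trade-off and not automatically faster.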

Now, with implementing the Dispatch function, you will get full insight into the Job System’s inner workings. This is the most complicated of them all, but if you look at it in a way, it is not very different from the Execute function, just does the same thing multiple times with some little twists:

void Dispatch(uint32_t jobCount, uint32_t groupSize, const std::function<void(JobDispatchArgs)>& job)
{
	if (jobCount == 0 || groupSize == 0)
	{
		return;
	}

	// Calculate the amount of job groups to dispatch (overestimate, or "ceil"):
	const uint32_t groupCount = (jobCount + groupSize - 1) / groupSize;

	// The main thread label state is updated:
	currentLabel += groupCount;

	for (uint32_t groupIndex = 0; groupIndex < groupCount; ++groupIndex)
	{
		// For each group, generate one real job:
		auto jobGroup = [jobCount, groupSize, job, groupIndex]() {

			// Calculate the current group's offset into the jobs:
			const uint32_t groupJobOffset = groupIndex * groupSize;
			const uint32_t groupJobEnd = std::min(groupJobOffset + groupSize, jobCount);

			JobDispatchArgs args;
			args.groupIndex = groupIndex;

			// Inside the group, loop through all job indices and execute job for each index:
			for (uint32_t i = groupJobOffset; i < groupJobEnd; ++i)
			{
				args.jobIndex = i;
				job(args);
			}
		};

		// Try to push a new job until it is pushed successfully:
		while (!jobPool.push_back(jobGroup)) { poll(); }

		wakeCondition.notify_one(); // wake one thread
	}
}

Take a deep breath, and let’s tackle this slowly. First, the jobCount (which you can think of as the “loop size”) can be a dynamic parameter, so if it is 0, just return immediately, because we don’t want to schedule 0 jobs. The same goes for a groupSize of 0, which would otherwise cause a division by zero.
Then the groupCount is calculated. Don’t confuse this with groupSize, that you specified as an argument to Dispatch. The groupCount is calculated from the jobCount and groupSize and tells us how many worker jobs (or groups) will be put onto the jobPool. There is an integer divide, but we must overestimate it, because let’s say that we want to have jobCount=3, groupSize=2. From that we want to create 2 actual worker jobs (groups), but the integer divide of jobCount/groupSize would give us only 1 group. We must instead schedule 2 groups, each with 2 jobs. But wait, that would be 4 jobs then, even though we specified jobCount=3. We will just discard the out of bounds jobs later.
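The arithmetic can be checked in isolation. Below is a minimal sketch of the rounding-up division and of the clamped range each group ends up with. GroupCount, RangeOfGroup and GroupRange are hypothetical helper names that only mirror the expressions used inside Dispatch:

```cpp
#include <algorithm>
#include <cstdint>

// Half-open range [begin, end) of job indices handled by one group
struct GroupRange
{
	uint32_t begin;
	uint32_t end;
};

// Integer division rounded up; groupSize must not be 0
constexpr uint32_t GroupCount(uint32_t jobCount, uint32_t groupSize)
{
	return (jobCount + groupSize - 1) / groupSize;
}

// The last group is clamped to jobCount, which is how out of bounds jobs are discarded
inline GroupRange RangeOfGroup(uint32_t jobCount, uint32_t groupSize, uint32_t groupIndex)
{
	const uint32_t begin = groupIndex * groupSize;
	const uint32_t end = std::min(begin + groupSize, jobCount);
	return GroupRange{ begin, end };
}
```

For jobCount=3 and groupSize=2 this gives 2 groups covering indices [0, 2) and [2, 3), so the second group runs a single job instead of two.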
Then the main thread label is updated with adding in the groupCount. That is how many workers must finish from this point for the JobSystem to become idle.
The following for loop goes over every group, creates a worker job for it with a lambda, passing in the required parameters to populate the JobDispatchArgs which our dispatched job needs to receive. That lambda is the one that will be executed by a worker thread. It contains logic to populate a JobDispatchArgs and run through all inner sub-jobs while making sure that no extra out of bounds iterations are performed. I think this might be the most complicated part if you are going to try to wrap your mind around someone else’s code. Hang in there!
After that group-worker-lambda thing, we do just like the Execute function, try to schedule the group and get the worker threads working immediately on it.

I’m not sure if all of this made sense, but at this point the JobSystem is actually ready to be used! Let’s take a look at what kind of performance improvements we can expect right off the bat:

Performance
The first thing I tried was to use JobSystem::Execute() in my Wicked Engine when initializing the main engine systems, so that each system initialization would be performed in its own Execute. The system initialization tasks were large independent tasks, operating on arbitrary, different data, which makes them a good fit for the Execute function. We can make up a test case by Executing Spin functions that busy-loop for a set amount of time and don’t let the CPU rest until that time has passed. Let me give you some helpers for spinning, and a simple timer that you can also use in a small console application:

#include "JobSystem.h"

#include <iostream>
#include <chrono>
#include <string>

using namespace std;

void Spin(float milliseconds)
{
	const double seconds = milliseconds / 1000.0;    // the loop below measures elapsed time in seconds
	const chrono::high_resolution_clock::time_point t1 = chrono::high_resolution_clock::now();
	double elapsed = 0;
	while (elapsed < seconds)
	{
		const chrono::high_resolution_clock::time_point t2 = chrono::high_resolution_clock::now();
		const chrono::duration<double> time_span = chrono::duration_cast<chrono::duration<double>>(t2 - t1);
		elapsed = time_span.count();
	}
}

struct timer
{
	string name;
	chrono::high_resolution_clock::time_point start;

	timer(const string& name) : name(name), start(chrono::high_resolution_clock::now()) {}
	~timer()
	{
		auto end = chrono::high_resolution_clock::now();
		cout << name << ": " << chrono::duration_cast<chrono::milliseconds>(end - start).count() << " milliseconds" << endl;
	}
};

int main()
{
	JobSystem::Initialize();

	// Serial test
	{
		auto t = timer("Serial test: ");
		Spin(100);
		Spin(100);
		Spin(100);
		Spin(100);
	}

	// Execute test
	{
		auto t = timer("Execute() test: ");
		JobSystem::Execute([] { Spin(100); });
		JobSystem::Execute([] { Spin(100); });
		JobSystem::Execute([] { Spin(100); });
		JobSystem::Execute([] { Spin(100); });
		JobSystem::Wait();
	}

    return 0;
}

The Spin function just emulates a working CPU for a set amount of milliseconds, and the timer struct is a scoped timer that records how much time has passed until its destructor is called, so I just made two scopes for the two tests in the main function. For me, the results are:

Serial test: : 400 milliseconds
Execute() test: : 101 milliseconds

The machine I ran this test on is a dual core, but it has 4 hardware threads, so the results make sense. If I increase the number of jobs to 6, the times are as follows:

Serial test: : 600 milliseconds
Execute() test: : 200 milliseconds

The reason for this is that at most 4 jobs were executing in parallel, then the two remaining jobs took over after that, also in parallel with each other. The results look like what we intended!

Now, let’s test the Dispatch performance. I came up with this quick test case:

struct Data
{
	float m[16];
	void Compute(uint32_t value)
	{
		for (int i = 0; i < 16; ++i)
		{
			m[i] += float(value + i);
		}
	}
};
uint32_t dataCount = 1000000;

// Loop test:
{
	Data* dataSet = new Data[dataCount];
	{
		auto t = timer("loop test: ");

		for (uint32_t i = 0; i < dataCount; ++i)
		{
			dataSet[i].Compute(i);
		}
	}
	delete[] dataSet;
}

// Dispatch test:
{
	Data* dataSet = new Data[dataCount];
	{
		auto t = timer("Dispatch() test: ");

		const uint32_t groupSize = 1000;
		JobSystem::Dispatch(dataCount, groupSize, [&dataSet](JobDispatchArgs args) {
			dataSet[args.jobIndex].Compute(1);
		});
		JobSystem::Wait();
	}
	delete[] dataSet;
}

Notice that I create the dataSet two times, once for each test. This is important to avoid reusing cached values for the second run of the test. And the results (optimizations disabled):

loop test: : 140 milliseconds
Dispatch() test: : 59 milliseconds

Neat. But when I enabled optimizations, I got other results:

loop test: : 28 milliseconds
Dispatch() test: : 31 milliseconds

That’s not cool, now the simple loop is faster! Then I tried increasing the groupSize to 10000:

loop test: : 28 milliseconds
Dispatch() test: : 15 milliseconds

Ok, now dispatch is nearly two times faster, but still not 4 times like we would expect. After trying some other groupSizes, I could not improve on this much. This behaviour can depend on several factors, such as the cache, threads being switched between cores, the operating system preempting the threads, contention for the jobPool, or just the nature of the workload that is being executed. All of these need much more investigation, possibly with a dedicated profiling tool. These days there are at least two big ones, Intel VTune and AMD CodeXL, both of which I am unfamiliar with at the moment.

Windows platform specific extension
If we are on Windows, we can try some WinAPI functions to extend the threading functionality. I will just name a few useful ones. You can put these in the Initialize function after the thread has been created (search for ***** to find that place). Also, I am doing an #include <Windows.h>, #include <sstream> and #include <assert.h> for this next part.

First, we must retrieve the native thread handle from the std::thread, like this:

HANDLE handle = (HANDLE)worker.native_handle();

Once we have that, we are free to pass it to the WinAPI functions. For example, we can set a thread affinity mask to pin the thread to a dedicated hardware thread, instead of allowing it on all possible cores and letting the operating system move it around on demand. This piece of code puts the thread we just created onto the one core specified by threadID:

DWORD_PTR affinityMask = 1ull << threadID;
DWORD_PTR affinity_result = SetThreadAffinityMask(handle, affinityMask);
assert(affinity_result > 0);

So the affinityMask is a bitmask where each bit corresponds to one hardware thread, starting from thread 0 and going up to one less than what hardware_concurrency() gives us. By default, the affinity mask should be 2^hardware_concurrency() - 1, which is also what SetThreadAffinityMask returns into the affinity_result variable, since it returns the previous mask.
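The bit arithmetic itself is portable, so it can be sketched without any Windows headers. The helpers below are hypothetical and use uint64_t as a stand-in for a 64-bit DWORD_PTR. Since shifting a 64-bit value by 64 or more is undefined behaviour in C++, they clamp instead of shifting blindly:

```cpp
#include <cstdint>

// Mask with only the bit for `threadID` set; 0 if the index does not fit into 64 bits
constexpr uint64_t SingleCoreMask(uint32_t threadID)
{
	return threadID < 64 ? (1ull << threadID) : 0ull;
}

// Mask with the lowest `hardwareThreads` bits set, that is 2^hardwareThreads - 1
constexpr uint64_t AllCoresMask(uint32_t hardwareThreads)
{
	return hardwareThreads >= 64 ? ~0ull : ((1ull << hardwareThreads) - 1ull);
}
```

On the 4 hardware thread machine from the tests, AllCoresMask(4) is 15 (binary 1111), and the worker with threadID 3 would get the mask 8 (binary 1000).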

We can increase thread priority like so:

BOOL priority_result = SetThreadPriority(handle, THREAD_PRIORITY_HIGHEST);
assert(priority_result != 0);

By default, each thread we create is THREAD_PRIORITY_NORMAL. Raising this could help prevent our threads from being preempted by the operating system in favor of lower priority threads. I’ve found no way to increase performance with this yet, only decrease it.

I found this next one very useful. We can give a name to our thread so that it will show up in the Visual Studio debugger:

std::wstringstream wss;
wss << "JobSystem_" << threadID;
HRESULT hr = SetThreadDescription(handle, wss.str().c_str());
assert(SUCCEEDED(hr));

My general suggestion is not to mess with platform specific extensions unless you are sure that you are getting a benefit. In some cases, such as when modifying priority, they can also lead to a great performance reduction.

Closure
If you stayed till the end, thank you. This simple job system is ready to be used and can offer a generous speedup, but there is definitely room for improvement, especially for the Dispatch function. I am planning to take an in-depth look at Intel VTune (they have a trial version now) and AMD CodeXL soon, and possibly follow up on this with some improvements. As always, you can take a look at the current implementation in my Wicked Engine.
The code in the engine is likely to change over time, but I also made a sample for this blog post alone which will stay like this. It contains all the code for the JobSystem and the tests I showed.

Thank you for reading! If you find a mistake, or have suggestions, I would like to hear it!

12 thoughts on “Simple job system using standard C++”

  1. Tip: you can put the std::mutex in a std::scoped_lock (RAII) which is constructed in its own local scope to avoid the explicit lock() and unlock() calls. For one std::mutex, there is std::lock_guard as well, but I think this will become obsolete at some point, since std::scoped_lock handles that case as well. 😉

    • Thanks for the tip. I think that would be useful wherever a function locks a mutex but also returns something in the middle. Then it could be avoided to write unlock two times, which is also a potential bug as it is easy to miss. I will try it. 🙂

      • Even if locking comprises only a small part of the function, one can typically do something like: void func() { pre(); { const std::scoped_lock lock(mutex_1, …, mutex_n); critical(); } post(); } (note the pair of braces of the inner scope) 🙂 And for completeness a possible implementation based on pbrt-v2 using the Windows API directly: https://bit.ly/2r5XLis and https://bit.ly/2BxHcSN.

  2. So I tried to use this to load multiple models concurrently using this code:
    JobSystem::Execute( [this]()
    {
    m_pModel1 = std::make_unique( ModelLoader::LoadModel( "Assets/dodge1.FBX" ) );
    } );
    JobSystem::Execute( [this]()
    {
    m_pModel2 = std::make_unique( ModelLoader::LoadModel( "Assets/dodge2.FBX" ) );
    } );
    JobSystem::Execute( [this]()
    {
    m_pModel3 = std::make_unique( ModelLoader::LoadModel( "Assets/dodge3.FBX" ) );
    } );
    JobSystem::Wait();

    However the execution of the actual function seemed to be very slow and the CPU usage spiked to 100%. For comparison, this code using normal std::threads executes much faster:

    std::thread t1( [this]()
    {
    m_pModel1 = std::make_unique( ModelLoader::LoadModel( "Assets/dodge1.FBX" ) );
    } );
    std::thread t2( [this]()
    {
    m_pModel2 = std::make_unique( ModelLoader::LoadModel( "Assets/dodge2.FBX" ) );
    } );
    std::thread t3( [this]()
    {
    m_pModel3 = std::make_unique( ModelLoader::LoadModel( "Assets/dodge3.FBX" ) );
    } );
    t1.join();
    t2.join();
    t3.join();

    Here are my results:
    [08:51:06] - [INFO] Total std::thread execution time: 8.63559
    [08:51:06] - [INFO] Total JobSystem execution time: 18.9253

    Any clue what the reason for this is? If you’d like to test this yourself here’s my repo with the test set up: https://github.com/SlidyBat/BatEngine/

    • Hi, thanks for the feedback. I’d like to check your code but won’t promise it. I expect you ran into some contention somewhere. My first try would be probably to limit the worker thread count, because now there can be one extra thread: For example if you have 8 cores, there will be 8 worker threads + 1 main scheduler thread now, and maybe that messes it up. I can’t say for sure though and ideally it would need some profiling.

  3. Hi turanzkij,

    Seems like that may have been the culprit for the big 10s slowdown. I’m testing this on a 6 core laptop (with hyperthreading) and after manually decreasing the thread count from std::thread::hardware_concurrency to 3 I get these results:
    [11:44:49] - [INFO] Total std::thread execution time: 8.67088
    [11:44:49] - [INFO] Total JobSystem execution time: 9.26911

    Seems that it’s still consistently ~1s slower every time.

    Thanks.

  4. Really cool explanation and implementation of this Job System. I just have one question about the function poll(). Is it really necessary to do the notify_one call? I mean, you will end up calling it after the while(push_back) in the Execute and Dispatch functions anyway, so what’s the reason to always do it? I’ve thought through many scenarios and none of them needs those extra calls.

    Thanks for this and the other blogs, they have been really useful :{)

    • That’s a good question and it is not immediately obvious why it is needed. The reason is that there is no atomicity between a worker finishing a job (or going to sleep) and a new job appearing in the queue. It was causing deadlocks for me in rare cases when there was no notify in the poll() function.

  5. Nice article – and a very nice and simple job system. I have a question about ThreadSafeRingBuffer – is there any particular reason you are locking here? I’m pretty certain that because the providers and consumers of the buffer update separate pointers (head and tail) you could implement this with atomic primitives, rather than a costly sync primitive such as mutex.

    Also – I’m not sure that I would want to execute the jobs when dispatch is called – I’d rather split the task into two separate bits;

    1) Collect a list of jobs
    2) Executing the jobs and waiting for completion (you can probably “wait and go wide” here, too)

    This makes managing the list of jobs a bit less complicated, and if you can guarantee that gathering jobs only happens from a single thread, then you can reduce the complexity of the job queue implementation.

    • Thanks, very good insights, let me try to answer:

      I indeed wanted the ring buffer to be lockless in the beginning and experimented with that a bit, but I didn’t find a good solution in the end. The problem comes when the ring buffer needs to wrap, or check if it is full, because those operations involve checking head, tail and capacity together. Interestingly, a mutex can be avoided by using an atomic spin lock instead, which could be more efficient. And indeed I found it more efficient on another platform, but on Windows 10 the mutex actually performed better.

      What you suggest about not waking threads immediately is simpler, and it is how I started out. In the end I found that performance was just better when threads can start work immediately, because the main thread can actually take significant time building the job list for large data. Another thing to mention is that you can make the main thread also start grabbing jobs when it goes into waiting mode, which was not a clear win for me and needs more experimenting.
