Thread Pool in C++

Summary

In this post, I will introduce how to build a simple thread pool in C++.

Conclusion

The codes are from here. The thread pool only uses thread, mutex, and condition_variable.

#include <thread>
#include <mutex>
#include <condition_variable>

class ThreadPool;

// our worker thread objects
class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};

// the actual thread pool
class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F>
    void enqueue(F f);
    ~ThreadPool();
private:
    friend class Worker;

    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;

    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

void Worker::operator()()
{
    std::function<void()> task;
    while(true)
    {
        {   // acquire lock
            std::unique_lock<std::mutex>
                    lock(pool.queue_mutex);

            // look for a work item
            while(!pool.stop)
            { // if there are none wait for notification
                pool.condition.wait(lock);
            }

            if(pool.stop && pool.tasks.empty()) // exit if the pool is stopped
                return;

            // get the task from the queue
            task = pool.tasks.front();
            pool.tasks.pop();

        }   // release lock

        // execute the task
        task();
    }
}

// the constructor just launches some amount of workers
ThreadPool::ThreadPool(size_t threads)
        :   stop(false)
{
    for(size_t i = 0;i<threads;++i) {
        workers.push_back(std::thread(Worker(*this)));
    }
}

// the destructor joins all threads
ThreadPool::~ThreadPool()
{
    // stop all threads
    stop = true;
    condition.notify_all();

    // join them
    for(size_t i = 0;i<workers.size();++i)
        workers[i].join();
}

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f)
{
    { // acquire lock
        std::unique_lock<std::mutex> lock(queue_mutex);

        // add the task
        tasks.push(std::function<void()>(f));
    } // release lock

    // wake up one thread
    condition.notify_one();
}

Details

I add some comments in the original codes. Please be compliant with the original license in the post. An improved version is available here.

I present some related techniques.

1. condition_variable with predicate

In this post, I introduce the mutex, unique_lock, and condition_variable in C++. I mentioned that condition_variable may encounter spurious wake-up calls. The predicate aims to avoid these spurious wake-up calls.

  1. The function only blocks if pred returns false;
  2. Notifications can only unblock the thread when it becomes true;
// the two are equivalent
cv.wait(lck, [count](){ return count == 0;});

while (count != 0) wait(lck);

2. "Capturing" variables in Lambda functions

You can capture by both reference and value, which you can specify using & and = respectively:

  1. [&epsilon] capture epsilon by reference
  2. [&] captures all variables used in the lambda by reference
  3. [=] captures all variables used in the lambda by value
  4. [&, epsilon] captures variables like with [&], but epsilon by value
  5. [=, &epsilon] captures variables like with [=], but epsilon by reference
  6. [this] simple by-reference capture of the current object
  7. [*this] simple by-copy capture of the current object

Leave a Reply

Your email address will not be published. Required fields are marked *