c++线程池实现

线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。

线程池模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。

  • 半同步/半异步模式又称为生产者消费者模式,是比较常见的实现方式,比较简单。 分为同步层、队列层、异步层三层。同步层的主线程处理工作任务并存入工作队列,工作线程从工作队列取出任务进行处理,如果工作队列为空,则取不到任务的工作线程进入挂起状态。由于线程间有数据通信,因此不适于大数据量交换的场合。

  • 领导者跟随者模式,在线程池中的线程可处在3种状态之一:领导者leader、追随者follower或工作者processor。 任何时刻线程池只有一个领导者线程。事件到达时,领导者线程负责消息分离,并从处于追随者线程中选出一个来当继任领导者,然后将自身设置为工作者状态去处置该事件。处理完毕后工作者线程将自身的状态置为追随者。这一模式实现复杂,但避免了线程间交换任务数据,提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了领导者跟随者模式实现。

半同步半异步线程池

本文实现了一种半同步半异步线程池。具体思想,

  • 一次性创建N个线程;
  • 以while(true)读取同步队列中的值;
  • 执行读取到的值。

注意,本文同步队列中存储的是函数(也叫做任务)。

算法实现,[https://github.com/qingdujun/algorithm/blob/master/thread-pool.cpp]

class ThreadPool {
private:
using Task = std::function<void()>;

public:
ThreadPool(int thread_numbers = std::thread::hardware_concurrency(), int capacity = 100)
: queue_(capacity) {
Start(thread_numbers);
}

~ThreadPool() {
Stop();
}

void Stop() {
std::call_once(once_flag_, [this] { StopThreads(); });
}

void AddTask(const Task& task) {
queue_.Put(task);
}

void AddTask(Task&& task) {
queue_.Put(std::forward<Task>(task));
}

private:
void Start(int thread_numbers) {
running_ = true;
for (int i = 0; i < thread_numbers; ++i) {
thread_list_.emplace_back(std::make_shared<std::thread>(std::bind(&ThreadPool::Run, this)));
}
}

void Run() {
while (running_) {
std::list<Task> task_list;
queue_.Take(task_list);
for (auto& task : task_list) {
if (!running_) {
return;
}
task();
}
}
}

void StopThreads() {
queue_.Stop();
running_ = false;
for (auto thd : thread_list_) {//shared_ptr
if(thd) {
thd->join();
}
}
thread_list_.clear();
}

private:
SyncQueue<Task> queue_;
std::list<std::shared_ptr<std::thread>> thread_list_;
std::atomic_bool running_;
std::once_flag once_flag_;
};

任务同步队列

那么,需要设计一个同步队列管理任务。具体思想如下, - 提供增加任务接口; - 提供读取任务接口; - 提供任务查询接口。

template<typename T>
class SyncQueue {
public:
SyncQueue(int capacity) : capacity_(capacity), stoping_(false) {

}

void Put(const T& x) {
Add(x);
}

void Put(T&& x) {
Add(std::forward<T>(x));
}

void Take(std::list<T>& list) {
std::unique_lock<std::mutex> locker(mutex_);
while (!stoping_ && queue_.empty()) {
not_empty_.wait(locker);
}
if (stoping_) {
return;
}
list = std::move(queue_);
not_full_.notify_one();
}

void Take(T& t) {
std::unique_lock<std::mutex> locker(mutex_);
while (!stoping_ && queue_.empty()) {
not_empty_.wait(locker);
}
if (stoping_) {
return;
}
t = queue_.front();
queue_.pop_front();
not_full_.notify_one();
}

bool Empty() {
std::lock_guard<std::mutex> locker(mutex_);
return queue_.empty();
}

bool Full() {
std::lock_guard<std::mutex> locker(mutex_);
return queue_.size() == capacity_;
}

void Stop() {
{
std::lock_guard<std::mutex> locker(mutex_);
stoping_ = true;
}
not_empty_.notify_all();
not_full_.notify_all();
}


private:
template<typename F>
void Add(F&& x) {
std::unique_lock<std::mutex> locker(mutex_); //lock
while (!stoping_ && queue_.size() >= capacity_) {
not_full_.wait(locker); //unlock & lock
}
if (stoping_) {
return;
}
queue_.emplace_back(std::forward<F>(x));
not_empty_.notify_one();
}

private:
std::mutex mutex_;
std::condition_variable not_full_;
std::condition_variable not_empty_;
int capacity_;
bool stoping_;
std::list<T> queue_;
};

关于到底如何加锁,我发现网上很多地方说的都不一样,我也很疑惑,尝试了几种方式好像都是对的。以下摘取了boost库中的两个函数实现,更深入的内容,以后再补充。

notify_one()

inline void condition_variable::notify_one() BOOST_NOEXCEPT
{
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex);
#endif
BOOST_VERIFY(!pthread_cond_signal(&cond));
}

wait()

template<typename lock_type>
void wait(lock_type& m)
{
int res=0;
{
thread_cv_detail::lock_on_exit<lock_type> guard;
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
detail::interruption_checker check_for_interruption(&internal_mutex,&cond);
#else
boost::pthread::pthread_mutex_scoped_lock check_for_interruption(&internal_mutex);
#endif
guard.activate(m);
res=pthread_cond_wait(&cond,&internal_mutex);
}
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
this_thread::interruption_point();
#endif
if(res)
{
boost::throw_exception(condition_error(res, "boost::condition_variable_any::wait() failed in pthread_cond_wait"));
}
}
template<typename lock_type,typename predicate_type>
void wait(lock_type& m,predicate_type pred)
{
while(!pred()) wait(m);
}

References:

[1] Wikipedia, Spurious wakeup [2] huashu, 线程虚假唤醒 [3] 维基百科,线程池 [4] 祁宇,深入应用C++11代码优化与工程级应用 [5] cplusplus, producer and consumer [6] https://code.woboq.org/appleseed/include/boost/thread/pthread/condition_variable.hpp.html