diff --git a/include/spdlog/details/thread_pool.h b/include/spdlog/details/thread_pool.h index 4c226d95..0d627626 100644 --- a/include/spdlog/details/thread_pool.h +++ b/include/spdlog/details/thread_pool.h @@ -24,8 +24,8 @@ namespace details class thread_pool { public: thread_pool(size_t num_threads, - const std::function& worker_warmup_cb = nullptr, - const std::function& worker_teardown_cb = nullptr); + const std::function& worker_warmup_cb = nullptr, + const std::function& worker_teardown_cb = nullptr); void subscribe_handle(const shared_function_ptr& loop_handle ); @@ -42,6 +42,8 @@ private: // synchronization std::mutex _mutex; + std::condition_variable _condition; + std::function _worker_warmup_cb; std::function _worker_teardown_cb; @@ -53,8 +55,8 @@ private: // the constructor just launches some amount of workers inline thread_pool::thread_pool(size_t num_threads, - const std::function& worker_warmup_cb, - const std::function& worker_teardown_cb): + const std::function& worker_warmup_cb, + const std::function& worker_teardown_cb): _worker_warmup_cb(worker_warmup_cb), _worker_teardown_cb(worker_teardown_cb), _index(0), @@ -68,52 +70,53 @@ inline thread_pool::thread_pool(size_t num_threads, if( _worker_warmup_cb) _worker_warmup_cb(); while( !_stop) { - if( _loop_handles.empty()){ - std::this_thread::sleep_for( std::chrono::microseconds(100) ); - } - else { - shared_function_ptr loop; + shared_function_ptr handle; size_t index = 0; - bool busy = false; + bool is_busy = false; + // find an handle { std::unique_lock lock(_mutex); // scan the _loop_handles to find an available one. for (size_t count=0; count<_loop_handles.size(); count++) { - _index = (_index+1) % _loop_handles.size(); - loop = _loop_handles[index].lock(); - busy = _busy[index]; + _index = (_index+1) % _loop_handles.size(); + handle = _loop_handles[index].lock(); + is_busy = _busy[index]; // if the weak pointer points to a delated handle, remove it - if(!loop ){ + if(!handle ){ _loop_handles.erase( _loop_handles.begin() + index); _busy.erase ( _busy.begin() + index); + _index = (_index) % _loop_handles.size(); } else{ _busy[index] = true; } - if( loop && !busy) break; + if( handle && !is_busy) break; } } - if(loop && !busy) + if(handle && !is_busy) { - bool continue_loop = (*loop)(); + bool continue_loop = (*handle)(); if(!continue_loop){ _loop_handles.erase( _loop_handles.begin() + index); _busy.erase ( _busy.begin() + index); } else{ - std::unique_lock lock(_mutex); - _busy[index] = false; + std::unique_lock lock(_mutex); + _busy[index] = false; } + // not busy anymore. notify to other threads + _condition.notify_one(); } else{ - // this happens if we haven't find any handle that needs our work - std::this_thread::sleep_for( std::chrono::milliseconds(1) ); + // this happens when you didn't find an handle that is not busy + std::unique_lock lock(_mutex); + _condition.wait(lock); } } } @@ -124,10 +127,12 @@ inline thread_pool::thread_pool(size_t num_threads, inline void thread_pool::subscribe_handle(const shared_function_ptr &loop_handle) { - std::unique_lock lock(_mutex); - _loop_handles.push_back( loop_handle ); - _busy.push_back( false ); - assert( _loop_handles.size() == 1); + { + std::unique_lock lock(_mutex); + _loop_handles.push_back( loop_handle ); + _busy.push_back( false ); + } + _condition.notify_one(); } @@ -138,6 +143,8 @@ inline thread_pool::~thread_pool() std::unique_lock lock(_mutex); _stop = true; } + _condition.notify_all(); + for(std::thread &worker: _workers) worker.join(); }