substitute hardcoded sleep with conditiona variables in thread_loop

pull/278/head
davide 9 years ago
parent ad732e3eea
commit 1d3de4885a

@ -24,8 +24,8 @@ namespace details
class thread_pool { class thread_pool {
public: public:
thread_pool(size_t num_threads, thread_pool(size_t num_threads,
const std::function<void()>& worker_warmup_cb = nullptr, const std::function<void()>& worker_warmup_cb = nullptr,
const std::function<void()>& worker_teardown_cb = nullptr); const std::function<void()>& worker_teardown_cb = nullptr);
void subscribe_handle(const shared_function_ptr& loop_handle ); void subscribe_handle(const shared_function_ptr& loop_handle );
@ -42,6 +42,8 @@ private:
// synchronization // synchronization
std::mutex _mutex; std::mutex _mutex;
std::condition_variable _condition;
std::function<void()> _worker_warmup_cb; std::function<void()> _worker_warmup_cb;
std::function<void()> _worker_teardown_cb; std::function<void()> _worker_teardown_cb;
@ -53,8 +55,8 @@ private:
// the constructor just launches some amount of workers // the constructor just launches some amount of workers
inline thread_pool::thread_pool(size_t num_threads, inline thread_pool::thread_pool(size_t num_threads,
const std::function<void()>& worker_warmup_cb, const std::function<void()>& worker_warmup_cb,
const std::function<void()>& worker_teardown_cb): const std::function<void()>& worker_teardown_cb):
_worker_warmup_cb(worker_warmup_cb), _worker_warmup_cb(worker_warmup_cb),
_worker_teardown_cb(worker_teardown_cb), _worker_teardown_cb(worker_teardown_cb),
_index(0), _index(0),
@ -68,52 +70,53 @@ inline thread_pool::thread_pool(size_t num_threads,
if( _worker_warmup_cb) _worker_warmup_cb(); if( _worker_warmup_cb) _worker_warmup_cb();
while( !_stop) while( !_stop)
{ {
if( _loop_handles.empty()){
std::this_thread::sleep_for( std::chrono::microseconds(100) );
}
else
{ {
shared_function_ptr loop; shared_function_ptr handle;
size_t index = 0; size_t index = 0;
bool busy = false; bool is_busy = false;
// find an handle
{ {
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(_mutex);
// scan the _loop_handles to find an available one. // scan the _loop_handles to find an available one.
for (size_t count=0; count<_loop_handles.size(); count++) for (size_t count=0; count<_loop_handles.size(); count++)
{ {
_index = (_index+1) % _loop_handles.size(); _index = (_index+1) % _loop_handles.size();
loop = _loop_handles[index].lock(); handle = _loop_handles[index].lock();
busy = _busy[index]; is_busy = _busy[index];
// if the weak pointer points to a delated handle, remove it // if the weak pointer points to a delated handle, remove it
if(!loop ){ if(!handle ){
_loop_handles.erase( _loop_handles.begin() + index); _loop_handles.erase( _loop_handles.begin() + index);
_busy.erase ( _busy.begin() + index); _busy.erase ( _busy.begin() + index);
_index = (_index) % _loop_handles.size();
} }
else{ else{
_busy[index] = true; _busy[index] = true;
} }
if( loop && !busy) break; if( handle && !is_busy) break;
} }
} }
if(loop && !busy) if(handle && !is_busy)
{ {
bool continue_loop = (*loop)(); bool continue_loop = (*handle)();
if(!continue_loop){ if(!continue_loop){
_loop_handles.erase( _loop_handles.begin() + index); _loop_handles.erase( _loop_handles.begin() + index);
_busy.erase ( _busy.begin() + index); _busy.erase ( _busy.begin() + index);
} }
else{ else{
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(_mutex);
_busy[index] = false; _busy[index] = false;
} }
// not busy anymore. notify to other threads
_condition.notify_one();
} }
else{ else{
// this happens if we haven't find any handle that needs our work // this happens when you didn't find an handle that is not busy
std::this_thread::sleep_for( std::chrono::milliseconds(1) ); std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock);
} }
} }
} }
@ -124,10 +127,12 @@ inline thread_pool::thread_pool(size_t num_threads,
inline void thread_pool::subscribe_handle(const shared_function_ptr &loop_handle) inline void thread_pool::subscribe_handle(const shared_function_ptr &loop_handle)
{ {
std::unique_lock<std::mutex> lock(_mutex); {
_loop_handles.push_back( loop_handle ); std::unique_lock<std::mutex> lock(_mutex);
_busy.push_back( false ); _loop_handles.push_back( loop_handle );
assert( _loop_handles.size() == 1); _busy.push_back( false );
}
_condition.notify_one();
} }
@ -138,6 +143,8 @@ inline thread_pool::~thread_pool()
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(_mutex);
_stop = true; _stop = true;
} }
_condition.notify_all();
for(std::thread &worker: _workers) for(std::thread &worker: _workers)
worker.join(); worker.join();
} }

Loading…
Cancel
Save