code cleanup

pull/278/head
davide 9 years ago
parent 1d3de4885a
commit 6d0f48763c

@ -36,8 +36,7 @@ private:
// the task queue // the task queue
std::deque< weak_function_ptr > _loop_handles; std::deque<std::pair<bool, weak_function_ptr> > _loop_handles;
std::deque< bool > _busy;
// synchronization // synchronization
std::mutex _mutex; std::mutex _mutex;
@ -70,54 +69,56 @@ inline thread_pool::thread_pool(size_t num_threads,
if( _worker_warmup_cb) _worker_warmup_cb(); if( _worker_warmup_cb) _worker_warmup_cb();
while( !_stop) while( !_stop)
{ {
shared_function_ptr handle;
auto handle_it = _loop_handles.begin();
bool is_busy = false;
// find an handle
{ {
shared_function_ptr handle; std::unique_lock<std::mutex> lock(_mutex);
size_t index = 0;
bool is_busy = false;
// find an handle // scan the _loop_handles to find an available one.
for (size_t count=0; count<_loop_handles.size(); count++)
{ {
std::unique_lock<std::mutex> lock(_mutex); _index = (_index+1) % _loop_handles.size();
handle_it = _loop_handles.begin() + _index;
// scan the _loop_handles to find an available one. is_busy = handle_it->first;
for (size_t count=0; count<_loop_handles.size(); count++) handle = handle_it->second.lock();
{
_index = (_index+1) % _loop_handles.size();
handle = _loop_handles[index].lock();
is_busy = _busy[index];
// if the weak pointer points to a delated handle, remove it
if(!handle ){
_loop_handles.erase( _loop_handles.begin() + index);
_busy.erase ( _busy.begin() + index);
_index = (_index) % _loop_handles.size();
}
else{
_busy[index] = true;
}
if( handle && !is_busy) break;
}
}
if(handle && !is_busy) // if the weak pointer points to a delated handle, remove it
{ if(!handle ){
bool continue_loop = (*handle)(); _loop_handles.erase(handle_it);
if(!continue_loop){ _index = (_index) % _loop_handles.size();
_loop_handles.erase( _loop_handles.begin() + index);
_busy.erase ( _busy.begin() + index);
} }
else{ else{
std::unique_lock<std::mutex> lock(_mutex); // mark as busy
_busy[index] = false; handle_it->first = true;
} }
// not busy anymore. notify to other threads if( handle && !is_busy ) break;
_condition.notify_one(); }
}
if(handle && !is_busy)
{
bool continue_loop = (*handle)();
if(!continue_loop){
std::unique_lock<std::mutex> lock(_mutex);
handle_it->first = false;
_loop_handles.erase(handle_it);
} }
else{ else{
// this happens when you didn't find an handle that is not busy
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock); handle_it->first = false;
} }
// not busy anymore. notify to other threads
_condition.notify_one();
}
else{
// this happens when you didn't find an handle that is not busy
std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock);
} }
} }
if( _worker_teardown_cb) _worker_teardown_cb(); if( _worker_teardown_cb) _worker_teardown_cb();
@ -129,8 +130,7 @@ inline void thread_pool::subscribe_handle(const shared_function_ptr &loop_handle
{ {
{ {
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(_mutex);
_loop_handles.push_back( loop_handle ); _loop_handles.push_back( std::make_pair(false, weak_function_ptr(loop_handle)) );
_busy.push_back( false );
} }
_condition.notify_one(); _condition.notify_one();
} }

Loading…
Cancel
Save