new approach to async: use thread pool to consume the queues.

pull/278/head
davide 9 years ago
parent c69df8ae44
commit f8ad4db8ae

@ -15,6 +15,7 @@
// 3. will throw spdlog_ex upon log exceptions
// Upon destruction, logs all remaining messages in the queue before destructing..
#include <spdlog/details/thread_pool.h>
#include <spdlog/common.h>
#include <spdlog/logger.h>
@ -39,26 +40,23 @@ public:
const It& begin,
const It& end,
size_t queue_size,
details::thread_pool* th_pool,
const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
const std::function<void()>& worker_warmup_cb = nullptr,
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(),
const std::function<void()>& worker_teardown_cb = nullptr);
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero());
async_logger(const std::string& logger_name,
sinks_init_list sinks,
size_t queue_size,
details::thread_pool* th_pool,
const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
const std::function<void()>& worker_warmup_cb = nullptr,
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(),
const std::function<void()>& worker_teardown_cb = nullptr);
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero());
async_logger(const std::string& logger_name,
sink_ptr single_sink,
size_t queue_size,
details::thread_pool* th_pool,
const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
const std::function<void()>& worker_warmup_cb = nullptr,
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(),
const std::function<void()>& worker_teardown_cb = nullptr);
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero());
//Wait for the queue to be empty, and flush synchronously
//Warning: this can potentialy last forever as we wait it to complete

@ -20,6 +20,7 @@
#include <spdlog/details/log_msg.h>
#include <spdlog/details/os.h>
#include <spdlog/formatter.h>
#include <spdlog/details/thread_pool.h>
#include <chrono>
#include <exception>
@ -121,10 +122,9 @@ public:
const std::vector<sink_ptr>& sinks,
size_t queue_size,
const log_err_handler err_handler,
thread_pool* th_pool,
const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
const std::function<void()>& worker_warmup_cb = nullptr,
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(),
const std::function<void()>& worker_teardown_cb = nullptr);
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero() );
void log(const details::log_msg& msg);
@ -149,30 +149,26 @@ private:
bool _terminate_requested;
std::atomic<bool> _terminated;
spdlog::log_clock::time_point _last_pop;
spdlog::log_clock::time_point _last_flush;
// overflow policy
const async_overflow_policy _overflow_policy;
// worker thread warmup callback - one can set thread priority, affinity, etc
const std::function<void()> _worker_warmup_cb;
// auto periodic sink flush parameter
const std::chrono::milliseconds _flush_interval_ms;
// worker thread teardown callback
const std::function<void()> _worker_teardown_cb;
// worker thread
std::thread _worker_thread;
void push_msg(async_msg&& new_msg);
// worker thread main loop
void worker_loop();
bool worker_loop();
// pop next message from the queue and process it. will set the last_pop to the pop time
// return false if termination of the queue is required
bool process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush);
bool process_next_msg();
void handle_flush_interval(log_clock::time_point& now, log_clock::time_point& last_flush);
@ -182,6 +178,8 @@ private:
// wait until the queue is empty
void wait_empty_q();
shared_function_ptr _thread_loop_handle;
};
}
}
@ -194,22 +192,22 @@ inline spdlog::details::async_log_helper::async_log_helper(
const std::vector<sink_ptr>& sinks,
size_t queue_size,
log_err_handler err_handler,
thread_pool* th_pool,
const async_overflow_policy overflow_policy,
const std::function<void()>& worker_warmup_cb,
const std::chrono::milliseconds& flush_interval_ms,
const std::function<void()>& worker_teardown_cb):
const std::chrono::milliseconds& flush_interval_ms):
_formatter(formatter),
_sinks(sinks),
_q(queue_size),
_err_handler(err_handler),
_flush_requested(false),
_terminate_requested(false),
_terminated(false),
_overflow_policy(overflow_policy),
_worker_warmup_cb(worker_warmup_cb),
_flush_interval_ms(flush_interval_ms),
_worker_teardown_cb(worker_teardown_cb),
_worker_thread(&async_log_helper::worker_loop, this)
{}
_thread_loop_handle( new Func( [this](){ return this->process_next_msg();} ) )
{
th_pool->subscribe_handle(_thread_loop_handle);
}
// Send to the worker thread termination message(level=off)
// and wait for it to finish gracefully
@ -218,7 +216,11 @@ inline spdlog::details::async_log_helper::~async_log_helper()
try
{
push_msg(async_msg(async_msg_type::terminate));
_worker_thread.join();
this->flush(true);
while(!_terminated)
{
std::this_thread::sleep_for( std::chrono::milliseconds(10) );
}
}
catch (...) // don't crash in destructor
{}
@ -229,8 +231,6 @@ inline spdlog::details::async_log_helper::~async_log_helper()
inline void spdlog::details::async_log_helper::log(const details::log_msg& msg)
{
push_msg(async_msg(msg));
}
inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg&& new_msg)
@ -246,7 +246,6 @@ inline void spdlog::details::async_log_helper::push_msg(details::async_log_helpe
}
while (!_q.enqueue(std::move(new_msg)));
}
}
// optionally wait for the queue be empty and request flush from the sinks
@ -257,15 +256,11 @@ inline void spdlog::details::async_log_helper::flush(bool wait_for_q)
wait_empty_q(); //return only make after the above flush message was processed
}
inline void spdlog::details::async_log_helper::worker_loop()
inline bool spdlog::details::async_log_helper::worker_loop()
{
try
{
if (_worker_warmup_cb) _worker_warmup_cb();
auto last_pop = details::os::now();
auto last_flush = last_pop;
while(process_next_msg(last_pop, last_flush));
if (_worker_teardown_cb) _worker_teardown_cb();
return (process_next_msg());
}
catch (const std::exception &ex)
{
@ -275,19 +270,18 @@ inline void spdlog::details::async_log_helper::worker_loop()
{
_err_handler("Unknown exception");
}
return true;
}
// process next message in the queue
// return true if this thread should still be active (while no terminate msg was received)
inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush)
inline bool spdlog::details::async_log_helper::process_next_msg()
{
async_msg incoming_async_msg;
if (_q.dequeue(incoming_async_msg))
{
last_pop = details::os::now();
_last_pop = details::os::now();
switch (incoming_async_msg.msg_type)
{
case async_msg_type::flush:
@ -314,10 +308,10 @@ inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_
else
{
auto now = details::os::now();
handle_flush_interval(now, last_flush);
sleep_or_yield(now, last_pop);
handle_flush_interval(now, _last_flush);
sleep_or_yield(now, _last_pop);
_terminated = true;
return !_terminate_requested;
}
}
@ -374,7 +368,6 @@ inline void spdlog::details::async_log_helper::wait_empty_q()
{
sleep_or_yield(details::os::now(), last_op);
}
}

@ -16,40 +16,36 @@
#include <chrono>
#include <memory>
template<class It>
inline spdlog::async_logger::async_logger(const std::string& logger_name,
const It& begin,
const It& end,
size_t queue_size,
details::thread_pool* th_pool,
const async_overflow_policy overflow_policy,
const std::function<void()>& worker_warmup_cb,
const std::chrono::milliseconds& flush_interval_ms,
const std::function<void()>& worker_teardown_cb) :
const std::chrono::milliseconds& flush_interval_ms ) :
logger(logger_name, begin, end),
_async_log_helper(new details::async_log_helper(_formatter, _sinks, queue_size, _err_handler, overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb))
_async_log_helper(new details::async_log_helper(_formatter, _sinks, queue_size, _err_handler,
th_pool, overflow_policy, flush_interval_ms))
{
}
inline spdlog::async_logger::async_logger(const std::string& logger_name,
sinks_init_list sinks,
size_t queue_size,
details::thread_pool* th_pool,
const async_overflow_policy overflow_policy,
const std::function<void()>& worker_warmup_cb,
const std::chrono::milliseconds& flush_interval_ms,
const std::function<void()>& worker_teardown_cb) :
async_logger(logger_name, sinks.begin(), sinks.end(), queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb) {}
const std::chrono::milliseconds& flush_interval_ms) :
async_logger(logger_name, sinks.begin(), sinks.end(), queue_size, th_pool, overflow_policy, flush_interval_ms) {}
inline spdlog::async_logger::async_logger(const std::string& logger_name,
sink_ptr single_sink,
size_t queue_size,
details::thread_pool* th_pool,
const async_overflow_policy overflow_policy,
const std::function<void()>& worker_warmup_cb,
const std::chrono::milliseconds& flush_interval_ms,
const std::function<void()>& worker_teardown_cb) :
async_logger(logger_name,
{
single_sink
}, queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb) {}
const std::chrono::milliseconds& flush_interval_ms) :
async_logger(logger_name,{ single_sink }, queue_size,th_pool, overflow_policy, flush_interval_ms) {}
inline void spdlog::async_logger::flush()

@ -26,6 +26,8 @@ namespace spdlog
{
namespace details
{
template <class Mutex> class registry_t
{
public:
@ -53,7 +55,8 @@ public:
throw_if_exists(logger_name);
std::shared_ptr<logger> new_logger;
if (_async_mode)
new_logger = std::make_shared<async_logger>(logger_name, sinks_begin, sinks_end, _async_q_size, _overflow_policy, _worker_warmup_cb, _flush_interval_ms, _worker_teardown_cb);
new_logger = std::make_shared<async_logger>(logger_name, sinks_begin, sinks_end,
_async_q_size, _thread_poll.get(), _overflow_policy, _flush_interval_ms);
else
new_logger = std::make_shared<logger>(logger_name, sinks_begin, sinks_end);
@ -131,15 +134,23 @@ public:
_err_handler = handler;
}
void set_async_mode(size_t q_size, const async_overflow_policy overflow_policy, const std::function<void()>& worker_warmup_cb, const std::chrono::milliseconds& flush_interval_ms, const std::function<void()>& worker_teardown_cb)
void set_async_mode(size_t queue_size,
size_t thread_pool_size,
const async_overflow_policy overflow_policy,
const std::function<void()>& worker_warmup_cb,
const std::chrono::milliseconds& flush_interval_ms,
const std::function<void()>& worker_teardown_cb)
{
std::lock_guard<Mutex> lock(_mutex);
_async_mode = true;
_async_q_size = q_size;
_async_q_size = queue_size;
_threadpool_size = thread_pool_size;
_overflow_policy = overflow_policy;
_worker_warmup_cb = worker_warmup_cb;
_flush_interval_ms = flush_interval_ms;
_worker_teardown_cb = worker_teardown_cb;
_thread_poll.reset( new thread_pool(thread_pool_size,_worker_warmup_cb,_worker_teardown_cb ));
}
void set_sync_mode()
@ -171,10 +182,13 @@ private:
log_err_handler _err_handler;
bool _async_mode = false;
size_t _async_q_size = 0;
size_t _threadpool_size = 1;
async_overflow_policy _overflow_policy = async_overflow_policy::block_retry;
std::function<void()> _worker_warmup_cb = nullptr;
std::chrono::milliseconds _flush_interval_ms;
std::function<void()> _worker_teardown_cb = nullptr;
std::unique_ptr<thread_pool> _thread_poll;
};
#ifdef SPDLOG_NO_REGISTRY_MUTEX
typedef registry_t<spdlog::details::null_mutex> registry;

@ -153,9 +153,18 @@ inline void spdlog::set_error_handler(log_err_handler handler)
}
inline void spdlog::set_async_mode(size_t queue_size, const async_overflow_policy overflow_policy, const std::function<void()>& worker_warmup_cb, const std::chrono::milliseconds& flush_interval_ms, const std::function<void()>& worker_teardown_cb)
{
details::registry::instance().set_async_mode(queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb);
inline void spdlog::set_async_mode(size_t queue_size,
const async_overflow_policy overflow_policy,
const std::function<void()>& worker_warmup_cb,
const std::chrono::milliseconds& flush_interval_ms,
const std::function<void()>& worker_teardown_cb,
size_t num_thread_in_pool)
{
assert( num_thread_in_pool>=1 );
details::registry::instance().set_async_mode(
queue_size,num_thread_in_pool,
overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb);
}
inline void spdlog::set_sync_mode()

@ -0,0 +1,148 @@
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <deque>
#include <memory>
#include <thread>
#include <mutex>
#include <future>
#include <functional>
#include <stdexcept>
#include <assert.h>
typedef std::function<bool(void)> Func;
typedef std::shared_ptr<Func> shared_function_ptr;
typedef std::weak_ptr<Func> weak_function_ptr;
namespace spdlog
{
namespace details
{
class thread_pool {
public:
thread_pool(size_t num_threads,
const std::function<void()>& worker_warmup_cb = nullptr,
const std::function<void()>& worker_teardown_cb = nullptr);
void subscribe_handle(const shared_function_ptr& loop_handle );
~thread_pool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > _workers;
// the task queue
std::deque< weak_function_ptr > _loop_handles;
std::deque< bool > _busy;
// synchronization
std::mutex _mutex;
std::function<void()> _worker_warmup_cb;
std::function<void()> _worker_teardown_cb;
size_t _index;
bool _stop;
};
// the constructor just launches some amount of workers
inline thread_pool::thread_pool(size_t num_threads,
const std::function<void()>& worker_warmup_cb,
const std::function<void()>& worker_teardown_cb):
_worker_warmup_cb(worker_warmup_cb),
_worker_teardown_cb(worker_teardown_cb),
_index(0),
_stop(false)
{
assert(num_threads > 0);
for(size_t i = 0;i<num_threads;++i)
_workers.emplace_back( [this]()
{
if( _worker_warmup_cb) _worker_warmup_cb();
while( !_stop)
{
if( _loop_handles.empty()){
std::this_thread::sleep_for( std::chrono::microseconds(100) );
}
else
{
shared_function_ptr loop;
size_t index = 0;
bool busy = false;
{
std::unique_lock<std::mutex> lock(_mutex);
// scan the _loop_handles to find an available one.
for (size_t count=0; count<_loop_handles.size(); count++)
{
_index = (_index+1) % _loop_handles.size();
loop = _loop_handles[index].lock();
busy = _busy[index];
// if the weak pointer points to a delated handle, remove it
if(!loop ){
_loop_handles.erase( _loop_handles.begin() + index);
_busy.erase ( _busy.begin() + index);
}
else{
_busy[index] = true;
}
if( loop && !busy) break;
}
}
if(loop && !busy)
{
bool continue_loop = (*loop)();
if(!continue_loop){
_loop_handles.erase( _loop_handles.begin() + index);
_busy.erase ( _busy.begin() + index);
}
else{
std::unique_lock<std::mutex> lock(_mutex);
_busy[index] = false;
}
}
else{
// this happens if we haven't find any handle that needs our work
std::this_thread::sleep_for( std::chrono::milliseconds(1) );
}
}
}
if( _worker_teardown_cb) _worker_teardown_cb();
}
);
}
inline void thread_pool::subscribe_handle(const shared_function_ptr &loop_handle)
{
std::unique_lock<std::mutex> lock(_mutex);
_loop_handles.push_back( loop_handle );
_busy.push_back( false );
assert( _loop_handles.size() == 1);
}
// the destructor joins all threads
inline thread_pool::~thread_pool()
{
{
std::unique_lock<std::mutex> lock(_mutex);
_stop = true;
}
for(std::thread &worker: _workers)
worker.join();
}
}
}
#endif

@ -60,7 +60,12 @@ void set_error_handler(log_err_handler);
// worker_teardown_cb (optional):
// callback function that will be called in worker thread upon exit
//
void set_async_mode(size_t queue_size, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, const std::function<void()>& worker_warmup_cb = nullptr, const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(), const std::function<void()>& worker_teardown_cb = nullptr);
void set_async_mode(size_t queue_size,
const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
const std::function<void()>& worker_warmup_cb = nullptr,
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(),
const std::function<void()>& worker_teardown_cb = nullptr,
size_t num_thread_in_pool = 1);
// Turn off async mode
void set_sync_mode();

Loading…
Cancel
Save