diff --git a/include/spdlog/async_logger.h b/include/spdlog/async_logger.h index 1c42fd9c..43ce7664 100644 --- a/include/spdlog/async_logger.h +++ b/include/spdlog/async_logger.h @@ -15,6 +15,7 @@ // 3. will throw spdlog_ex upon log exceptions // Upon destruction, logs all remaining messages in the queue before destructing.. +#include #include #include @@ -39,26 +40,23 @@ public: const It& begin, const It& end, size_t queue_size, + details::thread_pool* th_pool, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, - const std::function& worker_warmup_cb = nullptr, - const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(), - const std::function& worker_teardown_cb = nullptr); + const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero()); async_logger(const std::string& logger_name, sinks_init_list sinks, size_t queue_size, + details::thread_pool* th_pool, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, - const std::function& worker_warmup_cb = nullptr, - const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(), - const std::function& worker_teardown_cb = nullptr); + const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero()); async_logger(const std::string& logger_name, sink_ptr single_sink, size_t queue_size, + details::thread_pool* th_pool, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, - const std::function& worker_warmup_cb = nullptr, - const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(), - const std::function& worker_teardown_cb = nullptr); + const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero()); //Wait for the queue to be empty, and flush synchronously //Warning: this can potentialy last forever as we wait it to complete diff --git a/include/spdlog/details/async_log_helper.h b/include/spdlog/details/async_log_helper.h index 3a467258..31f8c771 100644 --- a/include/spdlog/details/async_log_helper.h +++ b/include/spdlog/details/async_log_helper.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -121,10 +122,9 @@ public: const std::vector& sinks, size_t queue_size, const log_err_handler err_handler, + thread_pool* th_pool, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, - const std::function& worker_warmup_cb = nullptr, - const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(), - const std::function& worker_teardown_cb = nullptr); + const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero() ); void log(const details::log_msg& msg); @@ -149,30 +149,26 @@ private: bool _terminate_requested; + std::atomic _terminated; + + spdlog::log_clock::time_point _last_pop; + + spdlog::log_clock::time_point _last_flush; // overflow policy const async_overflow_policy _overflow_policy; - // worker thread warmup callback - one can set thread priority, affinity, etc - const std::function _worker_warmup_cb; - // auto periodic sink flush parameter const std::chrono::milliseconds _flush_interval_ms; - // worker thread teardown callback - const std::function _worker_teardown_cb; - - // worker thread - std::thread _worker_thread; - void push_msg(async_msg&& new_msg); // worker thread main loop - void worker_loop(); + bool worker_loop(); // pop next message from the queue and process it. will set the last_pop to the pop time // return false if termination of the queue is required - bool process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush); + bool process_next_msg(); void handle_flush_interval(log_clock::time_point& now, log_clock::time_point& last_flush); @@ -182,6 +178,8 @@ private: // wait until the queue is empty void wait_empty_q(); + shared_function_ptr _thread_loop_handle; + }; } } @@ -194,22 +192,22 @@ inline spdlog::details::async_log_helper::async_log_helper( const std::vector& sinks, size_t queue_size, log_err_handler err_handler, + thread_pool* th_pool, const async_overflow_policy overflow_policy, - const std::function& worker_warmup_cb, - const std::chrono::milliseconds& flush_interval_ms, - const std::function& worker_teardown_cb): + const std::chrono::milliseconds& flush_interval_ms): _formatter(formatter), _sinks(sinks), _q(queue_size), _err_handler(err_handler), _flush_requested(false), _terminate_requested(false), + _terminated(false), _overflow_policy(overflow_policy), - _worker_warmup_cb(worker_warmup_cb), _flush_interval_ms(flush_interval_ms), - _worker_teardown_cb(worker_teardown_cb), - _worker_thread(&async_log_helper::worker_loop, this) -{} + _thread_loop_handle( new Func( [this](){ return this->process_next_msg();} ) ) +{ + th_pool->subscribe_handle(_thread_loop_handle); +} // Send to the worker thread termination message(level=off) // and wait for it to finish gracefully @@ -218,7 +216,11 @@ inline spdlog::details::async_log_helper::~async_log_helper() try { push_msg(async_msg(async_msg_type::terminate)); - _worker_thread.join(); + this->flush(true); + while(!_terminated) + { + std::this_thread::sleep_for( std::chrono::milliseconds(10) ); + } } catch (...) // don't crash in destructor {} @@ -229,8 +231,6 @@ inline spdlog::details::async_log_helper::~async_log_helper() inline void spdlog::details::async_log_helper::log(const details::log_msg& msg) { push_msg(async_msg(msg)); - - } inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg&& new_msg) @@ -246,7 +246,6 @@ inline void spdlog::details::async_log_helper::push_msg(details::async_log_helpe } while (!_q.enqueue(std::move(new_msg))); } - } // optionally wait for the queue be empty and request flush from the sinks @@ -257,15 +256,11 @@ inline void spdlog::details::async_log_helper::flush(bool wait_for_q) wait_empty_q(); //return only make after the above flush message was processed } -inline void spdlog::details::async_log_helper::worker_loop() +inline bool spdlog::details::async_log_helper::worker_loop() { try { - if (_worker_warmup_cb) _worker_warmup_cb(); - auto last_pop = details::os::now(); - auto last_flush = last_pop; - while(process_next_msg(last_pop, last_flush)); - if (_worker_teardown_cb) _worker_teardown_cb(); + return (process_next_msg()); } catch (const std::exception &ex) { @@ -275,19 +270,18 @@ inline void spdlog::details::async_log_helper::worker_loop() { _err_handler("Unknown exception"); } + return true; } // process next message in the queue // return true if this thread should still be active (while no terminate msg was received) -inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush) +inline bool spdlog::details::async_log_helper::process_next_msg() { - async_msg incoming_async_msg; - if (_q.dequeue(incoming_async_msg)) { - last_pop = details::os::now(); + _last_pop = details::os::now(); switch (incoming_async_msg.msg_type) { case async_msg_type::flush: @@ -314,10 +308,10 @@ inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_ else { auto now = details::os::now(); - handle_flush_interval(now, last_flush); - sleep_or_yield(now, last_pop); + handle_flush_interval(now, _last_flush); + sleep_or_yield(now, _last_pop); + _terminated = true; return !_terminate_requested; - } } @@ -374,7 +368,6 @@ inline void spdlog::details::async_log_helper::wait_empty_q() { sleep_or_yield(details::os::now(), last_op); } - } diff --git a/include/spdlog/details/async_logger_impl.h b/include/spdlog/details/async_logger_impl.h index 736d2e31..a59ad192 100644 --- a/include/spdlog/details/async_logger_impl.h +++ b/include/spdlog/details/async_logger_impl.h @@ -16,40 +16,36 @@ #include #include + template inline spdlog::async_logger::async_logger(const std::string& logger_name, const It& begin, const It& end, size_t queue_size, + details::thread_pool* th_pool, const async_overflow_policy overflow_policy, - const std::function& worker_warmup_cb, - const std::chrono::milliseconds& flush_interval_ms, - const std::function& worker_teardown_cb) : + const std::chrono::milliseconds& flush_interval_ms ) : logger(logger_name, begin, end), - _async_log_helper(new details::async_log_helper(_formatter, _sinks, queue_size, _err_handler, overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb)) + _async_log_helper(new details::async_log_helper(_formatter, _sinks, queue_size, _err_handler, + th_pool, overflow_policy, flush_interval_ms)) { } inline spdlog::async_logger::async_logger(const std::string& logger_name, sinks_init_list sinks, size_t queue_size, + details::thread_pool* th_pool, const async_overflow_policy overflow_policy, - const std::function& worker_warmup_cb, - const std::chrono::milliseconds& flush_interval_ms, - const std::function& worker_teardown_cb) : - async_logger(logger_name, sinks.begin(), sinks.end(), queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb) {} + const std::chrono::milliseconds& flush_interval_ms) : + async_logger(logger_name, sinks.begin(), sinks.end(), queue_size, th_pool, overflow_policy, flush_interval_ms) {} inline spdlog::async_logger::async_logger(const std::string& logger_name, sink_ptr single_sink, size_t queue_size, + details::thread_pool* th_pool, const async_overflow_policy overflow_policy, - const std::function& worker_warmup_cb, - const std::chrono::milliseconds& flush_interval_ms, - const std::function& worker_teardown_cb) : - async_logger(logger_name, -{ - single_sink -}, queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb) {} + const std::chrono::milliseconds& flush_interval_ms) : + async_logger(logger_name,{ single_sink }, queue_size,th_pool, overflow_policy, flush_interval_ms) {} inline void spdlog::async_logger::flush() diff --git a/include/spdlog/details/registry.h b/include/spdlog/details/registry.h index ee14adfd..7cf5de66 100644 --- a/include/spdlog/details/registry.h +++ b/include/spdlog/details/registry.h @@ -26,6 +26,8 @@ namespace spdlog { namespace details { + + template class registry_t { public: @@ -53,7 +55,8 @@ public: throw_if_exists(logger_name); std::shared_ptr new_logger; if (_async_mode) - new_logger = std::make_shared(logger_name, sinks_begin, sinks_end, _async_q_size, _overflow_policy, _worker_warmup_cb, _flush_interval_ms, _worker_teardown_cb); + new_logger = std::make_shared(logger_name, sinks_begin, sinks_end, + _async_q_size, _thread_poll.get(), _overflow_policy, _flush_interval_ms); else new_logger = std::make_shared(logger_name, sinks_begin, sinks_end); @@ -131,15 +134,23 @@ public: _err_handler = handler; } - void set_async_mode(size_t q_size, const async_overflow_policy overflow_policy, const std::function& worker_warmup_cb, const std::chrono::milliseconds& flush_interval_ms, const std::function& worker_teardown_cb) + void set_async_mode(size_t queue_size, + size_t thread_pool_size, + const async_overflow_policy overflow_policy, + const std::function& worker_warmup_cb, + const std::chrono::milliseconds& flush_interval_ms, + const std::function& worker_teardown_cb) { std::lock_guard lock(_mutex); _async_mode = true; - _async_q_size = q_size; + _async_q_size = queue_size; + _threadpool_size = thread_pool_size; _overflow_policy = overflow_policy; _worker_warmup_cb = worker_warmup_cb; _flush_interval_ms = flush_interval_ms; _worker_teardown_cb = worker_teardown_cb; + + _thread_poll.reset( new thread_pool(thread_pool_size,_worker_warmup_cb,_worker_teardown_cb )); } void set_sync_mode() @@ -171,10 +182,13 @@ private: log_err_handler _err_handler; bool _async_mode = false; size_t _async_q_size = 0; + size_t _threadpool_size = 1; async_overflow_policy _overflow_policy = async_overflow_policy::block_retry; std::function _worker_warmup_cb = nullptr; std::chrono::milliseconds _flush_interval_ms; std::function _worker_teardown_cb = nullptr; + std::unique_ptr _thread_poll; + }; #ifdef SPDLOG_NO_REGISTRY_MUTEX typedef registry_t registry; diff --git a/include/spdlog/details/spdlog_impl.h b/include/spdlog/details/spdlog_impl.h index b6c95825..eb46264b 100644 --- a/include/spdlog/details/spdlog_impl.h +++ b/include/spdlog/details/spdlog_impl.h @@ -153,9 +153,18 @@ inline void spdlog::set_error_handler(log_err_handler handler) } -inline void spdlog::set_async_mode(size_t queue_size, const async_overflow_policy overflow_policy, const std::function& worker_warmup_cb, const std::chrono::milliseconds& flush_interval_ms, const std::function& worker_teardown_cb) -{ - details::registry::instance().set_async_mode(queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb); +inline void spdlog::set_async_mode(size_t queue_size, + const async_overflow_policy overflow_policy, + const std::function& worker_warmup_cb, + const std::chrono::milliseconds& flush_interval_ms, + const std::function& worker_teardown_cb, + size_t num_thread_in_pool) +{ + assert( num_thread_in_pool>=1 ); + + details::registry::instance().set_async_mode( + queue_size,num_thread_in_pool, + overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb); } inline void spdlog::set_sync_mode() diff --git a/include/spdlog/details/thread_pool.h b/include/spdlog/details/thread_pool.h new file mode 100644 index 00000000..4c226d95 --- /dev/null +++ b/include/spdlog/details/thread_pool.h @@ -0,0 +1,148 @@ +#ifndef THREAD_POOL_H +#define THREAD_POOL_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +typedef std::function Func; +typedef std::shared_ptr shared_function_ptr; +typedef std::weak_ptr weak_function_ptr; + +namespace spdlog +{ +namespace details +{ + +class thread_pool { +public: + thread_pool(size_t num_threads, + const std::function& worker_warmup_cb = nullptr, + const std::function& worker_teardown_cb = nullptr); + + void subscribe_handle(const shared_function_ptr& loop_handle ); + + ~thread_pool(); +private: + // need to keep track of threads so we can join them + std::vector< std::thread > _workers; + // the task queue + + + std::deque< weak_function_ptr > _loop_handles; + std::deque< bool > _busy; + + // synchronization + std::mutex _mutex; + + std::function _worker_warmup_cb; + + std::function _worker_teardown_cb; + + size_t _index; + + bool _stop; +}; + +// the constructor just launches some amount of workers +inline thread_pool::thread_pool(size_t num_threads, + const std::function& worker_warmup_cb, + const std::function& worker_teardown_cb): + _worker_warmup_cb(worker_warmup_cb), + _worker_teardown_cb(worker_teardown_cb), + _index(0), + _stop(false) +{ + assert(num_threads > 0); + + for(size_t i = 0;i lock(_mutex); + + // scan the _loop_handles to find an available one. + for (size_t count=0; count<_loop_handles.size(); count++) + { + _index = (_index+1) % _loop_handles.size(); + loop = _loop_handles[index].lock(); + busy = _busy[index]; + + // if the weak pointer points to a delated handle, remove it + if(!loop ){ + _loop_handles.erase( _loop_handles.begin() + index); + _busy.erase ( _busy.begin() + index); + } + else{ + _busy[index] = true; + } + if( loop && !busy) break; + } + } + + if(loop && !busy) + { + bool continue_loop = (*loop)(); + if(!continue_loop){ + _loop_handles.erase( _loop_handles.begin() + index); + _busy.erase ( _busy.begin() + index); + } + else{ + std::unique_lock lock(_mutex); + _busy[index] = false; + } + } + else{ + // this happens if we haven't find any handle that needs our work + std::this_thread::sleep_for( std::chrono::milliseconds(1) ); + } + } + } + if( _worker_teardown_cb) _worker_teardown_cb(); + } + ); +} + +inline void thread_pool::subscribe_handle(const shared_function_ptr &loop_handle) +{ + std::unique_lock lock(_mutex); + _loop_handles.push_back( loop_handle ); + _busy.push_back( false ); + assert( _loop_handles.size() == 1); +} + + +// the destructor joins all threads +inline thread_pool::~thread_pool() +{ + { + std::unique_lock lock(_mutex); + _stop = true; + } + for(std::thread &worker: _workers) + worker.join(); +} + +} +} +#endif + diff --git a/include/spdlog/spdlog.h b/include/spdlog/spdlog.h index dcb9f59e..00fca69f 100644 --- a/include/spdlog/spdlog.h +++ b/include/spdlog/spdlog.h @@ -60,7 +60,12 @@ void set_error_handler(log_err_handler); // worker_teardown_cb (optional): // callback function that will be called in worker thread upon exit // -void set_async_mode(size_t queue_size, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, const std::function& worker_warmup_cb = nullptr, const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(), const std::function& worker_teardown_cb = nullptr); +void set_async_mode(size_t queue_size, + const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, + const std::function& worker_warmup_cb = nullptr, + const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(), + const std::function& worker_teardown_cb = nullptr, + size_t num_thread_in_pool = 1); // Turn off async mode void set_sync_mode();