From 6eb241f9e5072038b3e7ba216d9b042647f98683 Mon Sep 17 00:00:00 2001 From: shifu-dev Date: Tue, 11 Oct 2022 16:26:46 +0530 Subject: [PATCH] Added thread-block support in async_logger, to wait for logs to finish --- include/spdlog/async_logger-inl.h | 88 ++++++++++++++++++++++++++++++- include/spdlog/async_logger.h | 29 ++++++++++ tests/test_async.cpp | 1 - 3 files changed, 115 insertions(+), 3 deletions(-) diff --git a/include/spdlog/async_logger-inl.h b/include/spdlog/async_logger-inl.h index a1c27a59..54f1096b 100644 --- a/include/spdlog/async_logger-inl.h +++ b/include/spdlog/async_logger-inl.h @@ -23,11 +23,63 @@ SPDLOG_INLINE spdlog::async_logger::async_logger( : async_logger(std::move(logger_name), {std::move(single_sink)}, std::move(tp), overflow_policy) {} +SPDLOG_INLINE spdlog::async_logger::async_logger(const async_logger& other) : logger(other) +{ + thread_pool_ = other.thread_pool_; + overflow_policy_ = other.overflow_policy_; + pending_log_count_ = other.pending_log_count_.load(); +} + +SPDLOG_INLINE size_t spdlog::async_logger::pending_log_count() const noexcept +{ + return pending_log_count_; +} + +SPDLOG_INLINE void spdlog::async_logger::wait() +{ + if (pending_log_count_ > 0) + { + std::unique_lock lock(wait_mutex_); + wait_condition_.wait(lock); + } +} + +template< class Rep, class Period > +SPDLOG_INLINE std::cv_status spdlog::async_logger::wait_for(const std::chrono::duration& rel_time) +{ + if (pending_log_count_ > 0) + { + std::unique_lock lock(wait_mutex_); + return wait_condition_.wait_for(rel_time); + } +} + +template< class Clock, class Duration> +SPDLOG_INLINE std::cv_status spdlog::async_logger::wait_until(const std::chrono::time_point& timeout_time) +{ + if (pending_log_count_ > 0) + { + std::unique_lock lock(wait_mutex_); + return wait_condition_.wait_until(timeout_time); + } +} + +SPDLOG_INLINE bool spdlog::async_logger::block_on_flush() const noexcept +{ + return block_on_flush_; +} + +SPDLOG_INLINE void spdlog::async_logger::block_on_flush(bool value) noexcept +{ + block_on_flush_ = value; +} + // send the log message to the thread pool SPDLOG_INLINE void spdlog::async_logger::sink_it_(const details::log_msg &msg) { if (auto pool_ptr = thread_pool_.lock()) { + on_log_dispatched_(); pool_ptr->post_log(shared_from_this(), msg, overflow_policy_); } else @@ -41,7 +93,14 @@ SPDLOG_INLINE void spdlog::async_logger::flush_() { if (auto pool_ptr = thread_pool_.lock()) { + on_log_dispatched_(); pool_ptr->post_flush(shared_from_this(), overflow_policy_); + + /// this is to provide blocking functionality through logger(not async_logger) interface + if (block_on_flush_) + { + wait(); + } } else { @@ -54,7 +113,9 @@ SPDLOG_INLINE void spdlog::async_logger::flush_() // SPDLOG_INLINE void spdlog::async_logger::backend_sink_it_(const details::log_msg &msg) { - for (auto &sink : sinks_) + on_logged_(); + + for (auto& sink : sinks_) { if (sink->should_log(msg.level)) { @@ -74,7 +135,9 @@ SPDLOG_INLINE void spdlog::async_logger::backend_sink_it_(const details::log_msg SPDLOG_INLINE void spdlog::async_logger::backend_flush_() { - for (auto &sink : sinks_) + on_logged_(); + + for (auto& sink : sinks_) { SPDLOG_TRY { @@ -84,6 +147,27 @@ SPDLOG_INLINE void spdlog::async_logger::backend_flush_() } } +SPDLOG_INLINE void spdlog::async_logger::on_log_dispatched_() +{ + ++pending_log_count_; +} + +SPDLOG_INLINE void spdlog::async_logger::on_logged_() +{ + if (pending_log_count_ == 0) + { + throw_spdlog_ex("all dispatched messages by logger " + + name_ + " have already been logged, but on_message_logged() was invoked"); + } + + pending_log_count_--; + + if (pending_log_count_ == 0) + { + wait_condition_.notify_all(); + } +} + SPDLOG_INLINE std::shared_ptr spdlog::async_logger::clone(std::string new_name) { auto cloned = std::make_shared(*this); diff --git a/include/spdlog/async_logger.h b/include/spdlog/async_logger.h index 91a93fcb..3060a9c9 100644 --- a/include/spdlog/async_logger.h +++ b/include/spdlog/async_logger.h @@ -14,7 +14,9 @@ // Upon destruction, logs all remaining messages in the queue before // destructing.. +#include #include +#include namespace spdlog { @@ -41,6 +43,8 @@ public: : logger(std::move(logger_name), begin, end) , thread_pool_(std::move(tp)) , overflow_policy_(overflow_policy) + , pending_log_count_(0) + , block_on_flush_(false) {} async_logger(std::string logger_name, sinks_init_list sinks_list, std::weak_ptr tp, @@ -49,17 +53,42 @@ public: async_logger(std::string logger_name, sink_ptr single_sink, std::weak_ptr tp, async_overflow_policy overflow_policy = async_overflow_policy::block); + // explicit copy constructor to manage mutex and cv + async_logger(const async_logger& other); + std::shared_ptr clone(std::string new_name) override; +public: + size_t pending_log_count() const noexcept; + + bool block_on_flush() const noexcept; + void block_on_flush(bool value) noexcept; + + void wait(); + + template< class Rep, class Period > + std::cv_status wait_for(const std::chrono::duration& rel_time); + + template< class Clock, class Duration> + std::cv_status wait_until(const std::chrono::time_point& timeout_time); + protected: void sink_it_(const details::log_msg &msg) override; void flush_() override; void backend_sink_it_(const details::log_msg &incoming_log_msg); void backend_flush_(); + void on_log_dispatched_(); + void on_logged_(); + private: std::weak_ptr thread_pool_; async_overflow_policy overflow_policy_; + + std::mutex wait_mutex_; + std::condition_variable wait_condition_; + std::atomic pending_log_count_; + bool block_on_flush_; }; } // namespace spdlog diff --git a/tests/test_async.cpp b/tests/test_async.cpp index 5265bca4..acc4413f 100644 --- a/tests/test_async.cpp +++ b/tests/test_async.cpp @@ -84,7 +84,6 @@ TEST_CASE("flush", "[async]") TEST_CASE("async periodic flush", "[async]") { - auto logger = spdlog::create_async("as"); auto test_sink = std::static_pointer_cast(logger->sinks()[0]);