Ability to run periodic operation in logger thread

pull/903/head
Shai Levi 7 years ago
parent 4b66e94ecf
commit 4c225a6654

@ -104,14 +104,23 @@ struct async_msg
} }
}; };
class async_logging_periodic_worker
{
public:
virtual std::chrono::milliseconds get_interval() = 0;
virtual void do_periodic_work(std::thread::id thread_id) = 0;
};
class thread_pool class thread_pool
{ {
public: public:
using item_type = async_msg; using item_type = async_msg;
using q_type = details::mpmc_blocking_queue<item_type>; using q_type = details::mpmc_blocking_queue<item_type>;
using periodic_worker_ptr_type = std::shared_ptr<async_logging_periodic_worker>;
thread_pool(size_t q_max_items, size_t threads_n) thread_pool(size_t q_max_items, size_t threads_n, periodic_worker_ptr_type periodic_worker = nullptr)
: q_(q_max_items) : q_(q_max_items)
, periodic_worker_(periodic_worker)
{ {
// std::cout << "thread_pool() q_size_bytes: " << q_size_bytes << // std::cout << "thread_pool() q_size_bytes: " << q_size_bytes <<
// "\tthreads_n: " << threads_n << std::endl; // "\tthreads_n: " << threads_n << std::endl;
@ -167,6 +176,7 @@ public:
private: private:
q_type q_; q_type q_;
periodic_worker_ptr_type periodic_worker_;
std::vector<std::thread> threads_; std::vector<std::thread> threads_;
@ -192,8 +202,25 @@ private:
// was received) // was received)
bool process_next_msg_() bool process_next_msg_()
{ {
using std::chrono::time_point_cast;
using std::chrono::milliseconds;
milliseconds max_time_to_wait_on_queue = milliseconds(10000);
milliseconds time_to_wait_on_queue = max_time_to_wait_on_queue;
if (periodic_worker_) {
static thread_local std::chrono::time_point<std::chrono::system_clock> last_periodic_operation(std::chrono::system_clock::now());
milliseconds passed_from_last_operation = time_point_cast<milliseconds>(std::chrono::system_clock::now()) -
time_point_cast<milliseconds>(last_periodic_operation);
if (passed_from_last_operation > periodic_worker_->get_interval()) {
periodic_worker_->do_periodic_work(std::this_thread::get_id());
last_periodic_operation = std::chrono::system_clock::now();
}
milliseconds time_left_till_operation = periodic_worker_->get_interval() - passed_from_last_operation;
time_to_wait_on_queue = std::min(time_left_till_operation, max_time_to_wait_on_queue);
}
async_msg incoming_async_msg; async_msg incoming_async_msg;
bool dequeued = q_.dequeue_for(incoming_async_msg, std::chrono::seconds(10)); bool dequeued = q_.dequeue_for(incoming_async_msg, time_to_wait_on_queue);
if (!dequeued) if (!dequeued)
{ {
return true; return true;

Loading…
Cancel
Save