|
|
|
@ -89,7 +89,7 @@ public:
|
|
|
|
|
|
|
|
|
|
thread_pool(size_t q_size_bytes, size_t threads_n)
|
|
|
|
|
: msg_counter_(0)
|
|
|
|
|
, _q(q_size_bytes)
|
|
|
|
|
, q_(q_size_bytes)
|
|
|
|
|
{
|
|
|
|
|
// std::cout << "thread_pool() q_size_bytes: " << q_size_bytes << "\tthreads_n: " << threads_n << std::endl;
|
|
|
|
|
if (threads_n == 0 || threads_n > 1000)
|
|
|
|
@ -98,7 +98,7 @@ public:
|
|
|
|
|
}
|
|
|
|
|
for (size_t i = 0; i < threads_n; i++)
|
|
|
|
|
{
|
|
|
|
|
_threads.emplace_back(std::bind(&thread_pool::worker_loop, this));
|
|
|
|
|
threads_.emplace_back(std::bind(&thread_pool::worker_loop, this));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -107,12 +107,12 @@ public:
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
for (size_t i = 0; i < _threads.size(); i++)
|
|
|
|
|
for (size_t i = 0; i < threads_.size(); i++)
|
|
|
|
|
{
|
|
|
|
|
post_async_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (auto &t : _threads)
|
|
|
|
|
for (auto &t : threads_)
|
|
|
|
|
{
|
|
|
|
|
t.join();
|
|
|
|
|
}
|
|
|
|
@ -141,19 +141,19 @@ public:
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
std::atomic<size_t> msg_counter_; // total # of messages processed in this pool
|
|
|
|
|
q_type _q;
|
|
|
|
|
q_type q_;
|
|
|
|
|
|
|
|
|
|
std::vector<std::thread> _threads;
|
|
|
|
|
std::vector<std::thread> threads_;
|
|
|
|
|
|
|
|
|
|
void post_async_msg(async_msg &&new_msg, async_overflow_policy overflow_policy)
|
|
|
|
|
{
|
|
|
|
|
if (overflow_policy == async_overflow_policy::block_retry)
|
|
|
|
|
{
|
|
|
|
|
_q.enqueue(std::move(new_msg));
|
|
|
|
|
q_.enqueue(std::move(new_msg));
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
_q.enqueue_nowait(std::move(new_msg));
|
|
|
|
|
q_.enqueue_nowait(std::move(new_msg));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -167,7 +167,7 @@ private:
|
|
|
|
|
bool process_next_msg()
|
|
|
|
|
{
|
|
|
|
|
async_msg incoming_async_msg;
|
|
|
|
|
bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::seconds(10));
|
|
|
|
|
bool dequeued = q_.dequeue_for(incoming_async_msg, std::chrono::seconds(10));
|
|
|
|
|
if (!dequeued)
|
|
|
|
|
{
|
|
|
|
|
return true;
|
|
|
|
|