|
|
@ -74,101 +74,101 @@ class mpmc_bounded_queue
|
|
|
|
{
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
public:
|
|
|
|
|
|
|
|
|
|
|
|
using item_type = T;
|
|
|
|
using item_type = T;
|
|
|
|
mpmc_bounded_queue(size_t buffer_size)
|
|
|
|
mpmc_bounded_queue(size_t buffer_size)
|
|
|
|
: buffer_(new cell_t [buffer_size]),
|
|
|
|
: buffer_(new cell_t [buffer_size]),
|
|
|
|
buffer_mask_(buffer_size - 1)
|
|
|
|
buffer_mask_(buffer_size - 1)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
//queue size must be power of two
|
|
|
|
//queue size must be power of two
|
|
|
|
if(!((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0)))
|
|
|
|
if(!((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0)))
|
|
|
|
throw spdlog_ex("async logger queue size must be power of two");
|
|
|
|
throw spdlog_ex("async logger queue size must be power of two");
|
|
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i != buffer_size; i += 1)
|
|
|
|
for (size_t i = 0; i != buffer_size; i += 1)
|
|
|
|
buffer_[i].sequence_.store(i, std::memory_order_relaxed);
|
|
|
|
buffer_[i].sequence_.store(i, std::memory_order_relaxed);
|
|
|
|
enqueue_pos_.store(0, std::memory_order_relaxed);
|
|
|
|
enqueue_pos_.store(0, std::memory_order_relaxed);
|
|
|
|
dequeue_pos_.store(0, std::memory_order_relaxed);
|
|
|
|
dequeue_pos_.store(0, std::memory_order_relaxed);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
~mpmc_bounded_queue()
|
|
|
|
~mpmc_bounded_queue()
|
|
|
|
{
|
|
|
|
{
|
|
|
|
delete [] buffer_;
|
|
|
|
delete [] buffer_;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bool enqueue(T&& data)
|
|
|
|
bool enqueue(T&& data)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
cell_t* cell;
|
|
|
|
cell_t* cell;
|
|
|
|
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
|
|
|
|
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
|
|
|
|
for (;;)
|
|
|
|
for (;;)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
cell = &buffer_[pos & buffer_mask_];
|
|
|
|
cell = &buffer_[pos & buffer_mask_];
|
|
|
|
size_t seq = cell->sequence_.load(std::memory_order_acquire);
|
|
|
|
size_t seq = cell->sequence_.load(std::memory_order_acquire);
|
|
|
|
intptr_t dif = (intptr_t)seq - (intptr_t)pos;
|
|
|
|
intptr_t dif = (intptr_t)seq - (intptr_t)pos;
|
|
|
|
if (dif == 0)
|
|
|
|
if (dif == 0)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
|
|
|
|
if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else if (dif < 0)
|
|
|
|
else if (dif < 0)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
return false;
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
else
|
|
|
|
{
|
|
|
|
{
|
|
|
|
pos = enqueue_pos_.load(std::memory_order_relaxed);
|
|
|
|
pos = enqueue_pos_.load(std::memory_order_relaxed);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
cell->data_ = std::move(data);
|
|
|
|
cell->data_ = std::move(data);
|
|
|
|
cell->sequence_.store(pos + 1, std::memory_order_release);
|
|
|
|
cell->sequence_.store(pos + 1, std::memory_order_release);
|
|
|
|
return true;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
bool dequeue(T& data)
|
|
|
|
bool dequeue(T& data)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
cell_t* cell;
|
|
|
|
cell_t* cell;
|
|
|
|
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
|
|
|
|
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
|
|
|
|
for (;;)
|
|
|
|
for (;;)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
cell = &buffer_[pos & buffer_mask_];
|
|
|
|
cell = &buffer_[pos & buffer_mask_];
|
|
|
|
size_t seq =
|
|
|
|
size_t seq =
|
|
|
|
cell->sequence_.load(std::memory_order_acquire);
|
|
|
|
cell->sequence_.load(std::memory_order_acquire);
|
|
|
|
intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
|
|
|
|
intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
|
|
|
|
if (dif == 0)
|
|
|
|
if (dif == 0)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
|
|
|
|
if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else if (dif < 0)
|
|
|
|
else if (dif < 0)
|
|
|
|
return false;
|
|
|
|
return false;
|
|
|
|
else
|
|
|
|
else
|
|
|
|
pos = dequeue_pos_.load(std::memory_order_relaxed);
|
|
|
|
pos = dequeue_pos_.load(std::memory_order_relaxed);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
data = std::move(cell->data_);
|
|
|
|
data = std::move(cell->data_);
|
|
|
|
cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
|
|
|
|
cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
|
|
|
|
return true;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
private:
|
|
|
|
struct cell_t
|
|
|
|
struct cell_t
|
|
|
|
{
|
|
|
|
{
|
|
|
|
std::atomic<size_t> sequence_;
|
|
|
|
std::atomic<size_t> sequence_;
|
|
|
|
T data_;
|
|
|
|
T data_;
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
static size_t const cacheline_size = 64;
|
|
|
|
static size_t const cacheline_size = 64;
|
|
|
|
typedef char cacheline_pad_t [cacheline_size];
|
|
|
|
typedef char cacheline_pad_t [cacheline_size];
|
|
|
|
|
|
|
|
|
|
|
|
cacheline_pad_t pad0_;
|
|
|
|
cacheline_pad_t pad0_;
|
|
|
|
cell_t* const buffer_;
|
|
|
|
cell_t* const buffer_;
|
|
|
|
size_t const buffer_mask_;
|
|
|
|
size_t const buffer_mask_;
|
|
|
|
cacheline_pad_t pad1_;
|
|
|
|
cacheline_pad_t pad1_;
|
|
|
|
std::atomic<size_t> enqueue_pos_;
|
|
|
|
std::atomic<size_t> enqueue_pos_;
|
|
|
|
cacheline_pad_t pad2_;
|
|
|
|
cacheline_pad_t pad2_;
|
|
|
|
std::atomic<size_t> dequeue_pos_;
|
|
|
|
std::atomic<size_t> dequeue_pos_;
|
|
|
|
cacheline_pad_t pad3_;
|
|
|
|
cacheline_pad_t pad3_;
|
|
|
|
|
|
|
|
|
|
|
|
mpmc_bounded_queue(mpmc_bounded_queue const&);
|
|
|
|
mpmc_bounded_queue(mpmc_bounded_queue const&);
|
|
|
|
void operator = (mpmc_bounded_queue const&);
|
|
|
|
void operator = (mpmc_bounded_queue const&);
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
} // ns details
|
|
|
|
} // ns details
|
|
|
|