26 #ifndef TERVEL_CONTAINERS_WF_RINGBUFFER_RINGBUFFER_IMP_H_
27 #define TERVEL_CONTAINERS_WF_RINGBUFFER_RINGBUFFER_IMP_H_
29 namespace containers {
36 , array_(new
std::atomic<uintptr_t>[capacity]) {
37 for (uint64_t i = 0; i <
capacity_; i++) {
45 return isFull(tail_.load(), head_.load());
51 int64_t temp = tail - head;
52 return temp >= capacity_;
59 return isEmpty(tail_.load(), head_.load());
65 int64_t temp = tail - head;
72 array_[pos].fetch_or(delayMark_lsb);
78 val = array_[pos].load();
79 if (Helper::isHelperType(val)) {
80 Helper * h = Helper::getHelperType(val);
81 std::atomic<void *> *address;
82 address =
reinterpret_cast<std::atomic<void *> *
>(&(array_[pos]));
84 SlotID pos = SlotID::SHORTUSE;
96 getInfo(uintptr_t val, int64_t &val_seqid,
97 bool &val_isValueType,
bool &val_isDelayedMarked) {
98 val_isValueType = isValueType(val);
99 val_isDelayedMarked = isDelayedMarked(val);
100 if (val_isValueType) {
101 val_seqid = getValueTypeSeqId(val);
103 val_seqid = getEmptyTypeSeqId(val);
110 val = val & (~clear_lsb);
111 T temp =
reinterpret_cast<T
>(val);
129 int64_t seqid = nextHead();
130 uint64_t pos = getPos(seqid);
132 uintptr_t new_value = EmptyType(nextSeqId(seqid));
134 bool skip_delay_check =
true;
136 if (skip_delay_check) {
138 skip_delay_check =
false;
144 if (!readValue(pos, val)) {
149 bool val_isValueType;
150 bool val_isDelayedMarked;
151 getInfo(val, val_seqid, val_isValueType, val_isDelayedMarked);
153 if (val_seqid > seqid) {
156 if (val_isValueType) {
157 if (val_seqid == seqid) {
158 if (val_isDelayedMarked) {
159 new_value = DelayMarkValue(new_value);
160 assert(isDelayedMarked(new_value));
162 value = getValueType(val);
164 uintptr_t sanity_check = val;
165 if (!array_[pos].compare_exchange_strong(val, new_value)) {
166 assert(!val_isDelayedMarked &&
"This value changed unexpectedly, it should only be changeable by this thread except for bit marking");
167 assert(DelayMarkValue(sanity_check) == val &&
"This value changed unexpectedly, it should only be changeable by this thread except for bit marking");
168 new_value = DelayMarkValue(new_value);
169 bool res = array_[pos].compare_exchange_strong(val, new_value);
170 assert(res &&
" If this assert hits, then somehow another thread changed this value, when only this thread should be able to.");
179 if (val_isDelayedMarked) {
185 atomic_delay_mark(pos);
191 if (val_isDelayedMarked) {
192 int64_t cur_head = getHead();
193 int64_t temp_pos = getPos(cur_head);
196 cur_head += 2*capacity_ - temp_pos + pos;
197 uintptr_t temp = EmptyType(cur_head);
198 array_[pos].compare_exchange_strong(temp, new_value);
203 if (array_[pos].compare_exchange_strong(val, new_value)) {
215 bool res = op->
result(value);
235 int64_t seqid = nextTail();
236 uint64_t pos = getPos(seqid);
239 bool skip_delay_check =
true;
241 if (skip_delay_check) {
244 skip_delay_check =
false;
250 if (!readValue(pos, val)) {
255 bool val_isValueType;
256 bool val_isDelayedMarked;
257 getInfo(val, val_seqid, val_isValueType, val_isDelayedMarked);
259 if (val_seqid > seqid) {
264 if (val_isDelayedMarked) {
273 }
else if (isEmptyType(val)) {
274 int64_t other_seqid = getEmptyTypeSeqId(val);
275 if (other_seqid < seqid) {
282 uintptr_t new_value = ValueType(value, seqid);
283 if (array_[pos].compare_exchange_strong(val, new_value)) {
314 int64_t seqid = counter.fetch_add(val);
315 uint64_t temp = ~0x0;
316 temp = temp >> num_lsb;
318 assert( (seqid == ((seqid << num_lsb) >> num_lsb)) &&
"Seqid is too large the ring buffer should be recreated before this happens");
325 return counterAction(head_, 0);
330 return head_.compare_exchange_strong(expected, new_val);
335 return counterAction(head_, 1);
341 return counterAction(tail_, 0);
346 return tail_.compare_exchange_strong(expected, new_val);
351 return counterAction(tail_, 1);
359 uintptr_t res = seqid;
360 res = res << num_lsb;
361 res = res | emptytype_lsb;
367 value->func_seqid(seqid);
368 uintptr_t res =
reinterpret_cast<uintptr_t
>(value);
369 assert((res & clear_lsb) == 0 &&
" reserved bits are not 0?");
375 val = val | delayMark_lsb;
381 int64_t res = (val >> num_lsb);
386 T temp = getValueType(val);
387 int64_t res = temp->func_seqid();
393 return (p & emptytype_lsb) == emptytype_lsb;
398 return !isEmptyType(p);
403 return (p & delayMark_lsb) == delayMark_lsb;
408 return seqid + capacity_;
413 int64_t temp = seqid % capacity_;
415 assert(temp < capacity_);
422 uintptr_t nval = array_[pos].load();
435 bool val_isValueType;
436 bool val_isDelayedMarked;
437 getInfo(val, val_seqid, val_isValueType, val_isDelayedMarked);
439 int64_t pos = getPos(val_seqid);
440 std::string res =
"{";
441 res +=
"M: " + std::to_string(val_isDelayedMarked) +
"\t";
442 res +=
"V: " + std::to_string(val_isValueType) +
"\t";
443 res +=
"ID: " + std::to_string(val_seqid) +
"\t";
444 res +=
"Pos: " + std::to_string(pos) +
"\t";
446 if (val_isValueType) {
447 T temp = getValueType(val);
448 res +=
"[" + temp->toString() +
"]";
450 if (val_isDelayedMarked) {
453 if (val_seqid == -1) {
461 std::string res =
"";
463 int64_t temp = head_.load();
464 int64_t temp2 = tail_.load();
465 res +=
"Head: " + std::to_string(temp) +
"\t"
466 +
" Pos: " + std::to_string(getPos(temp)) +
" \n"
467 +
"Tail: " + std::to_string(temp2) +
"\t"
468 +
"Pos: " + std::to_string(getPos(temp2)) +
" \n"
469 +
"capacity_: " + std::to_string(capacity_) +
"\n";
471 res +=
"isFull: True\n";
473 res +=
"isFull: False\n";
476 res +=
"isEmpty: True\n";
478 res +=
"isEmpty: False\n";
480 for (
int i = 0; i < capacity_; i++) {
481 res +=
"[" + std::to_string(i) +
"] ";
482 uintptr_t val = array_[i].load();
483 res += debug_string(val);
487 if (isEmpty() && isFull()) {
488 res +=
"Error: Buffer is both empty and full.";
497 #endif // TERVEL_CONTAINERS_WF_RINGBUFFER_RINGBUFFER_IMP_H_
static void check_for_announcement(ProgressAssurance *const progress_assuarance=nullptr)
This function checks at most one position in the op_table_ for an OPRecod If one is found it will cal...
Definition: progress_assurance.h:151
static bool isValueType(uintptr_t p)
returns whether or not p represents an ValueType
Definition: ring_buffer_imp.h:397
RingBuffer(size_t capacity)
Ring Buffer constructor.
Definition: ring_buffer_imp.h:34
int64_t nextTail()
performs a fetch-and-add on the tail counter
Definition: ring_buffer_imp.h:350
bool readValue(int64_t pos, uintptr_t &val)
This function attempts to load a value from the buffer.
Definition: ring_buffer_imp.h:77
static uintptr_t EmptyType(int64_t seqid)
Creates a uintptr_t that represents an EmptyType.
Definition: ring_buffer_imp.h:358
void atomic_delay_mark(int64_t pos)
This function places a bitmark on the value held at address.
Definition: ring_buffer_imp.h:71
bool backoff(int64_t pos, uintptr_t val)
A backoff routine in the event of thread delay.
Definition: ring_buffer_imp.h:420
std::unique_ptr< std::atomic< uintptr_t >[]> array_
Definition: ring_buffer.h:441
TODO(steven):
Definition: mcas.h:36
int64_t getPos(int64_t seqid)
Returns the position a seqid belongs at.
Definition: ring_buffer_imp.h:412
static int64_t getEmptyTypeSeqId(uintptr_t val)
Returns the seqid encoded in the passed value.
Definition: ring_buffer_imp.h:380
Definition: dequeue_op.h:35
Definition: enqueue_op.h:36
int64_t getHead()
performs an atomic load on the head counter
Definition: ring_buffer_imp.h:324
bool dequeue(T &value)
Dequeues a value from the buffer.
Definition: ring_buffer_imp.h:117
static uintptr_t DelayMarkValue(uintptr_t val)
Takes a uintptr_t and places a bitmark on the delayMark_lsb.
Definition: ring_buffer_imp.h:374
bool result()
Definition: enqueue_op_imp.h:134
static void make_announcement(OpRecord *op, const uint64_t tid=tervel::tl_thread_info->get_thread_id(), ProgressAssurance *const prog_assur=tervel::tl_thread_info->get_progress_assurance())
This function places the.
Definition: progress_assurance.h:172
bool isFull()
Returns whether or not the ring buffer is full.
Definition: ring_buffer_imp.h:44
const int64_t capacity_
Definition: ring_buffer.h:436
static bool watch(SlotID slot_id, Element *elem, std::atomic< void * > *address, void *expected, HazardPointer *const hazard_pointer=tervel::tl_thread_info->get_hazard_pointer())
This method is used to achieve a hazard pointer watch on the the based descr.
void safe_delete(bool no_check=false, ElementList *const element_list=tervel::tl_thread_info->get_hp_element_list())
This function is used to free a hazard pointer protected object if it is safe to do so OR add it to a...
Definition: hp_element.h:67
bool result(T &val)
Definition: dequeue_op_imp.h:158
static T getValueType(uintptr_t val)
Returns the value type from a uintptr.
Definition: ring_buffer_imp.h:109
static int64_t getValueTypeSeqId(uintptr_t val)
Returns the seqid of the passed value.
Definition: ring_buffer_imp.h:385
static bool isDelayedMarked(uintptr_t p)
returns whether or not p has a delay mark
Definition: ring_buffer_imp.h:402
int64_t nextHead()
performs a fetch-and-add on the head counter
Definition: ring_buffer_imp.h:334
static uintptr_t ValueType(T value, int64_t seqid)
Creates a uintptr_t that represents an ValueType.
Definition: ring_buffer_imp.h:366
int64_t getTail()
performs an atomic load on the tail counter
Definition: ring_buffer_imp.h:340
void backoff(int duration=TERVEL_DEF_BACKOFF_TIME_NS)
Sets the amount of time in nano-seconds for a thread to backoff before re-retrying.
Definition: util.h:172
std::string debug_string()
This function returns a string debugging information.
Definition: ring_buffer_imp.h:460
static int64_t counterAction(std::atomic< int64_t > &counter, int64_t val)
utility function for incrementing counter
Definition: ring_buffer_imp.h:313
int64_t nextSeqId(int64_t seqid)
Returns the next seqid.
Definition: ring_buffer_imp.h:407
bool isEmpty()
Returns whether or not the ring buffer is empty.
Definition: ring_buffer_imp.h:58
static bool isEmptyType(uintptr_t p)
returns whether or not p represents an EmptyType
Definition: ring_buffer_imp.h:392
bool isDelayed(size_t val=1)
Definition: progress_assurance.h:125
Definition: progress_assurance.h:120
void getInfo(uintptr_t val, int64_t &val_seqid, bool &val_isValueType, bool &val_isMarked)
This function is used to get information from a value read from the ring buffer.
Definition: ring_buffer_imp.h:96
int64_t casHead(int64_t &expected, int64_t new_val)
Definition: ring_buffer_imp.h:329
int64_t casTail(int64_t &expected, int64_t new_val)
Definition: ring_buffer_imp.h:345
bool enqueue(T value)
Enqueues the passed value into the buffer.
Definition: ring_buffer_imp.h:223
SlotID
Definition: hazard_pointer.h:58