Tervel  1.0.0
A collection of wait-free containers and algorithms.
ring_buffer_imp.h
Go to the documentation of this file.
1 /*
2 The MIT License (MIT)
3 
4 Copyright (c) 2015 University of Central Florida's Computer Software Engineering
5 Scalable & Secure Systems (CSE - S3) Lab
6 
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13 
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16 
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 THE SOFTWARE.
24 */
25 
26 #ifndef TERVEL_CONTAINERS_WF_RINGBUFFER_RINGBUFFER_IMP_H_
27 #define TERVEL_CONTAINERS_WF_RINGBUFFER_RINGBUFFER_IMP_H_
28 namespace tervel {
29 namespace containers {
30 namespace wf {
31 
32 template<typename T>
34 RingBuffer(size_t capacity)
35  : capacity_(capacity)
36  , array_(new std::atomic<uintptr_t>[capacity]) {
37  for (uint64_t i = 0; i < capacity_; i++) {
38  array_[i].store(EmptyType(i));
39  }
40 }
41 
42 template<typename T>
43 bool RingBuffer<T>::
44 isFull() {
45  return isFull(tail_.load(), head_.load());
46 }
47 
48 template<typename T>
49 bool RingBuffer<T>::
50 isFull(int64_t tail, int64_t head) {
51  int64_t temp = tail - head;
52  return temp >= capacity_;
53 }
54 
55 
56 template<typename T>
57 bool RingBuffer<T>::
59  return isEmpty(tail_.load(), head_.load());
60 }
61 
62 template<typename T>
63 bool RingBuffer<T>::
64 isEmpty(int64_t tail, int64_t head) {
65  int64_t temp = tail - head;
66  return temp <= 0;
67 }
68 
69 template<typename T>
70 void RingBuffer<T>::
71 atomic_delay_mark(int64_t pos) {
72  array_[pos].fetch_or(delayMark_lsb);
73 }
74 
75 template<typename T>
76 bool RingBuffer<T>::
77 readValue(int64_t pos, uintptr_t &val) {
78  val = array_[pos].load();
79  if (Helper::isHelperType(val)) {
80  Helper * h = Helper::getHelperType(val);
81  std::atomic<void *> *address;
82  address = reinterpret_cast<std::atomic<void *> *>(&(array_[pos]));
84  SlotID pos = SlotID::SHORTUSE;
85  bool res;
86  res = tervel::util::memory::hp::HazardPointer::watch(pos, h, address, h);
87  assert(!res);
88  return false;
89  } else {
90  return true;
91  }
92 }
93 
94 template<typename T>
95 void RingBuffer<T>::
96 getInfo(uintptr_t val, int64_t &val_seqid,
97  bool &val_isValueType, bool &val_isDelayedMarked) {
98  val_isValueType = isValueType(val);
99  val_isDelayedMarked = isDelayedMarked(val);
100  if (val_isValueType) {
101  val_seqid = getValueTypeSeqId(val);
102  } else {
103  val_seqid = getEmptyTypeSeqId(val);
104  }
105 }
106 
107 template<typename T>
109 getValueType(uintptr_t val) {
110  val = val & (~clear_lsb); // ~clear_lsb == 111...000
111  T temp = reinterpret_cast<T>(val);
112  return temp;
113 }
114 
115 template<typename T>
116 bool RingBuffer<T>::
117 dequeue(T &value) {
120 
121  bool retry = true;
122  while(retry) {
123  if (isEmpty()) {
124  return false;
125  } else if(progAssur.isDelayed()) {
126  break;
127  }
128 
129  int64_t seqid = nextHead();
130  uint64_t pos = getPos(seqid);
131  uintptr_t val;
132  uintptr_t new_value = EmptyType(nextSeqId(seqid));
133 
134  bool skip_delay_check = true;
135  while (retry) {
136  if (skip_delay_check) {
137  // Removes a double increment on isDelayed
138  skip_delay_check = false;
139  } else if (progAssur.isDelayed()) {
140  retry = false;
141  break;
142  }
143 
144  if (!readValue(pos, val)) {
145  continue;
146  }
147 
148  int64_t val_seqid;
149  bool val_isValueType;
150  bool val_isDelayedMarked;
151  getInfo(val, val_seqid, val_isValueType, val_isDelayedMarked);
152 
153  if (val_seqid > seqid) {
154  break;
155  }
156  if (val_isValueType) {
157  if (val_seqid == seqid) {
158  if (val_isDelayedMarked) {
159  new_value = DelayMarkValue(new_value);
160  assert(isDelayedMarked(new_value));
161  }
162  value = getValueType(val);
163 
164  uintptr_t sanity_check = val;
165  if (!array_[pos].compare_exchange_strong(val, new_value)) {
166  assert(!val_isDelayedMarked && "This value changed unexpectedly, it should only be changeable by this thread except for bit marking");
167  assert(DelayMarkValue(sanity_check) == val && "This value changed unexpectedly, it should only be changeable by this thread except for bit marking");
168  new_value = DelayMarkValue(new_value);
169  bool res = array_[pos].compare_exchange_strong(val, new_value);
170  assert(res && " If this assert hits, then somehow another thread changed this value, when only this thread should be able to.");
171  }
172  return true;
173  } else { // val_seqod < seqid
174  if (backoff(pos, val)) {
175  // value changed
176  continue; // process the new value.
177  }
178  // Value has not changed so lets skip it.
179  if (val_isDelayedMarked) {
180  // Its marked and the seqid is less than ours so we
181  // can skip it safely.
182  break;
183  } else {
184  // we blindly mark it and re-examine the value;
185  atomic_delay_mark(pos);
186 
187  continue;
188  }
189  }
190  } else { // val_isEmptyType
191  if (val_isDelayedMarked) {
192  int64_t cur_head = getHead();
193  int64_t temp_pos = getPos(cur_head);
194  // We want to ensure that it has not been assigned.
195  // So we move it up a head.
196  cur_head += 2*capacity_ - temp_pos + pos;
197  uintptr_t temp = EmptyType(cur_head);
198  array_[pos].compare_exchange_strong(temp, new_value);
199  continue;
200  }
201  if (!backoff(pos, val)) {
202  // Value has not changed
203  if (array_[pos].compare_exchange_strong(val, new_value)) {
204  break;
205  }
206  }
207  // Value has changed
208  continue;
209  }
210  } // inner loop
211  } // outer loop.
212 
213  DequeueOp *op = new DequeueOp(this);
215  bool res = op->result(value);
216  op->safe_delete();
217  return res;
218 }
219 
220 
221 template<typename T>
222 bool RingBuffer<T>::
223 enqueue(T value) {
226 
227  bool retry = true;
228  while(retry) {
229  if (isFull()) {
230  return false;
231  } else if (progAssur.isDelayed()) {
232  break;
233  }
234 
235  int64_t seqid = nextTail();
236  uint64_t pos = getPos(seqid);
237  uintptr_t val;
238 
239  bool skip_delay_check = true;
240  while (retry) {
241  if (skip_delay_check) {
242  // We skip the first iteration of this loop?
243  // Reduces a double count
244  skip_delay_check = false;
245  } else if (progAssur.isDelayed()) {
246  retry = false;
247  break;
248  }
249 
250  if (!readValue(pos, val)) {
251  continue;
252  }
253 
254  int64_t val_seqid;
255  bool val_isValueType;
256  bool val_isDelayedMarked;
257  getInfo(val, val_seqid, val_isValueType, val_isDelayedMarked);
258 
259  if (val_seqid > seqid) {
260  break;
261  }
262 
263 
264  if (val_isDelayedMarked) {
265  // only a dequeue can update this value
266  // lets backoff and see if it changes
267  if (backoff(pos, val)) {
268  // the value changed
269  continue;
270  } else {
271  break; // get a new seqid
272  }
273  } else if (isEmptyType(val)) {
274  int64_t other_seqid = getEmptyTypeSeqId(val);
275  if (other_seqid < seqid) {
276  if (backoff(pos, val)) {
277  // value changed
278  continue; // process the new value.
279  }
280  }
281  // The current value is an EmptyType and its seqid is <= the assigned one.
282  uintptr_t new_value = ValueType(value, seqid);
283  if (array_[pos].compare_exchange_strong(val, new_value)) {
284  return true;
285  } else {
286  // The position was updated and the latest value assigned to val.
287  // So we need to reprocess it.
288  continue;
289  }
290 
291  } else { // (isValueType(val)) {
292  if (backoff(pos, val)) {
293  // value changed
294  continue; // process the new value.
295  } else {
296  // Value has not changed so lets skip it.
297  break;
298  }
299  }
300  } // inner while(progAssur.notDelayed())
301  } // outer while(progAssur.notDelayed())
302 
303  EnqueueOp *op = new EnqueueOp(this, value);
305  bool res = op->result();
306  op->safe_delete();
307  return res;
308 
309 }
310 
311 
312 template<typename T>
313 int64_t RingBuffer<T>::counterAction(std::atomic<int64_t> &counter, int64_t val) {
314  int64_t seqid = counter.fetch_add(val);
315  uint64_t temp = ~0x0;
316  temp = temp >> num_lsb;
317 
318  assert( (seqid == ((seqid << num_lsb) >> num_lsb)) && "Seqid is too large the ring buffer should be recreated before this happens");
319  return seqid;
320 }
321 
322 
323 template<typename T>
325  return counterAction(head_, 0);
326 }
327 
328 template<typename T>
329 int64_t RingBuffer<T>::casHead(int64_t &expected, int64_t new_val) {
330  return head_.compare_exchange_strong(expected, new_val);
331 }
332 
333 template<typename T>
335  return counterAction(head_, 1);
336 }
337 
338 
339 template<typename T>
341  return counterAction(tail_, 0);
342 }
343 
344 template<typename T>
345 int64_t RingBuffer<T>::casTail(int64_t &expected, int64_t new_val) {
346  return tail_.compare_exchange_strong(expected, new_val);
347 }
348 
349 template<typename T>
351  return counterAction(tail_, 1);
352 }
353 
354 
355 
356 
357 template<typename T>
358 uintptr_t RingBuffer<T>::EmptyType(int64_t seqid) {
359  uintptr_t res = seqid;
360  res = res << num_lsb; // 3LSB now 000
361  res = res | emptytype_lsb; // 3LSB now 010
362  return res;
363 }
364 
365 template<typename T>
366 uintptr_t RingBuffer<T>::ValueType(T value, int64_t seqid) {
367  value->func_seqid(seqid);
368  uintptr_t res = reinterpret_cast<uintptr_t>(value);
369  assert((res & clear_lsb) == 0 && " reserved bits are not 0?");
370  return res;
371 }
372 
373 template<typename T>
374 uintptr_t RingBuffer<T>::DelayMarkValue(uintptr_t val) {
375  val = val | delayMark_lsb; // 3LSB now X1X
376  return val;
377 }
378 
379 template<typename T>
380 int64_t RingBuffer<T>::getEmptyTypeSeqId(uintptr_t val) {
381  int64_t res = (val >> num_lsb);
382  return res;
383 }
384 template<typename T>
385 int64_t RingBuffer<T>::getValueTypeSeqId(uintptr_t val) {
386  T temp = getValueType(val);
387  int64_t res = temp->func_seqid();
388  return res;
389 }
390 
391 template<typename T>
392 bool RingBuffer<T>::isEmptyType(uintptr_t p) {
393  return (p & emptytype_lsb) == emptytype_lsb;
394 }
395 
396 template<typename T>
397 bool RingBuffer<T>::isValueType(uintptr_t p) {
398  return !isEmptyType(p);
399 }
400 
401 template<typename T>
403  return (p & delayMark_lsb) == delayMark_lsb;
404 }
405 
406 template<typename T>
407 intptr_t RingBuffer<T>::nextSeqId(int64_t seqid) {
408  return seqid + capacity_;
409 }
410 
411 template<typename T>
412 int64_t RingBuffer<T>::getPos(int64_t seqid) {
413  int64_t temp = seqid % capacity_;
414  assert(temp >= 0);
415  assert(temp < capacity_);
416  return temp;
417 }
418 
419 template<typename T>
420 bool RingBuffer<T>::backoff(int64_t pos, uintptr_t val) {
422  uintptr_t nval = array_[pos].load();
423  if (nval == val) {
424  return false;
425  } else {
426  return true;
427  }
428 }
429 
430 
431 template<typename T>
432 std::string RingBuffer<T>::debug_string(uintptr_t val) {
433  int64_t val_seqid;
434 
435  bool val_isValueType;
436  bool val_isDelayedMarked;
437  getInfo(val, val_seqid, val_isValueType, val_isDelayedMarked);
438 
439  int64_t pos = getPos(val_seqid);
440  std::string res = "{";
441  res += "M: " + std::to_string(val_isDelayedMarked) + "\t";
442  res += "V: " + std::to_string(val_isValueType) + "\t";
443  res += "ID: " + std::to_string(val_seqid) + "\t";
444  res += "Pos: " + std::to_string(pos) + "\t";
445  res += "}";
446  if (val_isValueType) {
447  T temp = getValueType(val);
448  res += "[" + temp->toString() +"]";
449  }
450  if (val_isDelayedMarked) {
451  res += "*";
452  }
453  if (val_seqid == -1) {
454  res += "x";
455  }
456  return res;
457 };
458 
459 template<typename T>
461  std::string res = "";
462 
463  int64_t temp = head_.load();
464  int64_t temp2 = tail_.load();
465  res += "Head: " + std::to_string(temp) + "\t"
466  + " Pos: " + std::to_string(getPos(temp)) + " \n"
467  + "Tail: " + std::to_string(temp2) + "\t"
468  + "Pos: " + std::to_string(getPos(temp2)) + " \n"
469  + "capacity_: " + std::to_string(capacity_) + "\n";
470  if (isFull()) {
471  res += "isFull: True\n";
472  } else {
473  res += "isFull: False\n";
474  }
475  if (isEmpty()) {
476  res += "isEmpty: True\n";
477  } else {
478  res += "isEmpty: False\n";
479  }
480  for (int i = 0; i < capacity_; i++) {
481  res += "[" + std::to_string(i) + "] ";
482  uintptr_t val = array_[i].load();
483  res += debug_string(val);
484  res += "\n";
485  }
486 
487  if (isEmpty() && isFull()) {
488  res += "Error: Buffer is both empty and full.";
489  }
490  return res;
491 };
492 
493 } // namespace wf
494 } // namespace containers
495 } // namespace tervel
496 
497 #endif // TERVEL_CONTAINERS_WF_RINGBUFFER_RINGBUFFER_IMP_H_
static void check_for_announcement(ProgressAssurance *const progress_assuarance=nullptr)
This function checks at most one position in the op_table_ for an OPRecod If one is found it will cal...
Definition: progress_assurance.h:151
static bool isValueType(uintptr_t p)
returns whether or not p represents an ValueType
Definition: ring_buffer_imp.h:397
RingBuffer(size_t capacity)
Ring Buffer constructor.
Definition: ring_buffer_imp.h:34
int64_t nextTail()
performs a fetch-and-add on the tail counter
Definition: ring_buffer_imp.h:350
bool readValue(int64_t pos, uintptr_t &val)
This function attempts to load a value from the buffer.
Definition: ring_buffer_imp.h:77
static uintptr_t EmptyType(int64_t seqid)
Creates a uintptr_t that represents an EmptyType.
Definition: ring_buffer_imp.h:358
void atomic_delay_mark(int64_t pos)
This function places a bitmark on the value held at address.
Definition: ring_buffer_imp.h:71
bool backoff(int64_t pos, uintptr_t val)
A backoff routine in the event of thread delay.
Definition: ring_buffer_imp.h:420
std::unique_ptr< std::atomic< uintptr_t >[]> array_
Definition: ring_buffer.h:441
TODO(steven):
Definition: mcas.h:36
int64_t getPos(int64_t seqid)
Returns the position a seqid belongs at.
Definition: ring_buffer_imp.h:412
STL namespace.
static int64_t getEmptyTypeSeqId(uintptr_t val)
Returns the seqid encoded in the passed value.
Definition: ring_buffer_imp.h:380
int64_t getHead()
performs an atomic load on the head counter
Definition: ring_buffer_imp.h:324
bool dequeue(T &value)
Dequeues a value from the buffer.
Definition: ring_buffer_imp.h:117
static uintptr_t DelayMarkValue(uintptr_t val)
Takes a uintptr_t and places a bitmark on the delayMark_lsb.
Definition: ring_buffer_imp.h:374
bool result()
Definition: enqueue_op_imp.h:134
static void make_announcement(OpRecord *op, const uint64_t tid=tervel::tl_thread_info->get_thread_id(), ProgressAssurance *const prog_assur=tervel::tl_thread_info->get_progress_assurance())
This function places the.
Definition: progress_assurance.h:172
bool isFull()
Returns whether or not the ring buffer is full.
Definition: ring_buffer_imp.h:44
const int64_t capacity_
Definition: ring_buffer.h:436
static bool watch(SlotID slot_id, Element *elem, std::atomic< void * > *address, void *expected, HazardPointer *const hazard_pointer=tervel::tl_thread_info->get_hazard_pointer())
This method is used to achieve a hazard pointer watch on the the based descr.
void safe_delete(bool no_check=false, ElementList *const element_list=tervel::tl_thread_info->get_hp_element_list())
This function is used to free a hazard pointer protected object if it is safe to do so OR add it to a...
Definition: hp_element.h:67
bool result(T &val)
Definition: dequeue_op_imp.h:158
static T getValueType(uintptr_t val)
Returns the value type from a uintptr.
Definition: ring_buffer_imp.h:109
static int64_t getValueTypeSeqId(uintptr_t val)
Returns the seqid of the passed value.
Definition: ring_buffer_imp.h:385
static bool isDelayedMarked(uintptr_t p)
returns whether or not p has a delay mark
Definition: ring_buffer_imp.h:402
int64_t nextHead()
performs a fetch-and-add on the head counter
Definition: ring_buffer_imp.h:334
static uintptr_t ValueType(T value, int64_t seqid)
Creates a uintptr_t that represents an ValueType.
Definition: ring_buffer_imp.h:366
int64_t getTail()
performs an atomic load on the tail counter
Definition: ring_buffer_imp.h:340
void backoff(int duration=TERVEL_DEF_BACKOFF_TIME_NS)
Sets the amount of time in nano-seconds for a thread to backoff before re-retrying.
Definition: util.h:172
std::string debug_string()
This function returns a string debugging information.
Definition: ring_buffer_imp.h:460
static int64_t counterAction(std::atomic< int64_t > &counter, int64_t val)
utility function for incrementing counter
Definition: ring_buffer_imp.h:313
int64_t nextSeqId(int64_t seqid)
Returns the next seqid.
Definition: ring_buffer_imp.h:407
bool isEmpty()
Returns whether or not the ring buffer is empty.
Definition: ring_buffer_imp.h:58
static bool isEmptyType(uintptr_t p)
returns whether or not p represents an EmptyType
Definition: ring_buffer_imp.h:392
bool isDelayed(size_t val=1)
Definition: progress_assurance.h:125
Definition: progress_assurance.h:120
void getInfo(uintptr_t val, int64_t &val_seqid, bool &val_isValueType, bool &val_isMarked)
This function is used to get information from a value read from the ring buffer.
Definition: ring_buffer_imp.h:96
int64_t casHead(int64_t &expected, int64_t new_val)
Definition: ring_buffer_imp.h:329
int64_t casTail(int64_t &expected, int64_t new_val)
Definition: ring_buffer_imp.h:345
bool enqueue(T value)
Enqueues the passed value into the buffer.
Definition: ring_buffer_imp.h:223
SlotID
Definition: hazard_pointer.h:58