Tervel  1.0.0
A collection of wait-free containers and algorithms.
mcas_buffer.h
Go to the documentation of this file.
1 /*
2 The MIT License (MIT)
3 
4 Copyright (c) 2015 University of Central Florida's Computer Software Engineering
5 Scalable & Secure Systems (CSE - S3) Lab
6 
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13 
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16 
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 THE SOFTWARE.
24 */
25 #ifndef __TERVEL_CONTAINERS_LF_MCAS_BUFFER_MCAS_BUFFER_H__
26 #define __TERVEL_CONTAINERS_LF_MCAS_BUFFER_MCAS_BUFFER_H__
27 
28 #include <atomic>
29 #include <cstdint>
30 #include <algorithm>
31 
32 #include <tervel/util/info.h>
34 #include <tervel/util/descriptor.h>
35 
37 
38 namespace tervel {
39 namespace containers {
40 namespace lf {
41 namespace mcas_buffer {
42 
43 template<typename T>
44 class Node : public util::Descriptor {
45  public:
46  explicit Node(T val)
47  : val_(val) {}
48 
49  ~Node() {}
50 
51  T value() {
52  return val_;
53  };
54 
56  void * complete(void *current, std::atomic<void *> *address) {
57  assert(false);
58  return nullptr;
59  };
60 
62  void * get_logical_value() {
63  assert(false);
64  return nullptr;
65  };
66 
67  private:
68  const T val_;
69 }; // class Node
70 
71 template<typename T>
72 class RingBuffer {
73  public:
74  explicit RingBuffer(uint64_t s)
75  : capacity_(s)
76  , buff_(new std::atomic<Node<T> *>[s]) {
77  buff_[0].store(c_tail_value);
78  buff_[1].store(c_head_value);
79  head_.store(1);
80  tail_.store(0);
81  for (uint64_t i = 2; i < capacity_; i++) {
82  buff_[i].store(c_not_value);
83  }
84  };
85 
87  for (uint64_t i = 0; i < capacity_; i++) {
88  Node<T> * cvalue = buff_[i].load();
89  if (cvalue == c_tail_value) {
90  continue;
91  } else if (cvalue == c_head_value) {
92  continue;
93  } else if (cvalue == c_not_value) {
94  continue;
95  } else {
96  delete cvalue;
97  }
98  }
99  };
100 
101 
 // Wraps `value` in a Node and tries to insert it; returns true when the
 // value was stored and false otherwise (defined after the class).
101  bool enqueue(T value);
 // Tries to remove one element; on success copies it into `value` and
 // returns true, otherwise returns false (defined after the class).
102  bool dequeue(T &value);
104 
105  bool is_empty() {
106  uint64_t t = tail();
107  uint64_t h = head();
108 
109  if (h > t+1) {
110  return false;
111  } else {
112  return true;
113  }
114  };
115 
116  bool is_full() {
117  uint64_t t = tail();
118  uint64_t h = head();
119 
120  if (h+1 >= t+capacity_) {
121  return true;
122  } else {
123  return false;
124  }
125  };
126 
127  size_t capacity() {
128  return capacity_;
129  };
130 
131  void print_queue() {
132  for (uint64_t i = 0; i < capacity_; i++) {
133  Node<T> * cvalue = buff_[i].load();
134  if (cvalue == c_tail_value) {
135  printf("[%lu, TAIL(%p) ] ", i, cvalue);
136  } else if (cvalue == c_head_value) {
137  printf("[%lu, HEAD(%p) ] ", i, cvalue);
138  } else if (cvalue == c_not_value) {
139  printf("[%lu, NOT(%p) ] ", i, cvalue);
140  } else {
141  printf("[%lu, VAL(%p)] ", i, reinterpret_cast<void *>(cvalue->value()));
142  }
143  }
144  printf("\n");
145  };
146 
147  private:
148  Node<T> *at(uint64_t pos) {
149  if (pos >= capacity_) {
150  pos = pos % capacity_;
151  }
152  while (true) {
153  Node<T> *node = buff_[pos].load();
154 
155  if (util::memory::rc::is_descriptor_first(reinterpret_cast<void *>(
156  node))) {
157  util::memory::rc::remove_descriptor(reinterpret_cast<void *>(node),
158  reinterpret_cast<std::atomic<void *> *>(&(buff_[pos])));
159  } else {
160  return node;
161  }
162  }
163  }
164 
165  std::atomic<Node<T> *> *address(uint64_t pos) {
166  if (pos >= capacity_) {
167  pos = pos % capacity_;
168  }
169  return &(buff_[pos]);
170  }
171 
172  uint64_t head() {
173  return head_.load();
174  }
175 
176  uint64_t head(uint64_t i) {
177  return head_.fetch_add(i);
178  }
179 
180  uint64_t tail() {
181  return tail_.load();
182  }
183 
184  uint64_t tail(uint64_t i) {
185  return tail_.fetch_add(i);
186  }
187 
 // Tries to place `node` into the slot currently holding the head sentinel.
 //
 // Returns false if is_full() reports a full buffer, or if the slot after
 // the head sentinel holds the tail sentinel. Returns true once a two-word
 // MCAS has (1) replaced the head sentinel with `node` and (2) moved the
 // head sentinel into the following slot.
 //
 // NOTE(review): listing lines 203-204, which presumably declare and
 // allocate `mcas`, are missing from this view - restore them before
 // compiling.
188  bool enqueue(Node<T> * node) {
189  if (is_full()) {
190  return false;
191  }
192 
 // Start scanning from the position recorded in the head counter.
193  uint64_t h = head();
194 
195  while (true) {
196  Node<T> *current = at(h);
197  if (current == c_head_value) {
198  Node<T> *next = at(h+1);
199 
 // Head sentinel directly followed by the tail sentinel: no free slot.
200  if (next == c_tail_value) {
201  return false;
202  } else {
205  bool success;
 // Word 1: slot h goes from head sentinel to the new node.
206  success = mcas->add_cas_triple(address(h), c_head_value, node);
207  assert(success);
208 
 // Word 2: slot h+1 goes from "not a value" to the head sentinel.
209  success = mcas->add_cas_triple(address(h+1), c_not_value, c_head_value);
210  assert(success);
211 
212  success = mcas->execute();
213  mcas->safe_delete();
214 
215  if (success) {
 // Publish the new head position (return value of fetch_add unused).
216  head(1);
217  return true;
218  } else {
 // MCAS failed: re-read slot h and try again.
219  continue;
220  }
221  }
222 
223  } else if (current == c_not_value) {
 // Slot h is empty: resynchronise with the head counter.
224  h = head();
225  } else {
 // Slot h holds something else (tail sentinel or a node): scan forward.
226  h++;
227  }
228  }
229  }
230 
232  if (is_empty()) {
233  return c_not_value;
234  }
235 
236  uint64_t t = tail();
237 
238  while (true) {
239  Node<T> *current = at(t);
240  if (current == c_tail_value) {
241  Node<T> *next = at(t+1);
242 
243  if (next == c_head_value) {
244  return c_not_value;
245  } else if (next == c_tail_value) {
246  t++;
247  } else if (next == c_not_value) {
248  t = tail();
249  } else { // some node *
252  bool success;
253  success = mcas->add_cas_triple(address(t), c_tail_value, c_not_value);
254  assert(success);
255 
256  success = mcas->add_cas_triple(address(t+1), next, c_tail_value);
257  assert(success);
258 
259  success = mcas->execute();
260  mcas->safe_delete();
261 
262  if (success) {
263  tail(1);
264  return next;
265  } else {
266  continue;
267  }
268  }
269 
270  } else if (current == c_not_value) {
271  t = tail();
272  } else {
273  t++;
274  }
275  }
276  } // dequeue
277 
 // Number of slots in buff_; fixed at construction.
278  const uint64_t capacity_;
 // Position counters: set to 1 (head) and 0 (tail) by the constructor and
 // advanced with fetch_add. They are reduced modulo capacity_ by at() and
 // address() when used as slot indices.
279  std::atomic<uint64_t> head_;
280  std::atomic<uint64_t> tail_;
 // Slot storage: each slot holds a Node pointer or one of the sentinels below.
281  std::unique_ptr<std::atomic<Node<T> *>[]> buff_;
282 
 // Sentinel slot contents; they are compared by pointer value and never
 // dereferenced in the code shown here.
 // Empty slot (also returned by dequeue() to signal "nothing removed").
283  Node<T> * c_not_value = reinterpret_cast<Node<T> *>(0x0L);
 // Marks the head position.
284  Node<T> * c_head_value = reinterpret_cast<Node<T> *>(0x10L);
 // Marks the tail position.
285  Node<T> * c_tail_value = reinterpret_cast<Node<T> *>(0x20L);
286 }; // class RingBuffer
287 
288 
289 
290 template<typename T>
291 bool RingBuffer<T>::enqueue(T value) {
292  Node<T> *node = tervel::util::memory::rc::get_descriptor<Node<T>>(value);
293 
294  if (enqueue(node)) {
295  return true;
296  } else {
298  return false;
299  }
300 };
301 
302 template<typename T>
303 bool RingBuffer<T>::dequeue(T &value) {
304  Node<T> *node = dequeue();
305  if (node == c_not_value) {
306  return false;
307  } else {
308  value = node->value();
310  return true;
311  }
312 };
313 
314 } // namespace mcas_buffer
315 } // namespace lf
316 } // namespace containers
317 } // namespace tervel
318 
319 #endif // __TERVEL_CONTAINERS_LF_MCAS_BUFFER_MCAS_BUFFER_H__
virtual void * get_logical_value()=0
This method is implemented by each sub class.
bool enqueue(T value)
Definition: mcas_buffer.h:291
bool is_empty()
Definition: mcas_buffer.h:105
Node< T > * c_tail_value
Definition: mcas_buffer.h:285
~RingBuffer()
Definition: mcas_buffer.h:86
void free_descriptor(tervel::util::Descriptor *descr, bool dont_check=false)
Once a user is done with a descriptor, they should free it with this method.
Definition: descriptor_util.h:65
TODO(steven):
Definition: mcas.h:36
STL namespace.
uint64_t tail(uint64_t i)
Definition: mcas_buffer.h:184
~Node()
Definition: mcas_buffer.h:49
uint64_t head()
Definition: mcas_buffer.h:172
std::atomic< Node< T > * > * address(uint64_t pos)
Definition: mcas_buffer.h:165
Node< T > * dequeue()
dequeue
Definition: mcas_buffer.h:231
T value()
Definition: mcas_buffer.h:51
RingBuffer(uint64_t s)
Definition: mcas_buffer.h:74
Node< T > * c_head_value
Definition: mcas_buffer.h:284
std::unique_ptr< std::atomic< Node< T > * >[]> buff_
Definition: mcas_buffer.h:281
const uint64_t capacity_
Definition: mcas_buffer.h:278
std::atomic< uint64_t > tail_
Definition: mcas_buffer.h:280
Definition: mcas_buffer.h:44
size_t capacity()
Definition: mcas_buffer.h:127
Node< T > * at(uint64_t pos)
Definition: mcas_buffer.h:148
Node(T val)
Definition: mcas_buffer.h:46
This defines the Descriptor class, this class is designed to be extend and be used in conjunction wit...
Definition: descriptor.h:60
uint64_t tail()
Definition: mcas_buffer.h:180
virtual void * complete(void *current, std::atomic< void * > *address)=0
This method is implemented by each sub class and must guarantee that upon return that the descriptor ...
Definition: mcas_helper.h:42
void * get_logical_value()
This method is implemented by each sub class.
Definition: mcas_buffer.h:62
bool is_descriptor_first(void *descr)
This returns whether or not the least significant bit holds a bitmark.
Definition: descriptor_util.h:189
uint64_t head(uint64_t i)
Definition: mcas_buffer.h:176
Node< T > * c_not_value
Definition: mcas_buffer.h:283
bool enqueue(Node< T > *node)
Definition: mcas_buffer.h:188
const T val_
Definition: mcas_buffer.h:65
void * remove_descriptor(void *expected, std::atomic< void * > *address)
This method is used to remove a descriptor object that is conflict with another threads operation...
Definition: descriptor_util.h:208
bool is_full()
Definition: mcas_buffer.h:116
void * complete(void *current, std::atomic< void * > *address)
This method is implemented by each sub class and must guarantee that upon return that the descriptor ...
Definition: mcas_buffer.h:56
void print_queue()
Definition: mcas_buffer.h:131
std::atomic< uint64_t > head_
Definition: mcas_buffer.h:279