ringbuffer.hpp
/*
 * Author: Sven Gothel <sgothel@jausoft.com>
 * Copyright (c) 2020 Gothel Software e.K.
 * Copyright (c) 2020 ZAFENA AB
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#ifndef JAU_RINGBUFFER_HPP_
#define JAU_RINGBUFFER_HPP_

#include <type_traits>
#include <atomic>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <algorithm>
#include <vector> // used by the std::vector based constructor and reset()

#include <cstring>
#include <string>
#include <cstdint>

#include <jau/debug.hpp>
#include <jau/basic_types.hpp>
#include <jau/ordered_atomic.hpp>

namespace jau {

#if 0
    #define _DEBUG_DUMP(...) { dump(stderr, __VA_ARGS__); }
    #define _DEBUG_DUMP2(a, ...) { a.dump(stderr, __VA_ARGS__); }
    #define _DEBUG_PRINT(...) { fprintf(stderr, __VA_ARGS__); }
#else
    #define _DEBUG_DUMP(...)
    #define _DEBUG_DUMP2(a, ...)
    #define _DEBUG_PRINT(...)
#endif

/**
 * Ring buffer implementation, a.k.a. circular buffer,
 * exposing <i>lock-free</i>
 * {@link #get() get*(..)} and {@link #put(Object) put*(..)} methods.
 * <p>
 * Implementation utilizes the <i>Always Keep One Slot Open</i> scheme,
 * hence it maintains an internal array of <code>capacity</code> <i>plus one</i> slots!
 * </p>
 * <p>
 * Implementation is thread safe if:
 * <ul>
 * <li>{@link #get() get*(..)} operations are performed from multiple threads.</li>
 * <li>{@link #put(Object) put*(..)} operations are performed from multiple threads.</li>
 * <li>{@link #get() get*(..)} and {@link #put(Object) put*(..)} threads may be the same.</li>
 * </ul>
 * </p>
 * <p>
 * The following methods acquire the global multi-read _and_ -write mutex:
 * <ul>
 * <li>{@link #reset() reset*(..)}</li>
 * <li>{@link #clear()}</li>
 * <li>{@link #recapacity()}</li>
 * </ul>
 * </p>
 * <p>
 * Characteristics:
 * <ul>
 * <li>Read position points to the last read element.</li>
 * <li>Write position points to the last written element.</li>
 * </ul>
 * <table border="1">
 * <tr><td>Empty</td><td>writePos == readPos</td><td>size == 0</td></tr>
 * <tr><td>Full</td><td>writePos == readPos - 1</td><td>size == capacity</td></tr>
 * </table>
 * <pre>
 * Empty [RW][ ][ ][ ][ ][ ][ ][ ] ; W == R
 * Avail [ ][ ][R][.][.][.][.][W]  ; W >  R
 * Avail [.][.][.][W][ ][ ][R][.]  ; W <  R - 1
 * Full  [.][.][.][.][.][W][R][.]  ; W == R - 1
 * </pre>
 * </p>
 * See also:
 * <pre>
 * - Sequentially Consistent (SC) ordering or SC-DRF (data race free) <https://en.cppreference.com/w/cpp/atomic/memory_order#Sequentially-consistent_ordering>
 * - std::memory_order <https://en.cppreference.com/w/cpp/atomic/memory_order>
 * </pre>
 * <p>
 * We would like to pass `NullValue_type nullelem` as a non-type template parameter of type `NullValue_type`, a potential class.
 * However, this is only allowed in C++20 and we use C++17 for now.
 * Hence we have to pass `NullValue_type nullelem` in the constructor.
 * </p>
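 * <p>
 * Minimal usage sketch; the types and values below are illustrative assumptions,
 * not mandated by this class (here: a byte buffer of net capacity 10, using 0 as
 * the `nullelem` sentinel):
 * @code
 * jau::ringbuffer<uint8_t, uint8_t, jau::nsize_t> rb(0, 10);
 * rb.put( uint8_t(0x2A) );   // enqueue one element, non-blocking
 * uint8_t v;
 * const bool ok = rb.get(v); // dequeue it again, non-blocking
 * @endcode
 * </p>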
 * @see jau::sc_atomic_critical
 */
template <typename Value_type, typename NullValue_type, typename Size_type,
          bool use_memcpy = std::is_trivially_copyable_v<Value_type>,
          bool use_memset = std::is_integral_v<Value_type> && sizeof(Value_type)==1 &&
                            std::is_integral_v<NullValue_type> && sizeof(NullValue_type)==1
         >
class ringbuffer {
    public:
        constexpr static const bool uses_memcpy = use_memcpy;
        constexpr static const bool uses_memset = use_memset;

        // typedefs' for C++ named requirements: Container (ex iterator)

        typedef Value_type                                  value_type;
        typedef value_type*                                 pointer;
        typedef const value_type*                           const_pointer;
        typedef value_type&                                 reference;
        typedef const value_type&                           const_reference;
        typedef Size_type                                   size_type;
        typedef typename std::make_signed<size_type>::type  difference_type;

    private:
        /** SC atomic integral scalar jau::nsize_t. Memory-Model (MM) guaranteed sequential consistency (SC) between acquire (read) and release (write). */
        typedef ordered_atomic<Size_type, std::memory_order::memory_order_seq_cst> sc_atomic_Size_type;

        /** Relaxed non-SC atomic integral scalar jau::nsize_t. Memory-Model (MM) only guarantees the atomic value, _no_ sequential consistency (SC) between acquire (read) and release (write). */
        typedef ordered_atomic<Size_type, std::memory_order::memory_order_relaxed> relaxed_atomic_Size_type;

        /** Synchronizes write operations (put*), i.e. modifying the writePos. */
        mutable std::mutex syncWrite, syncMultiWrite; // Memory-Model (MM) guaranteed sequential consistency (SC) between acquire and release
        std::condition_variable cvWrite;

        /** Synchronizes read operations (get*), i.e. modifying the readPos. */
        mutable std::mutex syncRead, syncMultiRead; // Memory-Model (MM) guaranteed sequential consistency (SC) between acquire and release
        std::condition_variable cvRead;

        /* const */ NullValue_type nullelem;   // not final due to assignment operation
        /* const */ Size_type capacityPlusOne; // not final due to grow
        /* const */ Value_type * array;        // Synchronized due to MM's data-race-free SC (SC-DRF) between [atomic] acquire/release
        sc_atomic_Size_type readPos;           // Memory-Model (MM) guaranteed sequential consistency (SC) between acquire (read) and release (write)
        sc_atomic_Size_type writePos;          // ditto

        Value_type * newArray(const Size_type count) noexcept {
#if 0
            Value_type *r = new Value_type[count];
            _DEBUG_DUMP("newArray ...");
            _DEBUG_PRINT("newArray %" PRIu64 "\n", count);
            return r;
#else
            return new Value_type[count];
#endif
        }
        void freeArray(Value_type ** a) noexcept {
            _DEBUG_DUMP("freeArray(def)");
            _DEBUG_PRINT("freeArray %p\n", *a);
            if( nullptr == *a ) {
                ABORT("ringbuffer::freeArray with nullptr");
            }
            delete[] *a;
            *a = nullptr;
        }

        template<typename _DataType, typename _NullType>
        static void* memset_wrap(_DataType *block, const _NullType& c, size_t n,
                std::enable_if_t< std::is_integral_v<_DataType> && sizeof(_DataType)==1 &&
                                  std::is_integral_v<_NullType> && sizeof(_NullType)==1, bool > = true )
        {
            return ::memset(block, c, n);
        }
        template<typename _DataType, typename _NullType>
        static void* memset_wrap(_DataType *block, const _NullType& c, size_t n,
                std::enable_if_t< !std::is_integral_v<_DataType> || sizeof(_DataType)!=1 ||
                                  !std::is_integral_v<_NullType> || sizeof(_NullType)!=1, bool > = true )
        {
            ABORT("MEMSET shall not be used");
            (void)block;
            (void)c;
            (void)n;
            return nullptr;
        }
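        // Note (explanatory): the two memset_wrap overloads select at compile time
        // via SFINAE on the trailing std::enable_if_t parameter. Only byte-sized
        // integral Value_type/NullValue_type may be bulk-zeroed via ::memset; for
        // all other types the second overload is chosen, which aborts. Callers
        // therefore guard every call behind 'if( uses_memset )', see clearImpl().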

        /**
         * Clears all elements, i.e. size will be zero afterwards.
         */
        void clearImpl() noexcept {
            const Size_type size = getSize();
            if( 0 < size ) {
                if( uses_memset ) {
                    memset_wrap(&array[0], nullelem, capacityPlusOne*sizeof(Value_type));
                    readPos  = 0;
                    writePos = 0;
                } else {
                    Size_type localReadPos = readPos;
                    for(Size_type i=0; i<size; i++) {
                        localReadPos = (localReadPos + 1) % capacityPlusOne;
                        array[localReadPos] = nullelem;
                    }
                    if( writePos != localReadPos ) {
                        // Avoid exception, abort!
                        ABORT("copy segment error: this %s, readPos %d/%d; writePos %d", toString().c_str(), readPos.load(), localReadPos, writePos.load());
                    }
                    readPos = localReadPos;
                }
            }
        }

        void cloneFrom(const bool allocArrayAndCapacity, const ringbuffer & source) noexcept {
            if( allocArrayAndCapacity ) {
                capacityPlusOne = source.capacityPlusOne;
                if( nullptr != array ) {
                    freeArray(&array);
                }
                array = newArray(capacityPlusOne);
            } else if( capacityPlusOne != source.capacityPlusOne ) {
                ABORT( ("capacityPlusOne not equal: this "+toString()+", source "+source.toString() ).c_str() );
            }

            readPos  = source.readPos.load();
            writePos = source.writePos.load();

            if( use_memcpy ) {
                memcpy(reinterpret_cast<void*>(&array[0]),
                       reinterpret_cast<void*>(const_cast<Value_type*>(&source.array[0])),
                       capacityPlusOne*sizeof(Value_type));
            } else {
                const Size_type size = getSize();
                Size_type localWritePos = readPos;
                for(Size_type i=0; i<size; i++) {
                    localWritePos = (localWritePos + 1) % capacityPlusOne;
                    array[localWritePos] = source.array[localWritePos];
                }
                if( writePos != localWritePos ) {
                    ABORT( ("copy segment error: this "+toString()+", localWritePos "+std::to_string(localWritePos)+"; source "+source.toString()).c_str() );
                }
            }
        }

        void resetImpl(const Value_type * copyFrom, const Size_type copyFromCount) noexcept {
            clearImpl();

            // fill with copyFrom elements
            if( nullptr != copyFrom && 0 < copyFromCount ) {
                if( copyFromCount > capacityPlusOne-1 ) {
                    // new blank resized array
                    if( nullptr != array ) {
                        freeArray(&array);
                    }
                    capacityPlusOne = copyFromCount + 1;
                    array = newArray(capacityPlusOne);
                    readPos  = 0;
                    writePos = 0;
                }
                if( use_memcpy ) {
                    memcpy(reinterpret_cast<void*>(&array[0]),
                           reinterpret_cast<void*>(const_cast<Value_type*>(copyFrom)),
                           copyFromCount*sizeof(Value_type));
                    readPos  = capacityPlusOne - 1; // last read-pos
                    writePos = copyFromCount  - 1; // last write-pos
                } else {
                    Size_type localWritePos = writePos;
                    for(Size_type i=0; i<copyFromCount; i++) {
                        localWritePos = (localWritePos + 1) % capacityPlusOne;
                        array[localWritePos] = copyFrom[i];
                    }
                    writePos = localWritePos;
                }
            }
        }

        Value_type peekImpl(const bool blocking, const int timeoutMS, bool& success) noexcept {
            if( !std::is_copy_constructible_v<Value_type> ) {
                ABORT("Value_type is not copy constructible");
                return nullelem;
            }
            std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl

            const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
            Size_type localReadPos = oldReadPos;
            if( localReadPos == writePos ) {
                if( blocking ) {
                    std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                    while( localReadPos == writePos ) {
                        if( 0 == timeoutMS ) {
                            cvWrite.wait(lockWrite);
                        } else {
                            std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
                            std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
                            if( std::cv_status::timeout == s && localReadPos == writePos ) {
                                success = false;
                                return nullelem;
                            }
                        }
                    }
                } else {
                    success = false;
                    return nullelem;
                }
            }
            localReadPos = (localReadPos + 1) % capacityPlusOne;
            Value_type r = array[localReadPos]; // SC-DRF
            readPos = oldReadPos; // SC-DRF release atomic readPos (complete acquire-release even @ peek)
            success = true;
            return r;
        }

        Value_type moveOutImpl(const bool blocking, const int timeoutMS, bool& success) noexcept {
            std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl

            const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
            Size_type localReadPos = oldReadPos;
            if( localReadPos == writePos ) {
                if( blocking ) {
                    std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                    while( localReadPos == writePos ) {
                        if( 0 == timeoutMS ) {
                            cvWrite.wait(lockWrite);
                        } else {
                            std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
                            std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
                            if( std::cv_status::timeout == s && localReadPos == writePos ) {
                                success = false;
                                return nullelem;
                            }
                        }
                    }
                } else {
                    success = false;
                    return nullelem;
                }
            }
            localReadPos = (localReadPos + 1) % capacityPlusOne;
            Value_type r = std::move( array[localReadPos] ); // SC-DRF
            array[localReadPos] = nullelem;
            {
                std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock
                readPos = localReadPos; // SC-DRF release atomic readPos
                cvRead.notify_all(); // notify waiting putter
            }
            success = true;
            return r;
        }

        Size_type moveOutImpl(Value_type *dest, const Size_type dest_len, const Size_type min_count_, const bool blocking, const int timeoutMS) noexcept {
            std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl

            const Size_type min_count = std::min(dest_len, min_count_);
            Value_type *iter_out = dest;

            if( min_count >= capacityPlusOne ) {
                return 0;
            }
            if( 0 == min_count ) {
                return 0;
            }

            const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
            Size_type localReadPos = oldReadPos;
            Size_type available = getSize();
            if( min_count > available ) {
                if( blocking ) {
                    std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                    available = getSize();
                    while( min_count > available ) {
                        if( 0 == timeoutMS ) {
                            cvWrite.wait(lockWrite);
                            available = getSize();
                        } else {
                            std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
                            std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
                            available = getSize();
                            if( std::cv_status::timeout == s && min_count > available ) {
                                return 0;
                            }
                        }
                    }
                } else {
                    return 0;
                }
            }
            const Size_type count = std::min(dest_len, available);

            /**
             * Empty [RW][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ] ; W == R
             * Avail [ ][ ][R][.][.][.][.][W][ ][ ][ ][ ][ ][ ][ ]  ; W >  R
             * Avail [.][.][.][W][ ][ ][R][.][.][.][.][.][.][.][.]  ; W <  R - 1
             * Full  [.][.][.][.][.][W][R][.][.][.][.][.][.][.][.]  ; W == R - 1
             */
            // Since available > 0, we can exclude the Empty case.
            Size_type togo_count = count;
            const Size_type localWritePos = writePos;
            if( localReadPos > localWritePos ) {
                // we have a tail
                localReadPos = ( localReadPos + 1 ) % capacityPlusOne; // next-read-pos
                const Size_type tail_count = std::min(togo_count, capacityPlusOne - localReadPos);
                if( use_memcpy ) {
                    memcpy(reinterpret_cast<void*>(iter_out),
                           reinterpret_cast<void*>(&array[localReadPos]),
                           tail_count*sizeof(Value_type));
                    if( uses_memset ) {
                        memset_wrap(&array[localReadPos], nullelem, tail_count*sizeof(Value_type));
                    } else {
                        for(Size_type i=0; i<tail_count; i++) {
                            array[localReadPos+i] = nullelem;
                        }
                    }
                } else {
                    for(Size_type i=0; i<tail_count; i++) {
                        iter_out[i] = std::move( array[localReadPos+i] ); // SC-DRF
                        array[localReadPos+i] = nullelem;
                    }
                }
                localReadPos = ( localReadPos + tail_count - 1 ) % capacityPlusOne; // last read-pos
                togo_count -= tail_count;
                iter_out += tail_count;
            }
            if( togo_count > 0 ) {
                // we have a head
                localReadPos = ( localReadPos + 1 ) % capacityPlusOne; // next-read-pos
                if( use_memcpy ) {
                    memcpy(reinterpret_cast<void*>(iter_out),
                           reinterpret_cast<void*>(&array[localReadPos]),
                           togo_count*sizeof(Value_type));
                    if( uses_memset ) {
                        memset_wrap(&array[localReadPos], nullelem, togo_count*sizeof(Value_type));
                    } else {
                        for(Size_type i=0; i<togo_count; i++) {
                            array[localReadPos+i] = nullelem;
                        }
                    }
                } else {
                    for(Size_type i=0; i<togo_count; i++) {
                        iter_out[i] = std::move( array[localReadPos+i] ); // SC-DRF, move out as in the tail loop above
                        array[localReadPos+i] = nullelem;
                    }
                }
                localReadPos = ( localReadPos + togo_count - 1 ) % capacityPlusOne; // last read-pos
            }
            {
                std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock
                readPos = localReadPos; // SC-DRF release atomic readPos
                cvRead.notify_all(); // notify waiting putter
            }
            return count;
        }

        bool dropImpl(const Size_type count, const bool blocking, const int timeoutMS) noexcept {
            std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl

            if( count >= capacityPlusOne ) {
                return false;
            }
            if( 0 == count ) {
                return true;
            }

            const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
            Size_type localReadPos = oldReadPos;
            Size_type available = getSize();
            if( count > available ) {
                if( blocking ) {
                    std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                    available = getSize();
                    while( count > available ) {
                        if( 0 == timeoutMS ) {
                            cvWrite.wait(lockWrite);
                            available = getSize();
                        } else {
                            std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
                            std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
                            available = getSize();
                            if( std::cv_status::timeout == s && count > available ) {
                                return false;
                            }
                        }
                    }
                } else {
                    return false;
                }
            }
            /**
             * Empty [RW][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ] ; W == R
             * Avail [ ][ ][R][.][.][.][.][W][ ][ ][ ][ ][ ][ ][ ]  ; W >  R
             * Avail [.][.][.][W][ ][ ][R][.][.][.][.][.][.][.][.]  ; W <  R - 1
             * Full  [.][.][.][.][.][W][R][.][.][.][.][.][.][.][.]  ; W == R - 1
             */
            // Since available > 0, we can exclude the Empty case.
            Size_type togo_count = count;
            const Size_type localWritePos = writePos;
            if( localReadPos > localWritePos ) {
                // we have a tail
                localReadPos = ( localReadPos + 1 ) % capacityPlusOne; // next-read-pos
                const Size_type tail_count = std::min(togo_count, capacityPlusOne - localReadPos);
                if( uses_memset ) {
                    memset_wrap(&array[localReadPos], nullelem, tail_count*sizeof(Value_type));
                } else {
                    for(Size_type i=0; i<tail_count; i++) {
                        array[localReadPos+i] = nullelem;
                    }
                }
                localReadPos = ( localReadPos + tail_count - 1 ) % capacityPlusOne; // last read-pos
                togo_count -= tail_count;
            }
            if( togo_count > 0 ) {
                // we have a head
                localReadPos = ( localReadPos + 1 ) % capacityPlusOne; // next-read-pos
                if( uses_memset ) {
                    memset_wrap(&array[localReadPos], nullelem, togo_count*sizeof(Value_type));
                } else {
                    for(Size_type i=0; i<togo_count; i++) {
                        array[localReadPos+i] = nullelem;
                    }
                }
                localReadPos = ( localReadPos + togo_count - 1 ) % capacityPlusOne; // last read-pos
            }
            {
                std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock
                readPos = localReadPos; // SC-DRF release atomic readPos
                cvRead.notify_all(); // notify waiting putter
            }
            return true;
        }

        bool moveIntoImpl(Value_type &&e, const bool blocking, const int timeoutMS) noexcept {
            std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl

            Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
            localWritePos = (localWritePos + 1) % capacityPlusOne;
            if( localWritePos == readPos ) {
                if( blocking ) {
                    std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
                    while( localWritePos == readPos ) {
                        if( 0 == timeoutMS ) {
                            cvRead.wait(lockRead);
                        } else {
                            std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
                            std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS));
                            if( std::cv_status::timeout == s && localWritePos == readPos ) {
                                return false;
                            }
                        }
                    }
                } else {
                    return false;
                }
            }
            array[localWritePos] = std::move(e); // SC-DRF
            {
                std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock
                writePos = localWritePos; // SC-DRF release atomic writePos
                cvWrite.notify_all(); // notify waiting getter
            }
            return true;
        }

        bool copyIntoImpl(const Value_type &e, const bool blocking, const int timeoutMS) noexcept {
            if( !std::is_copy_constructible_v<Value_type> ) {
                ABORT("Value_type is not copy constructible");
                return false;
            }
            std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl

            Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
            localWritePos = (localWritePos + 1) % capacityPlusOne;
            if( localWritePos == readPos ) {
                if( blocking ) {
                    std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
                    while( localWritePos == readPos ) {
                        if( 0 == timeoutMS ) {
                            cvRead.wait(lockRead);
                        } else {
                            std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
                            std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS));
                            if( std::cv_status::timeout == s && localWritePos == readPos ) {
                                return false;
                            }
                        }
                    }
                } else {
                    return false;
                }
            }
            array[localWritePos] = e; // SC-DRF
            {
                std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock
                writePos = localWritePos; // SC-DRF release atomic writePos
                cvWrite.notify_all(); // notify waiting getter
            }
            return true;
        }

        bool copyIntoImpl(const Value_type *first, const Value_type* last, const bool blocking, const int timeoutMS) noexcept {
            if( !std::is_copy_constructible_v<Value_type> ) {
                ABORT("Value_type is not copy constructible");
                return false;
            }
            std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl

            const Value_type *iter_in = first;
            const Size_type total_count = last - first;

            if( total_count >= capacityPlusOne ) {
                return false;
            }
            if( 0 == total_count ) {
                return true;
            }

            Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
            Size_type available = getFreeSlots();
            if( total_count > available ) {
                if( blocking ) {
                    std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
                    available = getFreeSlots();
                    while( total_count > available ) {
                        if( 0 == timeoutMS ) {
                            cvRead.wait(lockRead);
                            available = getFreeSlots();
                        } else {
                            std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
                            std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS));
                            available = getFreeSlots();
                            if( std::cv_status::timeout == s && total_count > available ) {
                                return false;
                            }
                        }
                    }
                } else {
                    return false;
                }
            }
            /**
             * Empty [RW][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ] ; W == R
             * Avail [ ][ ][R][.][.][.][.][W][ ][ ][ ][ ][ ][ ][ ]  ; W >  R
             * Avail [.][.][.][W][ ][ ][R][.][.][.][.][.][.][.][.]  ; W <  R - 1
             * Full  [.][.][.][.][.][W][R][.][.][.][.][.][.][.][.]  ; W == R - 1
             */
            // Since available > 0, we can exclude the Full case.
            Size_type togo_count = total_count;
            const Size_type localReadPos = readPos;
            if( localWritePos >= localReadPos ) { // Empty at any position or the W > R case
                // we have a tail
                localWritePos = ( localWritePos + 1 ) % capacityPlusOne; // next-write-pos
                const Size_type tail_count = std::min(togo_count, capacityPlusOne - localWritePos);
                if( use_memcpy ) {
                    memcpy(reinterpret_cast<void*>(&array[localWritePos]),
                           reinterpret_cast<void*>(const_cast<Value_type*>(iter_in)),
                           tail_count*sizeof(Value_type));
                } else {
                    for(Size_type i=0; i<tail_count; i++) {
                        array[localWritePos+i] = iter_in[i];
                    }
                }
                localWritePos = ( localWritePos + tail_count - 1 ) % capacityPlusOne; // last write-pos
                togo_count -= tail_count;
                iter_in += tail_count;
            }
            if( togo_count > 0 ) {
                // we have a head
                localWritePos = ( localWritePos + 1 ) % capacityPlusOne; // next-write-pos
                if( use_memcpy ) {
                    memcpy(reinterpret_cast<void*>(&array[localWritePos]),
                           reinterpret_cast<void*>(const_cast<Value_type*>(iter_in)),
                           togo_count*sizeof(Value_type));
                } else {
                    for(Size_type i=0; i<togo_count; i++) {
                        array[localWritePos+i] = iter_in[i];
                    }
                }
                localWritePos = ( localWritePos + togo_count - 1 ) % capacityPlusOne; // last write-pos
            }
            {
                std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock
                writePos = localWritePos; // SC-DRF release atomic writePos
                cvWrite.notify_all(); // notify waiting getter
            }
            return true;
        }

    public:

        /**
         * Blocks until at least <code>min_count</code> elements have been put
         * for subsequent get() and getBlocking().
         *
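         * Sketch (illustrative): wait up to 100 milliseconds until at least four
         * elements are available:
         * @code
         * const auto avail = rb.waitForElements(4, 100);
         * @endcode
         *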
         * @param min_count minimum number of put elements to wait for
         * @param timeoutMS maximum milliseconds to wait, where zero blocks indefinitely
         * @return the number of put elements, available for get() and getBlocking()
         */
        Size_type waitForElements(const Size_type min_count, const int timeoutMS) noexcept {
            std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl

            Size_type available = getSize();
            if( min_count > available ) {
                std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                available = getSize();
                while( min_count > available ) {
                    if( 0 == timeoutMS ) {
                        cvWrite.wait(lockWrite);
                        available = getSize();
                    } else {
                        std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
                        std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
                        available = getSize();
                        if( std::cv_status::timeout == s && min_count > available ) {
                            return available;
                        }
                    }
                }
            }
            return available;
        }

        /**
         * Blocks until at least <code>min_count</code> free slots become available
         * for subsequent put() and putBlocking().
         *
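         * Sketch (illustrative): wait up to 100 milliseconds until at least four
         * free slots exist:
         * @code
         * const auto free_slots = rb.waitForFreeSlots(4, 100);
         * @endcode
         *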
         * @param min_count minimum number of free slots to wait for
         * @param timeoutMS maximum milliseconds to wait, where zero blocks indefinitely
         * @return the number of free slots, available for put() and putBlocking()
         */
        Size_type waitForFreeSlots(const Size_type min_count, const int timeoutMS) noexcept {
            std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl

            Size_type available = getFreeSlots();
            if( min_count > available ) {
                std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
                available = getFreeSlots();
                while( min_count > available ) {
                    if( 0 == timeoutMS ) {
                        cvRead.wait(lockRead);
                        available = getFreeSlots();
                    } else {
                        std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
                        std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS));
                        available = getFreeSlots();
                        if( std::cv_status::timeout == s && min_count > available ) {
                            return available;
                        }
                    }
                }
            }
            return available;
        }

        /** Returns a short string representation incl. size/capacity and internal r/w index (impl. dependent). */
        std::string toString() const noexcept {
            const std::string es = isEmpty() ? ", empty" : "";
            const std::string fs = isFull() ? ", full" : "";
            return "ringbuffer<?>[size "+std::to_string(getSize())+" / "+std::to_string(capacityPlusOne-1)+
                   ", writePos "+std::to_string(writePos)+", readPos "+std::to_string(readPos)+es+fs+"]";
        }

        /** Debug functionality - Dumps the contents of the internal array. */
        void dump(FILE *stream, std::string prefix) const noexcept {
#if 0
            fprintf(stream, "%s %s {\n", prefix.c_str(), toString().c_str());
            for(Size_type i=0; i<capacityPlusOne; i++) {
                // fprintf(stream, "\t[%d]: %p\n", i, array[i].get()); // FIXME
            }
            fprintf(stream, "}\n");
#else
            fprintf(stream, "%s %s, array %p\n", prefix.c_str(), toString().c_str(), array);
#endif
        }

        /**
         * Create a full ring buffer instance w/ the given array's net capacity and content.
         * <p>
         * Example (a sketch, assuming a 10-element byte buffer):
         * <pre>
         *   std::vector<uint8_t> source(10);
         *   // fill source with content ..
         *   jau::ringbuffer<uint8_t, uint8_t, jau::nsize_t> rb(0, source);
         * </pre>
         * </p>
         * <p>
         * {@link #isFull()} returns true on the newly created full ring buffer.
         * </p>
         * <p>
         * Implementation will allocate an internal array with size of array <code>copyFrom</code> <i>plus one</i>,
         * and copy all elements from array <code>copyFrom</code> into the internal array.
         * </p>
         * @param nullelem_ The `null` value used to zero removed elements on get*(..) and clear()
         * @param copyFrom mandatory source array determining ring buffer's net {@link #capacity()} and initial content.
         */
        ringbuffer(const NullValue_type& nullelem_, const std::vector<Value_type> & copyFrom) noexcept
        : nullelem(nullelem_), capacityPlusOne(copyFrom.size() + 1), array(newArray(capacityPlusOne)),
          readPos(0), writePos(0)
        {
            resetImpl(copyFrom.data(), copyFrom.size());
            _DEBUG_DUMP("ctor(vector<Value_type>)");
        }

        /**
         * @param nullelem_ The `null` value used to zero removed elements on get*(..) and clear()
         * @param copyFrom the source array to copy the initial content from
         * @param copyFromSize number of elements to copy from <code>copyFrom</code>, determining the net capacity
         */
        ringbuffer(const NullValue_type& nullelem_, const Value_type * copyFrom, const Size_type copyFromSize) noexcept
        : nullelem(nullelem_), capacityPlusOne(copyFromSize + 1), array(newArray(capacityPlusOne)),
          readPos(0), writePos(0)
        {
            resetImpl(copyFrom, copyFromSize);
            _DEBUG_DUMP("ctor(Value_type*, len)");
        }

        /**
         * Create an empty ring buffer instance w/ the given net <code>capacity</code>.
         * <p>
         * Example (a sketch, assuming a byte buffer of net capacity 10):
         * <pre>
         *   jau::ringbuffer<uint8_t, uint8_t, jau::nsize_t> rb(0, 10);
         * </pre>
         * </p>
         * <p>
         * {@link #isEmpty()} returns true on the newly created empty ring buffer.
         * </p>
         * <p>
         * Implementation will allocate an internal array of size <code>capacity</code> <i>plus one</i>.
         * </p>
         * @param nullelem_ The `null` value used to zero removed elements on get*(..) and clear()
         * @param capacity the initial net capacity of the ring buffer
         */
        ringbuffer(const NullValue_type& nullelem_, const Size_type capacity) noexcept
        : nullelem(nullelem_), capacityPlusOne(capacity + 1), array(newArray(capacityPlusOne)),
          readPos(0), writePos(0)
        {
            _DEBUG_DUMP("ctor(capacity)");
        }

        ~ringbuffer() noexcept {
            _DEBUG_DUMP("dtor(def)");
            if( nullptr != array ) {
                freeArray(&array);
            }
        }

        ringbuffer(const ringbuffer &_source) noexcept
        : nullelem(_source.nullelem), capacityPlusOne(_source.capacityPlusOne), array(newArray(capacityPlusOne)),
          readPos(0), writePos(0)
        {
            std::unique_lock<std::mutex> lockMultiReadS(_source.syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
            std::unique_lock<std::mutex> lockMultiWriteS(_source.syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
            std::lock(lockMultiReadS, lockMultiWriteS); // *this instance does not exist yet
            cloneFrom(false, _source);
            _DEBUG_DUMP("ctor(copy.this)");
            _DEBUG_DUMP2(_source, "ctor(copy.source)");
        }

        ringbuffer& operator=(const ringbuffer &_source) noexcept {
            std::unique_lock<std::mutex> lockMultiReadS(_source.syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
            std::unique_lock<std::mutex> lockMultiWriteS(_source.syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
            std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // same for *this instance!
            std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock);
            std::lock(lockMultiReadS, lockMultiWriteS, lockMultiRead, lockMultiWrite);

            if( this == &_source ) {
                return *this;
            }
            nullelem = _source.nullelem;

            if( capacityPlusOne != _source.capacityPlusOne ) {
                cloneFrom(true, _source);
            } else {
                clearImpl(); // clear
                cloneFrom(false, _source);
            }
            _DEBUG_DUMP("assignment(copy.this)");
            _DEBUG_DUMP2(_source, "assignment(copy.source)");
            return *this;
        }

        ringbuffer(ringbuffer &&o) noexcept = default;
        ringbuffer& operator=(ringbuffer &&o) noexcept = default;

        /** Returns the net capacity of this ring buffer. */
        Size_type capacity() const noexcept { return capacityPlusOne-1; }

        /**
         * Releases all elements by assigning <code>nullelem</code>.
         * <p>
         * {@link #isEmpty()} will return <code>true</code> and
         * {@link #getSize()} will return <code>0</code> after calling this method.
         * </p>
         */
        void clear() noexcept {
            std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
            std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
            std::lock(lockMultiRead, lockMultiWrite);
            clearImpl();
        }

        /**
         * {@link #clear()} all elements and add all <code>copyFrom</code> elements thereafter.
         * @param copyFrom mandatory source array to be copied into the internal array
         * @param copyFromCount number of elements to copy from <code>copyFrom</code>
         */
        void reset(const Value_type * copyFrom, const Size_type copyFromCount) noexcept {
            std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
            std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
            std::lock(lockMultiRead, lockMultiWrite);
            resetImpl(copyFrom, copyFromCount);
        }

        void reset(const std::vector<Value_type> & copyFrom) noexcept {
            std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
            std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
            std::lock(lockMultiRead, lockMultiWrite);
            resetImpl(copyFrom.data(), copyFrom.size());
        }

        /** Returns the number of elements in this ring buffer. */
        Size_type getSize() const noexcept {
            const Size_type R = readPos;
            const Size_type W = writePos;
            // W >= R: W - R
            // W <  R: C+1 - R - 1 + W + 1 = C+1 - R + W
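            // Worked example (illustrative): with net capacity 10, i.e. C+1 == 11,
            // R == 8 and W == 2 yield W < R, hence size == 11 - 8 + 2 == 5.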
            return W >= R ? W - R : capacityPlusOne - R + W;
        }

        /** Returns the number of free slots available to put. */
        Size_type getFreeSlots() const noexcept { return capacityPlusOne - 1 - getSize(); }

        /** Returns true if this ring buffer is empty, otherwise false. */
        bool isEmpty() const noexcept { return writePos == readPos; /* 0 == size */ }

        /** Returns true if this ring buffer is full, otherwise false. */
        bool isFull() const noexcept { return ( writePos + 1 ) % capacityPlusOne == readPos; /* W == R - 1 */ }

        /**
         * Peeks the next element at the read position w/o modifying the pointer, nor blocking.
         * @return <code>nullelem</code> if empty, otherwise the element which would be read next.
         */
        Value_type peek() noexcept {
            bool success;
            return peekImpl(false, 0, success);
        }

        /**
         * Peeks the next element at the read position w/o modifying the pointer, nor blocking.
         * @param result storage for the resulting value if successful, otherwise <code>nullelem</code> if empty.
         * @return true if successful, otherwise false.
         */
        bool peek(Value_type& result) noexcept {
            bool success;
            result = peekImpl(false, 0, success);
            return success;
        }

        /**
         * Peeks the next element at the read position w/o modifying the pointer, but with blocking.
         * <p>
         * <code>timeoutMS</code> defaults to zero,
         * i.e. infinite blocking until an element becomes available via put.<br>
         * Otherwise this method blocks for the given milliseconds.
         * </p>
         * @return <code>nullelem</code> if empty or timeout occurred, otherwise the element which would be read next.
         */
        Value_type peekBlocking(const int timeoutMS=0) noexcept {
            bool success;
            return peekImpl(true, timeoutMS, success);
        }

        /**
         * Peeks the next element at the read position w/o modifying the pointer, but with blocking.
         * <p>
         * <code>timeoutMS</code> defaults to zero,
         * i.e. infinite blocking until an element becomes available via put.<br>
         * Otherwise this method blocks for the given milliseconds.
         * </p>
         * @param result storage for the resulting value if successful, otherwise <code>nullelem</code> if empty.
         * @return true if successful, otherwise false.
         */
        bool peekBlocking(Value_type& result, const int timeoutMS=0) noexcept {
            bool success;
            result = peekImpl(true, timeoutMS, success);
            return success;
        }

        /**
         * Dequeues the oldest enqueued element if available, otherwise <code>nullelem</code>.
         * <p>
         * The returned ring buffer slot will be set to <code>nullelem</code> to release the reference
         * and move ownership to the caller.
         * </p>
         * <p>
         * Method is non blocking and returns immediately.
         * </p>
         * @return the oldest put element if available, otherwise <code>nullelem</code>.
         */
        Value_type get() noexcept {
            bool success;
            return moveOutImpl(false, 0, success);
        }

        /**
         * Dequeues the oldest enqueued element if available, otherwise <code>nullelem</code>.
         * <p>
         * The returned ring buffer slot will be set to <code>nullelem</code> to release the reference
         * and move ownership to the caller.
         * </p>
         * <p>
         * Method is non blocking and returns immediately.
         * </p>
         * @param result storage for the resulting value if successful, otherwise <code>nullelem</code> if empty.
         * @return true if successful, otherwise false.
         */
        bool get(Value_type& result) noexcept {
            bool success;
            result = moveOutImpl(false, 0, success);
            return success;
        }

        /**
         * Dequeues the oldest enqueued element.
         * <p>
         * The returned ring buffer slot will be set to <code>nullelem</code> to release the reference
         * and move ownership to the caller.
         * </p>
         * <p>
         * <code>timeoutMS</code> defaults to zero,
         * i.e. infinite blocking until an element becomes available via put.<br>
         * Otherwise this method blocks for the given milliseconds.
         * </p>
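         * <p>
         * Sketch (illustrative): block at most 500 milliseconds for the next element:
         * @code
         * Value_type v = rb.getBlocking(500); // yields nullelem on timeout
         * @endcode
         * </p>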
         * @return the oldest put element or <code>nullelem</code> if timeout occurred.
         */
        Value_type getBlocking(const int timeoutMS=0) noexcept {
            bool success;
            return moveOutImpl(true, timeoutMS, success);
        }

        /**
         * Dequeues the oldest enqueued element.
         * <p>
         * The returned ring buffer slot will be set to <code>nullelem</code> to release the reference
         * and move ownership to the caller.
         * </p>
         * <p>
         * <code>timeoutMS</code> defaults to zero,
         * i.e. infinite blocking until an element becomes available via put.<br>
         * Otherwise this method blocks for the given milliseconds.
         * </p>
         * @param result storage for the resulting value if successful, otherwise <code>nullelem</code> if empty.
         * @return true if successful, otherwise false.
         */
        bool getBlocking(Value_type& result, const int timeoutMS=0) noexcept {
            bool success;
            result = moveOutImpl(true, timeoutMS, success);
            return success;
        }

        /**
         * Dequeues the oldest enqueued `min(dest_len, getSize()>=min_count)` elements by copying them into the given consecutive 'dest' storage.
         * <p>
         * The returned ring buffer slots will be set to <code>nullelem</code> to release the references
         * and move ownership to the caller.
         * </p>
         * <p>
         * Method is non blocking and returns immediately.
         * </p>
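         * <p>
         * Sketch (illustrative, assuming a byte ring buffer):
         * @code
         * uint8_t buf[16];
         * const auto n = rb.get(buf, 16, 1); // up to 16 elements, at least 1, non-blocking
         * @endcode
         * </p>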
         * @param dest pointer to first storage element of `dest_len` consecutive elements.
         * @param dest_len number of consecutive elements in dest and maximum number of elements to get
         * @param min_count minimum number of consecutive elements to get
         * @return actual number of elements received
         */
        Size_type get(Value_type *dest, const Size_type dest_len, const Size_type min_count) noexcept {
            return moveOutImpl(dest, dest_len, min_count, false, 0);
        }

        /**
         * Dequeues the oldest enqueued `min(dest_len, getSize()>=min_count)` elements by copying them into the given consecutive 'dest' storage.
         * <p>
         * The returned ring buffer slots will be set to <code>nullelem</code> to release the references
         * and move ownership to the caller.
         * </p>
         * <p>
         * <code>timeoutMS</code> defaults to zero,
         * i.e. infinite blocking until <code>min_count</code> elements become available via put.<br>
         * Otherwise this method blocks for the given milliseconds.
         * </p>
         * @param dest pointer to first storage element of `dest_len` consecutive elements.
         * @param dest_len number of consecutive elements in dest and maximum number of elements to get
         * @param min_count minimum number of consecutive elements to get
         * @param timeoutMS maximum milliseconds to wait, where zero blocks indefinitely
         * @return actual number of elements received
         */
        Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count, const int timeoutMS=0) noexcept {
            return moveOutImpl(dest, dest_len, min_count, true, timeoutMS);
        }

        /**
         * Drops the <code>count</code> oldest enqueued elements.
         * <p>
         * Method is non blocking and returns immediately.
         * </p>
         * @param count number of elements to drop from ringbuffer.
         * @return true if successful, otherwise false
         */
        bool drop(const Size_type count) noexcept {
            return dropImpl(count, false, 0);
        }

        /**
         * Drops the <code>count</code> oldest enqueued elements.
         * <p>
         * <code>timeoutMS</code> defaults to zero,
         * i.e. infinite blocking until <code>count</code> elements become available via put.<br>
         * Otherwise this method blocks for the given milliseconds.
         * </p>
         * @param count number of elements to drop from ringbuffer.
         * @param timeoutMS maximum milliseconds to wait, where zero blocks indefinitely
         * @return true if successful, otherwise false
         */
        bool dropBlocking(const Size_type count, const int timeoutMS=0) noexcept {
            return dropImpl(count, true, timeoutMS);
        }

        /**
         * Enqueues the given element by moving it into this ringbuffer storage.
         * <p>
         * Returns true if successful, otherwise false in case the buffer is full.
         * </p>
         * <p>
         * Method is non blocking and returns immediately.
         * </p>
         * @return true if successful, otherwise false
         */
        bool put(Value_type && e) noexcept {
            return moveIntoImpl(std::move(e), false, 0);
        }

        /**
         * Enqueues the given element by moving it into this ringbuffer storage.
         * <p>
         * <code>timeoutMS</code> defaults to zero,
         * i.e. infinite blocking until a free slot becomes available via get.<br>
         * Otherwise this method blocks for the given milliseconds.
         * </p>
         * @return true if successful, otherwise false in case timeout occurred or otherwise.
         */
        bool putBlocking(Value_type && e, const int timeoutMS=0) noexcept {
            return moveIntoImpl(std::move(e), true, timeoutMS);
        }

        /**
         * Enqueues the given element by copying it into this ringbuffer storage.
         * <p>
         * Returns true if successful, otherwise false in case the buffer is full.
         * </p>
         * <p>
         * Method is non blocking and returns immediately.
         * </p>
         * @return true if successful, otherwise false
         */
        bool put(const Value_type & e) noexcept {
            return copyIntoImpl(e, false, 0);
        }

        /**
         * Enqueues the given element by copying it into this ringbuffer storage.
         * <p>
         * <code>timeoutMS</code> defaults to zero,
         * i.e. infinite blocking until a free slot becomes available via get.<br>
         * Otherwise this method blocks for the given milliseconds.
         * </p>
         * @return true if successful, otherwise false in case timeout occurred or otherwise.
         */
        bool putBlocking(const Value_type & e, const int timeoutMS=0) noexcept {
            return copyIntoImpl(e, true, timeoutMS);
        }

        /**
         * Enqueues the given range of consecutive elements by copying them into this ringbuffer storage.
         * <p>
         * Returns true if successful, otherwise false in case the buffer can not hold the range.
         * </p>
         * <p>
         * Method is non blocking and returns immediately.
         * </p>
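         * <p>
         * Sketch (illustrative, assuming a byte ring buffer):
         * @code
         * const uint8_t data[] = { 1, 2, 3 };
         * const bool ok = rb.put(data, data + 3); // copies all three elements or none
         * @endcode
         * </p>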
         * @param first pointer to first consecutive element of the value_type range [first, last)
         * @param last pointer to the end of the value_type range [first, last)
         * @return true if successful, otherwise false
         */
        bool put(const Value_type *first, const Value_type* last) noexcept {
            return copyIntoImpl(first, last, false, 0);
        }

        /**
         * Enqueues the given range of consecutive elements by copying them into this ringbuffer storage.
         * <p>
         * <code>timeoutMS</code> defaults to zero,
         * i.e. infinite blocking until enough free slots become available via get.<br>
         * Otherwise this method blocks for the given milliseconds.
         * </p>
         * @param first pointer to first consecutive element of the value_type range [first, last)
         * @param last pointer to the end of the value_type range [first, last)
         * @param timeoutMS maximum milliseconds to wait, where zero blocks indefinitely
         * @return true if successful, otherwise false in case timeout occurred or otherwise.
         */
        bool putBlocking(const Value_type *first, const Value_type* last, const int timeoutMS=0) noexcept {
            return copyIntoImpl(first, last, true, timeoutMS);
        }

        /**
         * Resizes this ring buffer's capacity.
         * <p>
         * The new capacity must not be smaller than the current size, otherwise an exception is thrown.
         * </p>
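         * <p>
         * Sketch (illustrative): double the net capacity while preserving content:
         * @code
         * rb.recapacity( rb.capacity() * 2 ); // throws IllegalArgumentException if new capacity < getSize()
         * @endcode
         * </p>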
         */
        void recapacity(const Size_type newCapacity) {
            std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
            std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
            std::lock(lockMultiRead, lockMultiWrite);
            const Size_type size = getSize();

            if( capacityPlusOne == newCapacity+1 ) {
                return;
            }
            if( size > newCapacity ) {
                throw IllegalArgumentException("amount "+std::to_string(newCapacity)+" < size, "+toString(), E_FILE_LINE);
            }

            // save current data
            Size_type oldCapacityPlusOne = capacityPlusOne;
            Value_type * oldArray = array;
            Size_type oldReadPos = readPos;

            // new blank resized array
            capacityPlusOne = newCapacity + 1;
            array = newArray(capacityPlusOne);
            readPos  = 0;
            writePos = 0;

            // copy saved data
            if( nullptr != oldArray && 0 < size ) {
                Size_type localWritePos = writePos;
                for(Size_type i=0; i<size; i++) {
                    localWritePos = (localWritePos + 1) % capacityPlusOne;
                    oldReadPos = (oldReadPos + 1) % oldCapacityPlusOne;
                    array[localWritePos] = std::move( oldArray[oldReadPos] );
                }
                writePos = localWritePos;
            }
            freeArray(&oldArray); // and release
        }
};

} /* namespace jau */

/** \example test_lfringbuffer01.cpp
 * This C++ unit test validates jau::ringbuffer w/o parallel processing.
 * <p>
 * Together with test_lfringbuffer11.cpp, this work verifies jau::ringbuffer correctness.
 * </p>
 */

/** \example test_lfringbuffer11.cpp
 * This C++ unit test validates jau::ringbuffer with parallel processing.
 * <p>
 * Together with test_lfringbuffer01.cpp, this work verifies jau::ringbuffer correctness.
 * </p>
 */

#endif /* JAU_RINGBUFFER_HPP_ */