Libthreadar 1.6.0
tampon.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// libthreadar - is a library providing several C++ classes to work with threads
3// Copyright (C) 2014-2025 Denis Corbin
4//
5// This file is part of libthreadar
6//
7// libthreadar is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// libhtreadar is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Lesser General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19//
20//----
21// to contact the author: dar.linux@free.fr
22/*********************************************************************/
23
24#ifndef LIBTHREADAR_TAMPON_HPP
25#define LIBTHREADAR_TAMPON_HPP
26
29
30#include "config.h"
31
32 // C system headers
33extern "C"
34{
35
36}
37 // C++ standard headers
38
39
40 // libthreadar headers
41#include "mutex.hpp"
42#include "exceptions.hpp"
43
44namespace libthreadar
45{
46
48
101 template <class T> class tampon
102 {
103 public:
105
109 tampon(unsigned int max_block, unsigned int block_size);
110
112 tampon(const tampon & ref) = delete;
113
115 tampon(tampon && ref) noexcept = default;
116
118 tampon & operator = (const tampon & ref) = delete;
119
121 tampon & operator = (tampon && ref) noexcept = default;
122
125 ~tampon();
126
128
133 void get_block_to_feed(T * & ptr, unsigned int & num);
134
136
140 void feed(T* ptr, unsigned int written);
141
143
147 void feed_cancel_get_block(T *ptr);
148
150
155 void fetch(T* & ptr, unsigned int & num);
156
158
161 void fetch_recycle(T* ptr);
162
164
173 void fetch_push_back(T *ptr, unsigned int new_num);
174
176
179 void fetch_push_back_and_skip(T *ptr, unsigned int new_num);
180
182 void fetch_skip_back();
183
185 bool has_readable_block_next() const;
186
188 bool is_empty() const;
189
191 bool is_not_empty() const { return !is_empty(); };
192
194 bool is_full() const { return full; }; // no need to acquire mutex "modif"
195
197 bool is_not_full() const { return !is_full(); };
198
200 bool is_quiet_full() const { unsigned int tmp = next_feed; shift_by_one(tmp); return tmp == fetch_head; };
201
203
205 unsigned int size() const { return table_size; };
206
208
210 unsigned int block_size() const { return alloc_size; };
211
213 unsigned int load() const { return fetch_head <= next_feed ? next_feed - fetch_head : table_size - (fetch_head - next_feed); };
214
216 void reset();
217
218 private:
219
220 struct atom
221 {
222 T* mem;
223 unsigned int data_size;
224
225 atom() { mem = nullptr; data_size = 0; };
226 };
227
228 mutex modif; //< to make critical section when non atomic action requires a status has not changed between a test and following action
229 atom *table; //< datastructure holding data in transit between two threads
230 unsigned int table_size; //< size of table, i.e. number of struct atom it holds
231 unsigned int alloc_size; //< size of allocated memory for each atom in table
232 unsigned int next_feed; //< index in table of the next atom to use for feeding table
233 unsigned int next_fetch; //< index in table of the next atom to use for fetch table
234 unsigned int fetch_head; //< the oldest object to be fetched
235 bool fetch_outside; //< if set to true, table's index pointed to by next_fetch is used by the fetcher
236 bool feed_outside; //< if set to true, table's index pointed to by next_feed is used by the feeder
237 mutex waiting_feeder; //< feeder thread may be stuck waiting on that semaphore if table is full
238 mutex waiting_fetcher; //< fetcher thread may be stuck waiting on that semaphore if table is empty
239 bool full; //< set when tampon is full
240 bool feeder_go_lock; //< true to inform fetcher than feeder is about to or has already acquire lock on waiting_feeder
241 bool feeder_lock_track; //< only used by feeder to lock on waiting_feeder once outside of critical section
242 bool fetcher_go_lock; //< true to inform feeder than fetcher is about to or has already acquire lock on waiting_fetcher
243 bool fetcher_lock_track; //< only used by fetcher to lock on waiting_fetcher once outside of critical section
244
245 bool is_empty_no_lock() const { return next_feed == fetch_head && !full; };
246
248 bool has_readable_block_next_no_lock() const { return next_feed != next_fetch || full; }
249
251 void shift_by_one(unsigned int & x) const;
252
254 void shift_back_by_one(unsigned int & x) const;
255
260 void shift_by_one_data_in_range(unsigned int begin, unsigned int end);
261
262 };
263
264 template <class T> tampon<T>::tampon(unsigned int max_block, unsigned int block_size)
265 {
266 table_size = max_block;
267 table = new atom[table_size];
268 if(table == nullptr)
269 throw exception_memory();
270 try
271 {
272 alloc_size = block_size;
273 try
274 {
275 for(unsigned int i = 0 ; i < table_size ; ++i)
276 {
277 table[i].mem = new T[alloc_size];
278 if(table[i].mem == nullptr)
279 throw exception_memory();
280 table[i].data_size = 0;
281 }
282 reset();
283 }
284 catch(...)
285 {
286 for(unsigned int i = 0; i < table_size ; ++i)
287 {
288 if(table[i].mem != nullptr)
289 delete [] table[i].mem;
290 }
291
292 throw;
293 }
294 }
295 catch(...)
296 {
297 if(table != nullptr)
298 delete [] table;
299 throw;
300 }
301 }
302
303
304 template <class T> tampon<T>::~tampon()
305 {
306 if(table != nullptr)
307 {
308 for(unsigned int i = 0 ; i < table_size ; ++i)
309 {
310 if(table[i].mem != nullptr)
311 delete [] table[i].mem;
312 }
313 delete [] table;
314 }
315 }
316
317 template <class T> void tampon<T>::get_block_to_feed(T * & ptr, unsigned int & num)
318 {
319 if(feed_outside)
320 throw exception_range("feed already out!");
321
322 modif.lock(); // --- critical section START
323 if(is_full())
324 {
325 feeder_go_lock = true; // inform fetcher that we will suspend in waiting_feeder
326 feeder_lock_track = true;// to suspend on waiting_feeder once we will be out of the critical section
327 }
328 modif.unlock(); // --- critical section END
329
330 if(feeder_lock_track)
331 {
332 feeder_lock_track = false;
333 waiting_feeder.lock(); // cannot lock inside a critical section ...
334 }
335
336 if(is_full())
337 throw THREADAR_BUG; // still full!
338
339 feed_outside = true;
340 ptr = table[next_feed].mem;
341 num = alloc_size;
342 }
343
344 template <class T> void tampon<T>::feed(T *ptr, unsigned int num)
345 {
346 if(!feed_outside)
347 throw exception_range("fetch not outside!");
348 feed_outside = false;
349
350 if(ptr != table[next_feed].mem)
351 throw exception_range("returned ptr is not the one given earlier for feeding");
352 table[next_feed].data_size = num;
353
354 modif.lock(); // --- critical section START
355 shift_by_one(next_feed);
356 if(next_feed == fetch_head)
357 full = true;
358 if(fetcher_go_lock)
359 {
360 fetcher_go_lock = false;
361 waiting_fetcher.unlock();
362 }
363 modif.unlock(); // --- critical section END
364 }
365
366 template <class T> void tampon<T>::feed_cancel_get_block(T *ptr)
367 {
368 if(!feed_outside)
369 throw exception_range("feed not outside!");
370 feed_outside = false;
371 if(ptr != table[next_feed].mem)
372 throw exception_range("returned ptr is not the one given earlier for feeding");
373 }
374
375 template <class T> void tampon<T>::fetch(T* & ptr, unsigned int & num)
376 {
377 if(fetch_outside)
378 throw exception_range("already fetched block outside");
379
380 modif.lock(); // --- critical section START
381 if(!has_readable_block_next_no_lock())
382 {
383 fetcher_go_lock = true; // to inform feeder that we will suspend on waiting_fetcher
384 fetcher_lock_track = true; // to suspend on waiting_fetcher once we will be out of the critical section
385 }
386 modif.unlock(); // --- critical section END
387
388 if(fetcher_lock_track)
389 {
390 fetcher_lock_track = false;
391 waiting_fetcher.lock(); // cannot lock inside a critical section ...
392 }
393
394 if(is_empty())
395 throw THREADAR_BUG;
396
397 fetch_outside = true;
398 ptr = table[next_fetch].mem;
399 num = table[next_fetch].data_size;
400 }
401
402 template <class T> void tampon<T>::fetch_recycle(T* ptr)
403 {
404 if(!fetch_outside)
405 throw exception_range("no block outside for fetching");
406 fetch_outside = false;
407 if(ptr != table[next_fetch].mem)
408 throw exception_range("returned ptr is no the one given earlier for fetching");
409
410 modif.lock(); // --- critical section START
411 if(next_fetch == fetch_head)
412 {
413
414 // no block were skipped
415
416 shift_by_one(fetch_head);
417 next_fetch = fetch_head;
418 full = false;
419 }
420 else
421 {
422 unsigned int tmp = next_fetch;
423 atom tmp_tom;
424
425 shift_by_one(tmp);
426 shift_by_one_data_in_range(tmp, next_feed);
427
428 // we also take into account the situation
429 // where a block has been given for feeding
430 // so the next call to feed() will match the
431 // expected address of the returned block
432 tmp = next_feed; // recording old position of next_feed
433 shift_back_by_one(next_feed);
434 // swapping contents between old next_feed position
435 // and new one:
436 tmp_tom = table[next_feed];
437 table[next_feed] = table[tmp];
438 table[tmp] = tmp_tom;
439 // done!
440
441 full = false;
442 }
443
444 if(feeder_go_lock)
445 {
446 feeder_go_lock = false;
447 waiting_feeder.unlock();
448 }
449 modif.unlock(); // --- critical section END
450 }
451
452 template <class T> void tampon<T>::fetch_push_back(T* ptr, unsigned int new_num)
453 {
454 if(!fetch_outside)
455 throw exception_range("no block outside for fetching");
456 fetch_outside = false;
457
458 if(ptr != table[next_fetch].mem)
459 throw exception_range("returned ptr is not the one given earlier for fetching");
460 table[next_fetch].data_size = new_num;
461 }
462
463 template <class T> void tampon<T>::fetch_push_back_and_skip(T *ptr,
464 unsigned int new_num)
465 {
466 fetch_push_back(ptr, new_num);
467 modif.lock(); // --- critical section START
468 if(full && next_fetch == next_feed) // reach last block feed, cannot skip it
469 throw exception_range("cannot skip the last fed block when the tampon is full");
470 shift_by_one(next_fetch);
471 modif.unlock(); // --- critical section END
472 }
473
474 template <class T> void tampon<T>::fetch_skip_back()
475 {
476 if(fetch_outside)
477 throw exception_range("cannot skip back fetching while a block is being fetched");
478 next_fetch = fetch_head;
479 }
480
481
482 template <class T> bool tampon<T>::has_readable_block_next() const
483 {
484 bool ret;
485
486 tampon<T> *me = const_cast<tampon<T> *>(this);
487 if(me == nullptr)
488 throw THREADAR_BUG;
489 me->modif.lock();
490 ret = has_readable_block_next_no_lock();
491 me->modif.unlock();
492
493 return ret;
494 }
495
496
497 template <class T> bool tampon<T>::is_empty() const
498 {
499 bool ret;
500
501 tampon<T> * me = const_cast<tampon<T> *>(this);
502 if(me == nullptr)
503 throw THREADAR_BUG;
504 me->modif.lock();
505 ret = is_empty_no_lock();
506 me->modif.unlock();
507
508 return ret;
509 }
510
511 template <class T> void tampon<T>::reset()
512 {
513 next_feed = 0;
514 next_fetch = 0;
515 fetch_head = 0;
516 fetch_outside = false;
517 feed_outside = false;
518 full = false;
519 feeder_go_lock = false;
520 feeder_lock_track = false;
521 fetcher_go_lock = false;
522 fetcher_lock_track = false;
523 (void)waiting_feeder.try_lock();
524 (void)waiting_fetcher.try_lock();
525 }
526
527 template <class T> void tampon<T>::shift_by_one(unsigned int & x) const
528 {
529 ++x;
530 if(x >= table_size)
531 x = 0;
532 }
533
534 template <class T> void tampon<T>::shift_back_by_one(unsigned int & x) const
535 {
536 if(x == 0)
537 x = table_size - 1;
538 else
539 --x;
540 }
541
542 template <class T> void tampon<T>::shift_by_one_data_in_range(unsigned int begin, unsigned int end)
543 {
544
545 if(begin != end)
546 {
547 unsigned int prev = begin;
548 shift_back_by_one(prev);
549 T* not_squeezed = table[prev].mem; // we will erase the address pointed to by mem so we keep it in memory here
550
551 while(begin != end)
552 {
553 table[prev] = table[begin]; // this copies both mem (the value of the pointer, not the pointed to) and data_size,
554 prev = begin;
555 shift_by_one(begin);
556 }
557
558 table[prev].mem = not_squeezed;
559 table[prev].data_size = 0; // by precaution
560 }
561 }
562
563
564} // end of namespace
565
566#endif
567
Exception used to report memory allocation failures.
Definition: exceptions.hpp:152
Exception used to report out or range value or argument.
Definition: exceptions.hpp:207
void unlock()
unlock the mutex
void lock()
lock the mutex
DEPRECATED see fast_tampon instead!
Definition: tampon.hpp:102
unsigned int size() const
returns the size of the tampon in maximum number of block it can contain
Definition: tampon.hpp:205
bool is_full() const
for feeder to know whether the next call to get_block_to_feed() will be blocking
Definition: tampon.hpp:194
void fetch_recycle(T *ptr)
fetcher call - step 2
Definition: tampon.hpp:402
bool is_not_full() const
to know whether the tampon is not full
Definition: tampon.hpp:197
unsigned int load() const
returns the current number of blocks currently used in the tampon (fed but not fetched)
Definition: tampon.hpp:213
tampon & operator=(const tampon &ref)=delete
no assignment operator
tampon(const tampon &ref)=delete
no copy constructor
unsigned int block_size() const
returns the allocation size of each block
Definition: tampon.hpp:210
void feed_cancel_get_block(T *ptr)
feeder call - step 2 alternative
Definition: tampon.hpp:366
bool has_readable_block_next() const
to known whether next fetch will be blocking (not skipped blocks)
Definition: tampon.hpp:482
tampon(unsigned int max_block, unsigned int block_size)
constructor
Definition: tampon.hpp:264
bool is_quiet_full() const
returns true if only one slot is available before filling the tampon
Definition: tampon.hpp:200
void get_block_to_feed(T *&ptr, unsigned int &num)
feeder call - step 1
Definition: tampon.hpp:317
bool is_not_empty() const
to know whether the tampon is not empty
Definition: tampon.hpp:191
tampon(tampon &&ref) noexcept=default
no move constructor
void fetch(T *&ptr, unsigned int &num)
fetcher call - step 1
Definition: tampon.hpp:375
void feed(T *ptr, unsigned int written)
feeder call - step 2
Definition: tampon.hpp:344
void fetch_push_back_and_skip(T *ptr, unsigned int new_num)
put back the fetched block and skip to next block for the next fetch()
Definition: tampon.hpp:463
bool is_empty() const
to know whether the tampon has objects (readable or skipped)
Definition: tampon.hpp:497
void fetch_push_back(T *ptr, unsigned int new_num)
fetcher call - step 2 alternative
Definition: tampon.hpp:452
void fetch_skip_back()
reactivate all skipped blocks, next fetch() will be the oldest available block
Definition: tampon.hpp:474
void reset()
reset the object fields and mutex as if the object was just created
Definition: tampon.hpp:511
defines a set of exceptions that are used by libthreadar to report error situations
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reach that statement.
Definition: exceptions.hpp:164
defines the mutex C++ class
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition: barrier.hpp:46