Libthreadar 1.6.0
fast_tampon.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// libthreadar - is a library providing several C++ classes to work with threads
3// Copyright (C) 2014-2025 Denis Corbin
4//
5// This file is part of libthreadar
6//
7// libthreadar is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// libthreadar is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Lesser General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19//
20//----
21// to contact the author: dar.linux@free.fr
22/*********************************************************************/
23
24#ifndef LIBTHREADAR_FAST_TAMPON_H
25#define LIBTHREADAR_FAST_TAMPON_H
26
29
30#include "config.h"
31
32 // C system headers
33extern "C"
34{
35
36}
37 // C++ standard headers
38
39
40 // libthreadar headers
41#include "condition.hpp"
42#include "exceptions.hpp"
43
44namespace libthreadar
45{
46
48
85
86
87 template <class T> class fast_tampon
88 {
89 public:
91
95 fast_tampon(unsigned int max_block, unsigned int block_size);
96
98 fast_tampon(const fast_tampon & ref) = delete;
99
101 fast_tampon(fast_tampon && ref) = default;
102
104 fast_tampon & operator = (const fast_tampon & ref) = delete;
105
107 fast_tampon & operator = (fast_tampon && ref) noexcept = default;
108
111 ~fast_tampon();
112
114
119 void get_block_to_feed(T * & ptr, unsigned int & num);
120
122
126 void feed(T* ptr, unsigned int written);
127
129
133 void feed_cancel_get_block(T *ptr);
134
136
141 void fetch(T* & ptr, unsigned int & num);
142
144
147 void fetch_recycle(T* ptr);
148
150
159 void fetch_push_back(T *ptr, unsigned int new_num);
160
162 bool is_empty() const { return next_feed == next_fetch; };
163
165 bool is_not_empty() const { return !is_empty(); };
166
168 bool is_full() const { unsigned int tmp = next_feed; shift_by_one(tmp); return tmp == next_fetch; };
169
171 bool is_not_full() const { return !is_full(); };
172
176 unsigned int size() const { return table_size; };
177
181 unsigned int block_size() const { return alloc_size; };
182
184 void reset();
185
186 private:
187
188 struct atom
189 {
190 T* mem;
191 unsigned int data_size;
192
193 atom() { mem = nullptr; data_size = 0; };
194 };
195
196 static const unsigned int cond_full = 0;
197 static const unsigned int cond_empty = 0;
198
199 condition modif;
200 atom *table; //< datastructure holding data in transit between two threads
201 unsigned int table_size; //< size of table, i.e. number of struct atom it holds
202 unsigned int alloc_size; //< size of allocated memory for each atom in table
203 unsigned int next_feed; //< index in table of the next atom to use for feeding the table
204 unsigned int next_fetch; //< index in table of the next atom to fetch from table
205 bool fetch_outside; //< if set to true, table's index pointed to by next_fetch is used by the fetcher
206 bool feed_outside; //< if set to true, table's index pointed to by next_feed is used by the feeder
207
209 void shift_by_one(unsigned int & x) const;
210
211 };
212
213 template <class T> fast_tampon<T>::fast_tampon(unsigned int max_block, unsigned int block_size): modif(2)
214 {
215 if(max_block < 2)
216 throw exception_range("max_block for fast_tampon should be strictly greater than 1");
217 table_size = max_block;
218 table = new atom[table_size];
219 if(table == nullptr)
220 throw exception_memory();
221 try
222 {
223 alloc_size = block_size;
224 try
225 {
226 for(unsigned int i = 0 ; i < table_size ; ++i)
227 {
228 table[i].mem = new T[alloc_size];
229 if(table[i].mem == nullptr)
230 throw exception_memory();
231 table[i].data_size = 0;
232 }
233 reset();
234 }
235 catch(...)
236 {
237 for(unsigned int i = 0; i < table_size ; ++i)
238 {
239 if(table[i].mem != nullptr)
240 delete [] table[i].mem;
241 }
242
243 throw;
244 }
245 }
246 catch(...)
247 {
248 if(table != nullptr)
249 delete [] table;
250 throw;
251 }
252 }
253
254
255 template <class T> fast_tampon<T>::~fast_tampon()
256 {
257 if(table != nullptr)
258 {
259 for(unsigned int i = 0 ; i < table_size ; ++i)
260 {
261 if(table[i].mem != nullptr)
262 delete [] table[i].mem;
263 }
264 delete [] table;
265 }
266 }
267
268 template <class T> void fast_tampon<T>::get_block_to_feed(T * & ptr, unsigned int & num)
269 {
270 if(feed_outside)
271 throw exception_range("feed already out!");
272
273 if(is_full())
274 {
275 modif.lock(); // --- critical section START
276 try
277 {
278 if(is_full())
279 {
280 modif.wait(cond_full);
281
282 if(is_full())
283 throw THREADAR_BUG; // still full!
284 }
285 // *else*
286 // full condition was transitional
287 // only the feeder (this is us) can make it happen again
288 // so the full condition should not occur before we return
289 // the block we are about to fetch
290 }
291 catch(...)
292 {
293 modif.unlock();
294 throw;
295 }
296 modif.unlock(); // --- critical section END
297 }
298
299 feed_outside = true;
300 ptr = table[next_feed].mem;
301 num = alloc_size;
302 }
303
304 template <class T> void fast_tampon<T>::feed(T *ptr, unsigned int num)
305 {
306 if(!feed_outside)
307 throw exception_range("fetch not outside!");
308 feed_outside = false;
309
310 if(ptr != table[next_feed].mem)
311 throw exception_range("returned ptr is not the one given earlier for feeding");
312 table[next_feed].data_size = num;
313
314 modif.lock(); // --- critical section START
315 try
316 {
317 shift_by_one(next_feed);
318 if(modif.get_waiting_thread_count(cond_empty) > 0)
319 modif.signal(cond_empty); // releasing the fetcher when unlock() will complete
320 }
321 catch(...)
322 {
323 modif.unlock();
324 throw;
325 }
326 modif.unlock(); // --- critical section END
327 }
328
329 template <class T> void fast_tampon<T>::feed_cancel_get_block(T *ptr)
330 {
331 if(!feed_outside)
332 throw exception_range("feed not outside!");
333 feed_outside = false;
334 if(ptr != table[next_feed].mem)
335 throw exception_range("returned ptr is not the one given earlier for feeding");
336 }
337
338 template <class T> void fast_tampon<T>::fetch(T* & ptr, unsigned int & num)
339 {
340 if(fetch_outside)
341 throw exception_range("already fetched block outside");
342
343 if(is_empty())
344 {
345 modif.lock(); // --- critical section START
346 try
347 {
348 if(is_empty())
349 {
350 modif.wait(cond_empty);
351
352 if(is_empty())
353 throw THREADAR_BUG;
354 }
355 // *else*
356 // the emptiness condition was transitional
357 // only the fetcher (this is us) can make it happen again
358 // so the empty condition should not occur before we return
359 // the block we are about to fetch
360 }
361 catch(...)
362 {
363 modif.unlock();
364 throw;
365 }
366 modif.unlock(); // --- critical section END
367 }
368
369 fetch_outside = true;
370 ptr = table[next_fetch].mem;
371 num = table[next_fetch].data_size;
372 }
373
374 template <class T> void fast_tampon<T>::fetch_recycle(T* ptr)
375 {
376 if(!fetch_outside)
377 throw exception_range("no block outside for fetching");
378 fetch_outside = false;
379 if(ptr != table[next_fetch].mem)
380 throw exception_range("returned ptr is no the one given earlier for fetching");
381
382 modif.lock(); // --- critical section START
383 try
384 {
385 shift_by_one(next_fetch);
386 if(modif.get_waiting_thread_count(cond_full) > 0)
387 modif.signal(cond_full); // releasing the fetcher when unlock() will complete
388 }
389 catch(...)
390 {
391 modif.unlock();
392 throw;
393 }
394 modif.unlock(); // --- critical section END
395 }
396
397 template <class T> void fast_tampon<T>::fetch_push_back(T* ptr, unsigned int new_num)
398 {
399 if(!fetch_outside)
400 throw exception_range("no block outside for fetching");
401 fetch_outside = false;
402
403 if(ptr != table[next_fetch].mem)
404 throw exception_range("returned ptr is not the one given earlier for fetching");
405 table[next_fetch].data_size = new_num;
406 }
407
408
409 template <class T> void fast_tampon<T>::reset()
410 {
411 modif.lock(); // --- critical section START
412 try
413 {
414 if(modif.get_waiting_thread_count(cond_full) > 0
415 || modif.get_waiting_thread_count(cond_empty) > 0)
416 {
417 modif.broadcast(cond_full);
418 modif.broadcast(cond_empty);
419 throw exception_range("reseting fast_tampon while some thread were waiting on it");
420 }
421
422 next_feed = 0;
423 next_fetch = 0;
424 fetch_outside = false;
425 feed_outside = false;
426 }
427 catch(...)
428 {
429 modif.unlock();
430 throw;
431 }
432 modif.unlock(); // --- critical section END
433 }
434
435 template <class T> void fast_tampon<T>::shift_by_one(unsigned int & x) const
436 {
437 ++x;
438 if(x >= table_size)
439 x = 0;
440 }
441
445
446} // end of namespace
447
448#endif
449
Exception used to report memory allocation failures.
Definition: exceptions.hpp:152
Exception used to report out of range value or argument.
Definition: exceptions.hpp:207
Class fast_tampon provides asynchronous communication between two threads.
Definition: fast_tampon.hpp:88
void fetch_recycle(T *ptr)
fetcher call - step 2
fast_tampon(unsigned int max_block, unsigned int block_size)
constructor
bool is_empty() const
to know whether the fast_tampon has objects (readable or skipped)
bool is_not_empty() const
to know whether the fast_tampon is not empty
void feed_cancel_get_block(T *ptr)
feeder call - step 2 alternative
void feed(T *ptr, unsigned int written)
feeder call - step 2
unsigned int block_size() const
bool is_not_full() const
to know whether the fast_tampon is not full
void get_block_to_feed(T *&ptr, unsigned int &num)
feeder call - step 1
bool is_full() const
for feeder to know whether the next call to get_block_to_feed() will be blocking
fast_tampon(fast_tampon &&ref)=default
no move constructor
fast_tampon(const fast_tampon &ref)=delete
no copy constructor
void fetch(T *&ptr, unsigned int &num)
fetcher call - step 1
fast_tampon & operator=(const fast_tampon &ref)=delete
no assignment operator
void reset()
reset the object fields and mutex as if the object was just created
void fetch_push_back(T *ptr, unsigned int new_num)
fetcher call - step 2 alternative
unsigned int size() const
defines the condition class
defines a set of exceptions that are used by libthreadar to report error situations
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reaches that statement.
Definition: exceptions.hpp:164
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition: barrier.hpp:46