Libthreadar  1.4.0
fast_tampon.hpp
Go to the documentation of this file.
1 /*********************************************************************/
2 // libthreadar - is a library providing several C++ classes to work with threads
3 // Copyright (C) 2014-2020 Denis Corbin
4 //
5 // This file is part of libthreadar
6 //
7 // libthreadar is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // libthreadar is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Lesser General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19 //
20 //----
21 // to contact the author: dar.linux@free.fr
22 /*********************************************************************/
23 
24 #ifndef LIBTHREADAR_FAST_TAMPON_H
25 #define LIBTHREADAR_FAST_TAMPON_H
26 
29 
30 #include "config.h"
31 
32  // C system headers
33 extern "C"
34 {
35 
36 }
37  // C++ standard headers
38 
39 
40  // libthreadar headers
41 #include "condition.hpp"
42 #include "exceptions.hpp"
43 
44 namespace libthreadar
45 {
46 
48 
85 
86 
87  template <class T> class fast_tampon
88  {
89  public:
91 
95  fast_tampon(unsigned int max_block, unsigned int block_size);
96 
98  fast_tampon(const fast_tampon & ref) = delete;
99 
101  fast_tampon(fast_tampon && ref) = default;
102 
104  fast_tampon & operator = (const fast_tampon & ref) = delete;
105 
107  fast_tampon & operator = (fast_tampon && ref) noexcept = default;
108 
111  ~fast_tampon();
112 
114 
119  void get_block_to_feed(T * & ptr, unsigned int & num);
120 
122 
126  void feed(T* ptr, unsigned int written);
127 
129 
133  void feed_cancel_get_block(T *ptr);
134 
136 
141  void fetch(T* & ptr, unsigned int & num);
142 
144 
147  void fetch_recycle(T* ptr);
148 
150 
159  void fetch_push_back(T *ptr, unsigned int new_num);
160 
162  bool is_empty() const { return next_feed == next_fetch; };
163 
165  bool is_not_empty() const { return !is_empty(); };
166 
168  bool is_full() const { unsigned int tmp = next_feed; shift_by_one(tmp); return tmp == next_fetch; };
169 
171  bool is_not_full() const { return !is_full(); };
172 
176  unsigned int size() const { return table_size; };
177 
181  unsigned int block_size() const { return alloc_size; };
182 
184  void reset();
185 
186  private:
187 
188  struct atom
189  {
190  T* mem;
191  unsigned int data_size;
192 
193  atom() { mem = nullptr; data_size = 0; };
194  };
195 
196  static const unsigned int cond_full = 0;
197  static const unsigned int cond_empty = 0;
198 
199  condition modif;
200  atom *table; //< datastructure holding data in transit between two threads
201  unsigned int table_size; //< size of table, i.e. number of struct atom it holds
202  unsigned int alloc_size; //< size of allocated memory for each atom in table
203  unsigned int next_feed; //< index in table of the next atom to use for feeding the table
204  unsigned int next_fetch; //< index in table of the next atom to fetch from table
205  bool fetch_outside; //< if set to true, table's index pointed to by next_fetch is used by the fetcher
206  bool feed_outside; //< if set to true, table's index pointed to by next_feed is used by the feeder
207 
209  void shift_by_one(unsigned int & x) const;
210 
211  };
212 
213  template <class T> fast_tampon<T>::fast_tampon(unsigned int max_block, unsigned int block_size): modif(2)
214  {
215  if(max_block < 2)
216  throw exception_range("max_block for fast_tampon should be strictly greater than 1");
217  table_size = max_block;
218  table = new atom[table_size];
219  if(table == nullptr)
220  throw exception_memory();
221  try
222  {
223  alloc_size = block_size;
224  try
225  {
226  for(unsigned int i = 0 ; i < table_size ; ++i)
227  {
228  table[i].mem = new T[alloc_size];
229  if(table[i].mem == nullptr)
230  throw exception_memory();
231  table[i].data_size = 0;
232  }
233  reset();
234  }
235  catch(...)
236  {
237  for(unsigned int i = 0; i < table_size ; ++i)
238  {
239  if(table[i].mem != nullptr)
240  delete [] table[i].mem;
241  }
242 
243  throw;
244  }
245  }
246  catch(...)
247  {
248  if(table != nullptr)
249  delete [] table;
250  throw;
251  }
252  }
253 
254 
255  template <class T> fast_tampon<T>::~fast_tampon()
256  {
257  if(table != nullptr)
258  {
259  for(unsigned int i = 0 ; i < table_size ; ++i)
260  {
261  if(table[i].mem != nullptr)
262  delete [] table[i].mem;
263  }
264  delete [] table;
265  }
266  }
267 
268  template <class T> void fast_tampon<T>::get_block_to_feed(T * & ptr, unsigned int & num)
269  {
270  if(feed_outside)
271  throw exception_range("feed already out!");
272 
273  if(is_full())
274  {
275  modif.lock(); // --- critical section START
276  try
277  {
278  if(is_full())
279  {
280  modif.wait(cond_full);
281 
282  if(is_full())
283  throw THREADAR_BUG; // still full!
284  }
285  // *else*
286  // full condition was transitional
287  // only the feeder (this is us) can make it happen again
288  // so the full condition should not occur before we return
289  // the block we are about to fetch
290  }
291  catch(...)
292  {
293  modif.unlock();
294  throw;
295  }
296  modif.unlock(); // --- critical section END
297  }
298 
299  feed_outside = true;
300  ptr = table[next_feed].mem;
301  num = alloc_size;
302  }
303 
304  template <class T> void fast_tampon<T>::feed(T *ptr, unsigned int num)
305  {
306  if(!feed_outside)
307  throw exception_range("fetch not outside!");
308  feed_outside = false;
309 
310  if(ptr != table[next_feed].mem)
311  throw exception_range("returned ptr is not the one given earlier for feeding");
312  table[next_feed].data_size = num;
313 
314  modif.lock(); // --- critical section START
315  try
316  {
317  shift_by_one(next_feed);
318  if(modif.get_waiting_thread_count(cond_empty) > 0)
319  modif.signal(cond_empty); // releasing the fetcher when unlock() will complete
320  }
321  catch(...)
322  {
323  modif.unlock();
324  throw;
325  }
326  modif.unlock(); // --- critical section END
327  }
328 
329  template <class T> void fast_tampon<T>::feed_cancel_get_block(T *ptr)
330  {
331  if(!feed_outside)
332  throw exception_range("feed not outside!");
333  feed_outside = false;
334  if(ptr != table[next_feed].mem)
335  throw exception_range("returned ptr is not the one given earlier for feeding");
336  }
337 
338  template <class T> void fast_tampon<T>::fetch(T* & ptr, unsigned int & num)
339  {
340  if(fetch_outside)
341  throw exception_range("already fetched block outside");
342 
343  if(is_empty())
344  {
345  modif.lock(); // --- critical section START
346  try
347  {
348  if(is_empty())
349  {
350  modif.wait(cond_empty);
351 
352  if(is_empty())
353  throw THREADAR_BUG;
354  }
355  // *else*
356  // the emptiness condition was transitional
357  // only the fetcher (this is us) can make it happen again
358  // so the empty condition should not occur before we return
359  // the block we are about to fetch
360  }
361  catch(...)
362  {
363  modif.unlock();
364  throw;
365  }
366  modif.unlock(); // --- critical section END
367  }
368 
369  fetch_outside = true;
370  ptr = table[next_fetch].mem;
371  num = table[next_fetch].data_size;
372  }
373 
374  template <class T> void fast_tampon<T>::fetch_recycle(T* ptr)
375  {
376  if(!fetch_outside)
377  throw exception_range("no block outside for fetching");
378  fetch_outside = false;
379  if(ptr != table[next_fetch].mem)
380  throw exception_range("returned ptr is no the one given earlier for fetching");
381 
382  modif.lock(); // --- critical section START
383  try
384  {
385  shift_by_one(next_fetch);
386  if(modif.get_waiting_thread_count(cond_full) > 0)
387  modif.signal(cond_full); // releasing the fetcher when unlock() will complete
388  }
389  catch(...)
390  {
391  modif.unlock();
392  throw;
393  }
394  modif.unlock(); // --- critical section END
395  }
396 
397  template <class T> void fast_tampon<T>::fetch_push_back(T* ptr, unsigned int new_num)
398  {
399  if(!fetch_outside)
400  throw exception_range("no block outside for fetching");
401  fetch_outside = false;
402 
403  if(ptr != table[next_fetch].mem)
404  throw exception_range("returned ptr is not the one given earlier for fetching");
405  table[next_fetch].data_size = new_num;
406  }
407 
408 
409  template <class T> void fast_tampon<T>::reset()
410  {
411  modif.lock(); // --- critical section START
412  try
413  {
414  if(modif.get_waiting_thread_count(cond_full) > 0
415  || modif.get_waiting_thread_count(cond_empty) > 0)
416  {
417  modif.broadcast(cond_full);
418  modif.broadcast(cond_empty);
419  throw exception_range("reseting fast_tampon while some thread were waiting on it");
420  }
421 
422  next_feed = 0;
423  next_fetch = 0;
424  fetch_outside = false;
425  feed_outside = false;
426  }
427  catch(...)
428  {
429  modif.unlock();
430  throw;
431  }
432  modif.unlock(); // --- critical section END
433  }
434 
435  template <class T> void fast_tampon<T>::shift_by_one(unsigned int & x) const
436  {
437  ++x;
438  if(x >= table_size)
439  x = 0;
440  }
441 
445 
446 } // end of namespace
447 
448 #endif
449 
Exception used to report memory allocation failures.
Definition: exceptions.hpp:152
Exception used to report an out-of-range value or argument.
Definition: exceptions.hpp:207
Class fast_tampon provides asynchronous communication between two threads.
Definition: fast_tampon.hpp:88
void fetch_recycle(T *ptr)
fetcher call - step 2
fast_tampon(unsigned int max_block, unsigned int block_size)
constructor
bool is_empty() const
to know whether the fast_tampon has objects (readable or skipped)
fast_tampon & operator=(const fast_tampon &ref)=delete
no assignment operator
bool is_not_empty() const
to know whether the fast_tampon is not empty
void feed_cancel_get_block(T *ptr)
feeder call - step 2 alternative
void feed(T *ptr, unsigned int written)
feeder call - step 2
unsigned int block_size() const
bool is_not_full() const
to know whether the fast_tampon is not full
void get_block_to_feed(T *&ptr, unsigned int &num)
feeder call - step 1
bool is_full() const
for feeder to know whether the next call to get_block_to_feed() will be blocking
fast_tampon(fast_tampon &&ref)=default
no move constructor
fast_tampon(const fast_tampon &ref)=delete
no copy constructor
void fetch(T *&ptr, unsigned int &num)
fetcher call - step 1
void reset()
reset the object fields and mutex as if the object was just created
void fetch_push_back(T *ptr, unsigned int new_num)
fetcher call - step 2 alternative
unsigned int size() const
defines the condition class
defines a set of exceptions that are used by libthreadar to report error situations
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reaches that statement.
Definition: exceptions.hpp:164
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition: barrier.hpp:46