Libthreadar 1.6.0
ratelier_gather.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// libthreadar - is a library providing several C++ classes to work with threads
3// Copyright (C) 2014-2025 Denis Corbin
4//
5// This file is part of libthreadar
6//
7// libthreadar is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// libthreadar is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Lesser General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19//
20//----
21// to contact the author: dar.linux@free.fr
22/*********************************************************************/
23
24#ifndef LIBTHREADAR_RATELIER_GATHER_HPP
25#define LIBTHREADAR_RATELIER_GATHER_HPP
26
29
30#include "config.h"
31
32 // C system headers
33extern "C"
34{
35}
36 // C++ standard headers
37#include <vector>
38#include <map>
39#include <deque>
40#include <memory>
41
42 // libthreadar headers
43#include "mutex.hpp"
44
45
46namespace libthreadar
47{
49
63
98
99 template <class T> class ratelier_gather
100 {
101 public:
102 ratelier_gather(unsigned int size, signed int flag = 0);
103 ratelier_gather(const ratelier_gather & ref) = delete;
104 ratelier_gather(ratelier_gather && ref) = default;
105 ratelier_gather & operator = (const ratelier_gather & ref) = delete;
106 ratelier_gather & operator = (ratelier_gather && ref) noexcept = default;
107 virtual ~ratelier_gather() = default;
108
110
117 void worker_push_one(unsigned int slot, std::unique_ptr<T> & one, signed int flag = 0);
118
120
125 void gather(std::deque<std::unique_ptr<T> > & ones, std::deque<signed int> & flag);
126
128
131 void reset();
132
133 private:
134
135 static const unsigned int cond_pending_data = 0;
136 static const unsigned int cond_full = 1;
137
138 struct slot
139 {
140 std::unique_ptr<T> obj;
141 bool empty;
142 unsigned int index;
143 signed int flag;
144
145 slot(signed int val) { empty = true; flag = val; };
146 slot(const slot & ref) { obj.reset(); empty = ref.empty; index = ref.index; flag = ref.flag; };
147 };
148
149 unsigned int next_index;
150 std::vector<slot> table;
151 std::map<unsigned int, unsigned int> corres;
152 std::deque<unsigned int> empty_slot;
154 };
155
156 template <class T> ratelier_gather<T>::ratelier_gather(unsigned int size, signed int flag):
157 table(size, slot(flag)),
158 verrou(2)
159 {
160 next_index = 0;
161
162 for(unsigned int i = 0; i < size; ++i)
163 empty_slot.push_back(i);
164 }
165
166 template <class T> void ratelier_gather<T>::worker_push_one(unsigned int slot, std::unique_ptr<T> & one, signed int flag)
167 {
168 verrou.lock();
169
170 try
171 {
172 while(empty_slot.empty() // no free slot available
173 || ((empty_slot.size() == 1 && slot != next_index) // one slot available and we do not provide the lowest expecting slot num
174 && corres.begin() != corres.end() && (corres.begin())->first != next_index)) // and lowest slot is still not received
175 verrou.wait(cond_full);
176
177 std::map<unsigned int, unsigned int>::iterator it = corres.find(slot);
178 unsigned int index;
179
180 if(it != corres.end())
181 throw exception_range("the ratelier_gather index to fill is already used");
182
183 index = empty_slot.back();
184
185 // sanity checks
186
187 if(index >= table.size())
188 throw THREADAR_BUG;
189 if( ! table[index].empty)
190 throw THREADAR_BUG;
191
192 // recording the change
193
194 corres[slot] = index;
195 table[index].obj = std::move(one);
196 table[index].empty = false;
197 table[index].index = slot;
198 table[index].flag = flag;
199
200 empty_slot.pop_back();
201
202 if(verrou.get_waiting_thread_count(cond_pending_data) > 0)
203 if(corres.find(next_index) != corres.end()) // some data can be gathered
204 verrou.signal(cond_pending_data); // awaking the gathering thread
205 }
206 catch(...)
207 {
208 verrou.unlock(); // unlock first, as broadcast/signal may be the cause of the exception
209 verrou.broadcast(cond_pending_data);
210 verrou.broadcast(cond_full);
211 throw;
212 }
213 verrou.unlock();
214 }
215
216 template <class T> void ratelier_gather<T>::gather(std::deque<std::unique_ptr<T> > & ones, std::deque<signed int> & flag)
217 {
218 ones.clear();
219 flag.clear();
220
221 verrou.lock();
222 try
223 {
224 std::map<unsigned int, unsigned int>::iterator it;
225 std::map<unsigned int, unsigned int>::iterator tmp;
226
227 do
228 {
229 it = corres.begin();
230
231 while(it != corres.end())
232 {
233 if(it->first > next_index) // not continuous sequence
234 break; // exiting the inner while loop
235
236 if(it->first == next_index)
237 {
238
239 // sanity checks
240
241 if(it->second >= table.size())
242 throw THREADAR_BUG;
243 if(table[it->second].index != next_index)
244 throw THREADAR_BUG;
245 if(table[it->second].empty)
246 throw THREADAR_BUG;
247 if( ! table[it->second].obj)
248 throw THREADAR_BUG;
249
250 // recording the change
251
252 ones.push_back(std::move(table[it->second].obj));
253 flag.push_back(table[it->second].flag);
254
255 table[it->second].empty = true;
256 empty_slot.push_back(it->second);
257 tmp = it;
258 ++it;
259 corres.erase(tmp);
260 ++next_index;
261 }
262 else // integer overload occured for the index
263 ++it; // skipping this entry
264 }
265
266 if(ones.empty())
267 verrou.wait(cond_pending_data);
268 }
269 while(ones.empty());
270
271 if(verrou.get_waiting_thread_count(cond_full) > 0)
272 verrou.broadcast(cond_full); // awake all pending workers
273 }
274 catch(...)
275 {
276 verrou.unlock(); // unlock first, as broadcast() may be the cause of the exception
277 verrou.broadcast(cond_pending_data);
278 verrou.broadcast(cond_full);
279 throw;
280 }
281 verrou.unlock();
282
283 if(ones.size() != flag.size())
284 throw THREADAR_BUG;
285 }
286
287 template <class T> void ratelier_gather<T>::reset()
288 {
289 unsigned int size = table.size();
290 next_index = 0;
291 corres.clear();
292 empty_slot.clear();
293
294 for(unsigned int i = 0; i < size; ++i)
295 {
296 table[i].obj.reset();
297 table[i].empty = true;
298 empty_slot.push_back(i);
299 }
300
301 verrou.lock();
302 verrou.broadcast(cond_pending_data);
303 verrou.broadcast(cond_full);
304 verrou.unlock();
305 }
306
307
308} // end of namespace
309
310#endif
Wrapper around the Posix pthread_cond_t object and its associated mutex.
Definition: condition.hpp:50
Exception used to report an out-of-range value or argument.
Definition: exceptions.hpp:207
the class ratelier_gather's purpose is to gather work from several worker threads
void worker_push_one(unsigned int slot, std::unique_ptr< T > &one, signed int flag=0)
provides a worker thread with a means to give data, with its associated index, to the gathering thread
void gather(std::deque< std::unique_ptr< T > > &ones, std::deque< signed int > &flag)
obtain the lowest continuous filled slots from the ratelier_gather and free them
void reset()
reset the object to its pristine state
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reach that statement.
Definition: exceptions.hpp:164
defines the mutex C++ class
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition: barrier.hpp:46