Libthreadar 1.6.0
ratelier_scatter.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// libthreadar - is a library providing several C++ classes to work with threads
3// Copyright (C) 2014-2025 Denis Corbin
4//
5// This file is part of libthreadar
6//
7// libthreadar is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// libhtreadar is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Lesser General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19//
20//----
21// to contact the author: dar.linux@free.fr
22/*********************************************************************/
23
24#ifndef LIBTHREADAR_RATELIER_SCATTER_HPP
25#define LIBTHREADAR_RATELIER_SCATTER_HPP
26
29
30#include "config.h"
31
32 // C system headers
33extern "C"
34{
35}
36 // C++ standard headers
37#include <vector>
38#include <map>
39#include <deque>
40#include <memory>
41
42 // libthreadar headers
43#include "mutex.hpp"
44
45
46namespace libthreadar
47{
48
50
63
64 template <class T> class ratelier_scatter
65 {
66 public:
67 ratelier_scatter(unsigned int size, signed int flag = 0);
68 ratelier_scatter(const ratelier_scatter & ref) = delete;
69 ratelier_scatter(ratelier_scatter && ref) = default;
70 ratelier_scatter & operator = (const ratelier_scatter & ref) = delete;
71 ratelier_scatter & operator = (ratelier_scatter && ref) noexcept = default;
72 virtual ~ratelier_scatter() = default;
73
75
84 void scatter(std::unique_ptr<T> & one, signed int flag = 0);
85
87
94 std::unique_ptr<T> worker_get_one(unsigned int & slot, signed int & flag);
95
97
99 void reset();
100
101 private:
102
103 static const unsigned int cond_empty = 0;
104 static const unsigned int cond_full = 1;
105
106 struct slot
107 {
108 std::unique_ptr<T> obj;
109 bool empty;
110 unsigned int index;
111 signed int flag;
112
113 slot(signed int val) { empty = true; flag = val; };
114 slot(const slot & ref) { obj.reset(); empty = ref.empty; index = ref.index; flag = ref.flag; };
115 };
116
117 unsigned int next_index;
118 unsigned int lowest_index;
119 std::vector<slot> table;
120 std::map<unsigned int, unsigned int> corres;
121 std::deque<unsigned int> empty_slot;
123 };
124
125 template <class T> ratelier_scatter<T>::ratelier_scatter(unsigned int size, signed int flag):
126 table(size, slot(flag)),
127 verrou(2)
128 {
129 next_index = 0;
130 lowest_index = 0;
131
132 for(unsigned int i = 0; i < size; ++i)
133 empty_slot.push_back(i);
134 }
135
136 template <class T> void ratelier_scatter<T>::scatter(std::unique_ptr<T> & one, signed int flag)
137 {
138 unsigned int tableindex;
139
140 verrou.lock();
141 try
142 {
143 while(empty_slot.empty()) // ratelier_scatter is full
144 verrou.wait(cond_full);
145
146 tableindex = empty_slot.back();
147
148 // sanity checks
149
150 if(tableindex >= table.size())
151 throw THREADAR_BUG;
152 if( ! table[tableindex].empty)
153 throw THREADAR_BUG;
154
155 // recording the change
156
157 table[tableindex].empty = false;
158 table[tableindex].obj = std::move(one);
159 table[tableindex].index = next_index;
160 table[tableindex].flag = flag;
161
162 corres[next_index] = tableindex;
163 ++next_index;
164
165 empty_slot.pop_back();
166 if(verrou.get_waiting_thread_count(cond_empty) > 0)
167 verrou.signal(cond_empty);
168 }
169 catch(...)
170 {
171 verrou.unlock();
172 verrou.broadcast(cond_empty);
173 verrou.broadcast(cond_full);
174 throw;
175 }
176 verrou.unlock();
177 }
178
179 template <class T> std::unique_ptr<T> ratelier_scatter<T>::worker_get_one(unsigned int & slot, signed int & flag)
180 {
181 std::unique_ptr<T> ret;
182
183 verrou.lock();
184 try
185 {
186 std::map<unsigned int, unsigned int>::iterator it = corres.begin();
187 // using sequential reading provides sorted scanning
188 // of the map, looking first for the lowest index available (oldest entries)
189
190 do
191 {
192 if(it != corres.end())
193 {
194 if(it->first < lowest_index) // overflooding occured
195 ++it; // ignoring this slot
196 else
197 {
198
199 // sanity checks
200
201 if(it->second >= table.size())
202 throw THREADAR_BUG;
203 if(table[it->second].empty)
204 throw THREADAR_BUG;
205 if( ! table[it->second].obj)
206 throw THREADAR_BUG;
207
208 // recording the change
209
210 ret = std::move(table[it->second].obj);
211 slot = table[it->second].index;
212 flag = table[it->second].flag;
213 table[it->second].empty = true;
214
215 if(lowest_index != slot)
216 throw THREADAR_BUG;
217 ++lowest_index;
218
219 // reusing quicker the last block used
220 // as the back() be used first
221 empty_slot.push_back(it->second);
222 corres.erase(it); // removing the correspondance
223
224 if(verrou.get_waiting_thread_count(cond_full) > 0)
225 verrou.signal(cond_full);
226 }
227 }
228 else
229 {
230 // ratelier_scatter is empty
231
232 verrou.wait(cond_empty);
233 it = corres.begin();
234 }
235 }
236 while( ! ret);
237 }
238 catch(...)
239 {
240 verrou.unlock();
241 verrou.broadcast(cond_empty);
242 verrou.broadcast(cond_full);
243 throw;
244 }
245 verrou.unlock();
246
247 return ret;
248 }
249
250 template <class T> void ratelier_scatter<T>::reset()
251 {
252 unsigned int size = table.size();
253 next_index = 0;
254 lowest_index = 0;
255 corres.clear();
256 empty_slot.clear();
257
258 for(unsigned int i = 0; i < size; ++i)
259 {
260 table[i].obj.reset();
261 table[i].empty = true;
262 empty_slot.push_back(i);
263 }
264
265 verrou.lock();
266 verrou.broadcast(cond_empty);
267 verrou.broadcast(cond_full);
268 verrou.unlock();
269 }
270
271} // end of namespace
272
273#endif
Wrapper around the Posix pthread_cond_t object and its associated mutex.
Definition: condition.hpp:50
The class ratelier_scatter's purpose it to scatter an ordered set of data to many worker threads.
std::unique_ptr< T > worker_get_one(unsigned int &slot, signed int &flag)
For a worker thread to obtain an object in the lowest slot available.
void scatter(std::unique_ptr< T > &one, signed int flag=0)
For the non-worker thread to provide data to the ratelier_scatter.
void reset()
reset the object in its prestine state
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reach that statement.
Definition: exceptions.hpp:164
defines the mutex C++ class
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition: barrier.hpp:46