0030618: Modeling Algorithms, BOPTools_Parallel - avoid using map for thread-local...
[occt.git] / src / OSD / OSD_ThreadPool.hxx
CommitLineData
6f498847 1// Created by: Kirill Gavrilov
2// Copyright (c) 2017 OPEN CASCADE SAS
3//
4// This file is part of commercial software by OPEN CASCADE SAS.
5//
6// This software is furnished in accordance with the terms and conditions
7// of the contract and with the inclusion of this copyright notice.
8// This software or any other copy thereof may not be provided or otherwise
9// be made available to any third party.
10// No ownership title to the software is transferred hereby.
11//
12// OPEN CASCADE SAS makes no representation or warranties with respect to the
13// performance of this software, and specifically disclaims any responsibility
14// for any damages, special or consequential, connected with its use.
15
16#ifndef _OSD_ThreadPool_HeaderFile
17#define _OSD_ThreadPool_HeaderFile
18
19#include <NCollection_Array1.hxx>
20#include <OSD_Thread.hxx>
6f498847 21#include <Standard_Atomic.hxx>
22#include <Standard_Condition.hxx>
23#include <Standard_Mutex.hxx>
24
//! Class defining a thread pool for executing algorithms in multi-threaded mode.
//! Thread pool allocates requested amount of threads and keeps them alive
//! (in sleep mode when unused) during thread pool lifetime.
//! The same pool can be used by multiple consumers,
//! including nested multi-threading algorithms and concurrent threads:
//! - Thread pool is used by a multi-threaded algorithm by creating OSD_ThreadPool::Launcher.
//!   The functor performing a job takes two parameters - Thread Index and Data Index:
//!     void operator()(int theThreadIndex, int theDataIndex){}
//!   Multi-threaded algorithm may rely on Thread Index for allocating thread-local variables in array form,
//!   since the Thread Index is guaranteed to be within range
//!   OSD_ThreadPool::LowerThreadIndex() and OSD_ThreadPool::UpperThreadIndex().
//! - Default thread pool (OSD_ThreadPool::DefaultPool()) can be used in general case,
//!   but application may prefer creating a dedicated pool for better control.
//! - Default thread pool allocates the amount of threads considering concurrency
//!   level of the system (amount of logical processors).
//!   This can be overridden during OSD_ThreadPool construction or by calling OSD_ThreadPool::Init()
//!   (the pool should not be in use at that moment!).
//! - OSD_ThreadPool::Launcher reserves specific amount of threads from the pool for executing multi-threaded Job.
//!   Normally, single Launcher instance will occupy all threads available in thread pool,
//!   so that nested multi-threaded algorithms (within the same thread)
//!   and concurrent threads trying to use the same thread pool will run sequentially.
//!   This behavior is affected by OSD_ThreadPool::NbDefaultThreadsToLaunch() parameter
//!   and Launcher constructor, so that single Launcher instance will occupy not all threads
//!   in the pool allowing other threads to be used concurrently.
//! - OSD_ThreadPool::Launcher locks threads one-by-one from thread pool in a thread-safe way.
//! - Each working thread catches exceptions occurred during job execution, and Launcher will
//!   throw Standard_Failure in a caller thread on completed execution.
51class OSD_ThreadPool : public Standard_Transient
52{
53 DEFINE_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient)
54public:
55
56 //! Return (or create) a default thread pool.
57 //! Number of threads argument will be considered only when called first time.
58 Standard_EXPORT static const Handle(OSD_ThreadPool)& DefaultPool (int theNbThreads = -1);
59
60public:
61
62 //! Main constructor.
63 //! Application may consider specifying more threads than actually
64 //! available (OSD_Parallel::NbLogicalProcessors()) and set up NbDefaultThreadsToLaunch() to a smaller value
65 //! so that concurrent threads will be able using single Thread Pool instance more efficiently.
66 //! @param theNbThreads threads number to be created by pool
67 //! (if -1 is specified then OSD_Parallel::NbLogicalProcessors() will be used)
68 Standard_EXPORT OSD_ThreadPool (int theNbThreads = -1);
69
70 //! Destructor.
71 Standard_EXPORT virtual ~OSD_ThreadPool();
72
73 //! Return TRUE if at least 2 threads are available (including self-thread).
74 bool HasThreads() const { return NbThreads() >= 2; }
75
76 //! Return the lower thread index.
77 int LowerThreadIndex() const { return 0; }
78
79 //! Return the upper thread index (last index is reserved for self-thread).
80 int UpperThreadIndex() const { return LowerThreadIndex() + myThreads.Size(); }
81
82 //! Return the number of threads; >= 1.
83 int NbThreads() const { return myThreads.Size() + 1; }
84
85 //! Return maximum number of threads to be locked by a single Launcher object by default;
86 //! the entire thread pool size is returned by default.
87 int NbDefaultThreadsToLaunch() const { return myNbDefThreads; }
88
89 //! Set maximum number of threads to be locked by a single Launcher object by default.
90 //! Should be set BEFORE first usage.
91 void SetNbDefaultThreadsToLaunch (int theNbThreads) { myNbDefThreads = theNbThreads; }
92
93 //! Checks if thread pools has active consumers.
94 Standard_EXPORT bool IsInUse();
95
96 //! Reinitialize the thread pool with a different number of threads.
97 //! Should be called only with no active jobs, or exception Standard_ProgramError will be thrown!
98 Standard_EXPORT void Init (int theNbThreads);
99
100protected:
101
102 //! Thread function interface.
103 class JobInterface
104 {
105 public:
106 virtual void Perform (int theThreadIndex) = 0;
107 };
108
109 //! Thread with back reference to thread pool and thread index in it.
110 class EnumeratedThread : public OSD_Thread
111 {
112 friend class OSD_ThreadPool;
113 public:
ca0962a1 114 //! Main constructor.
6f498847 115 EnumeratedThread (bool theIsSelfThread = false)
116 : myPool (NULL), myJob (NULL), myWakeEvent (false),
117 myIdleEvent (false), myThreadIndex (0), myUsageCounter(0),
118 myIsStarted (false), myToCatchFpe (false),
119 myIsSelfThread (theIsSelfThread) {}
120
121 //! Occupy this thread for thread pool launcher.
122 //! @return TRUE on success, or FALSE if thread has been already occupied
123 Standard_EXPORT bool Lock();
124
125 //! Release this thread for thread pool launcher; should be called only after successful OccupyThread().
126 Standard_EXPORT void Free();
127
128 //! Wake up the thread.
129 Standard_EXPORT void WakeUp (JobInterface* theJob, bool theToCatchFpe);
130
131 //! Wait the thread going into Idle state (finished jobs).
132 Standard_EXPORT void WaitIdle();
133
ca0962a1 134 public:
135
136 //! Copy constructor.
137 EnumeratedThread (const EnumeratedThread& theCopy)
138 : OSD_Thread(),
139 myPool (NULL), myJob (NULL), myWakeEvent (false),
140 myIdleEvent (false), myThreadIndex (0), myUsageCounter(0),
141 myIsStarted (false), myToCatchFpe (false),
142 myIsSelfThread (false) { Assign (theCopy); }
143
144 //! Assignment operator.
145 EnumeratedThread& operator= (const EnumeratedThread& theCopy)
146 {
147 Assign (theCopy);
148 return *this;
149 }
150
151 //! Assignment operator.
152 void Assign (const EnumeratedThread& theCopy)
153 {
154 OSD_Thread::Assign (theCopy);
155 myPool = theCopy.myPool;
156 myJob = theCopy.myJob;
157 myThreadIndex = theCopy.myThreadIndex;
158 myToCatchFpe = theCopy.myToCatchFpe;
159 myIsSelfThread = theCopy.myIsSelfThread;
160 }
161
6f498847 162 private:
163
164 //! Method is executed in the context of thread.
165 void performThread();
166
167 //! Method is executed in the context of thread.
168 static Standard_Address runThread (Standard_Address theTask);
169
170 private:
171 OSD_ThreadPool* myPool;
172 JobInterface* myJob;
173 Handle(Standard_Failure) myFailure;
174 Standard_Condition myWakeEvent;
175 Standard_Condition myIdleEvent;
176 int myThreadIndex;
177 volatile int myUsageCounter;
178 bool myIsStarted;
179 bool myToCatchFpe;
180 bool myIsSelfThread;
181 };
182
183public:
184
185 //! Launcher object locking a subset of threads (or all threads)
186 //! in a thread pool to perform parallel execution of the job.
187 class Launcher
188 {
189 public:
190 //! Lock specified number of threads from the thread pool.
191 //! If thread pool is already locked by another user,
192 //! Launcher will lock as many threads as possible
193 //! (if none will be locked, then single threaded execution will be done).
194 //! @param thePool thread pool to lock the threads
195 //! @param theMaxThreads number of threads to lock;
196 //! -1 specifies that default number of threads
197 //! to be used OSD_ThreadPool::NbDefaultThreadsToLaunch()
198 Standard_EXPORT Launcher (OSD_ThreadPool& thePool, int theMaxThreads = -1);
199
200 //! Release threads.
201 ~Launcher() { Release(); }
202
203 //! Return TRUE if at least 2 threads have been locked for parallel execution (including self-thread);
204 //! otherwise, the functor will be executed within the caller thread.
205 bool HasThreads() const { return myNbThreads >= 2; }
206
207 //! Return amount of locked threads; >= 1.
208 int NbThreads() const { return myNbThreads; }
209
210 //! Return the lower thread index.
211 int LowerThreadIndex() const { return 0; }
212
213 //! Return the upper thread index (last index is reserved for the self-thread).
214 int UpperThreadIndex() const { return LowerThreadIndex() + myNbThreads - 1; }
215
216 //! Simple primitive for parallelization of "for" loops, e.g.:
217 //! @code
218 //! for (int anIter = theBegin; anIter < theEnd; ++anIter) {}
219 //! @endcode
220 //! @param theBegin the first data index (inclusive)
221 //! @param theEnd the last data index (exclusive)
222 //! @param theFunctor functor providing an interface
223 //! "void operator(int theThreadIndex, int theDataIndex){}" performing task for specified index
224 template<typename Functor>
225 void Perform (int theBegin, int theEnd, const Functor& theFunctor)
226 {
227 JobRange aData (theBegin, theEnd);
228 Job<Functor> aJob (theFunctor, aData);
229 perform (aJob);
230 }
231
232 //! Release threads before Launcher destruction.
233 Standard_EXPORT void Release();
234
235 protected:
236
237 //! Execute job.
238 Standard_EXPORT void perform (JobInterface& theJob);
239
240 //! Initialize job and start threads.
241 Standard_EXPORT void run (JobInterface& theJob);
242
243 //! Wait threads execution.
244 Standard_EXPORT void wait();
245
246 private:
247 Launcher (const Launcher& theCopy);
248 Launcher& operator=(const Launcher& theCopy);
249
250 private:
251 NCollection_Array1<EnumeratedThread*> myThreads; //!< array of locked threads (including self-thread)
252 EnumeratedThread mySelfThread;
253 int myNbThreads; //!< amount of locked threads
254 };
255
256protected:
257
258 //! Auxiliary class which ensures exclusive access to iterators of processed data pool.
259 class JobRange
260 {
261 public:
262
263 //! Constructor
264 JobRange (const int& theBegin, const int& theEnd) : myBegin(theBegin), myEnd (theEnd), myIt (theBegin) {}
265
266 //! Returns const link on the first element.
267 const int& Begin() const { return myBegin; }
268
269 //! Returns const link on the last element.
270 const int& End() const { return myEnd; }
271
272 //! Returns first non processed element or end.
273 //! Thread-safe method.
274 int It() const { return Standard_Atomic_Increment (reinterpret_cast<volatile int*>(&myIt)) - 1; }
275
276 private:
277 JobRange (const JobRange& theCopy);
278 JobRange& operator=(const JobRange& theCopy);
279
280 private:
281 const int& myBegin; //!< First element of range
282 const int& myEnd; //!< Last element of range
283 mutable int myIt; //!< First non processed element of range
284 };
285
286 //! Auxiliary wrapper class for thread function.
287 template<typename FunctorT> class Job : public JobInterface
288 {
289 public:
290
291 //! Constructor.
292 Job (const FunctorT& thePerformer, JobRange& theRange)
293 : myPerformer (thePerformer), myRange (theRange) {}
294
295 //! Method is executed in the context of thread.
296 virtual void Perform (int theThreadIndex) Standard_OVERRIDE
297 {
298 for (Standard_Integer anIter = myRange.It(); anIter < myRange.End(); anIter = myRange.It())
299 {
300 myPerformer (theThreadIndex, anIter);
301 }
302 }
303
304 private:
305 Job (const Job& theCopy);
306 Job& operator=(const Job& theCopy);
307
308 private: //! @name private fields
309 const FunctorT& myPerformer; //!< Link on functor
310 const JobRange& myRange; //!< Link on processed data block
311 };
312
313 //! Release threads.
314 void release();
315
316 //! Perform the job and catch exceptions.
317 static void performJob (Handle(Standard_Failure)& theFailure,
318 OSD_ThreadPool::JobInterface* theJob,
319 int theThreadIndex);
320
ca0962a1 321private:
322 //! This method should not be called (prohibited).
323 OSD_ThreadPool (const OSD_ThreadPool& theCopy);
324 //! This method should not be called (prohibited).
325 OSD_ThreadPool& operator= (const OSD_ThreadPool& theCopy);
326
6f498847 327private:
328
329 NCollection_Array1<EnumeratedThread> myThreads; //!< array of defined threads (excluding self-thread)
330 int myNbDefThreads; //!< maximum number of threads to be locked by a single Launcher by default
331 bool myShutDown; //!< flag to shut down (destroy) the thread pool
332
333};
334
335#endif // _OSD_ThreadPool_HeaderFile