0030924: Coding - incorrect header in file OSD_ThreadPool.hxx
[occt.git] / src / OSD / OSD_ThreadPool.hxx
CommitLineData
6f498847 1// Created by: Kirill Gavrilov
87018b45 2// Copyright (c) 2017-2019 OPEN CASCADE SAS
6f498847 3//
87018b45 4// This file is part of Open CASCADE Technology software library.
6f498847 5//
87018b45 6// This library is free software; you can redistribute it and/or modify it under
7// the terms of the GNU Lesser General Public License version 2.1 as published
8// by the Free Software Foundation, with special exception defined in the file
9// OCCT_LGPL_EXCEPTION.txt. Consult the file LICENSE_LGPL_21.txt included in OCCT
10// distribution for complete text of the license and disclaimer of any warranty.
6f498847 11//
87018b45 12// Alternatively, this file may be used under the terms of Open CASCADE
13// commercial license or contractual agreement.
6f498847 14
15#ifndef _OSD_ThreadPool_HeaderFile
16#define _OSD_ThreadPool_HeaderFile
17
18#include <NCollection_Array1.hxx>
19#include <OSD_Thread.hxx>
6f498847 20#include <Standard_Atomic.hxx>
21#include <Standard_Condition.hxx>
22#include <Standard_Mutex.hxx>
23
//! Class defining a thread pool for executing algorithms in multi-threaded mode.
//! Thread pool allocates requested amount of threads and keeps them alive
//! (in sleep mode when unused) during thread pool lifetime.
//! The same pool can be used by multiple consumers,
//! including nested multi-threading algorithms and concurrent threads:
//! - Thread pool can be used by a multi-threaded algorithm by creating OSD_ThreadPool::Launcher.
//!   The functor performing a job takes two parameters - Thread Index and Data Index:
//!     void operator()(int theThreadIndex, int theDataIndex){}
//!   Multi-threaded algorithm may rely on Thread Index for allocating thread-local variables in array form,
//!   since the Thread Index is guaranteed to be within range
//!   OSD_ThreadPool::LowerThreadIndex() and OSD_ThreadPool::UpperThreadIndex().
//! - Default thread pool (OSD_ThreadPool::DefaultPool()) can be used in general case,
//!   but application may prefer creating a dedicated pool for better control.
//! - Default thread pool allocates the amount of threads considering concurrency
//!   level of the system (amount of logical processors).
//!   This can be overridden during OSD_ThreadPool construction or by calling OSD_ThreadPool::Init()
//!   (the pool should not be in use at that moment!).
//! - OSD_ThreadPool::Launcher reserves specific amount of threads from the pool for executing multi-threaded Job.
//!   Normally, single Launcher instance will occupy all threads available in thread pool,
//!   so that nested multi-threaded algorithms (within the same thread)
//!   and concurrent threads trying to use the same thread pool will run sequentially.
//!   This behavior is affected by OSD_ThreadPool::NbDefaultThreadsToLaunch() parameter
//!   and Launcher constructor, so that single Launcher instance will not occupy all threads
//!   in the pool, allowing other threads to be used concurrently.
//! - OSD_ThreadPool::Launcher locks threads one-by-one from thread pool in a thread-safe way.
//! - Each working thread catches exceptions occurring during job execution, and Launcher will
//!   throw Standard_Failure in a caller thread on completed execution.
50class OSD_ThreadPool : public Standard_Transient
51{
52 DEFINE_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient)
53public:
54
55 //! Return (or create) a default thread pool.
56 //! Number of threads argument will be considered only when called first time.
57 Standard_EXPORT static const Handle(OSD_ThreadPool)& DefaultPool (int theNbThreads = -1);
58
59public:
60
61 //! Main constructor.
62 //! Application may consider specifying more threads than actually
63 //! available (OSD_Parallel::NbLogicalProcessors()) and set up NbDefaultThreadsToLaunch() to a smaller value
64 //! so that concurrent threads will be able using single Thread Pool instance more efficiently.
65 //! @param theNbThreads threads number to be created by pool
66 //! (if -1 is specified then OSD_Parallel::NbLogicalProcessors() will be used)
67 Standard_EXPORT OSD_ThreadPool (int theNbThreads = -1);
68
69 //! Destructor.
70 Standard_EXPORT virtual ~OSD_ThreadPool();
71
72 //! Return TRUE if at least 2 threads are available (including self-thread).
73 bool HasThreads() const { return NbThreads() >= 2; }
74
75 //! Return the lower thread index.
76 int LowerThreadIndex() const { return 0; }
77
78 //! Return the upper thread index (last index is reserved for self-thread).
79 int UpperThreadIndex() const { return LowerThreadIndex() + myThreads.Size(); }
80
81 //! Return the number of threads; >= 1.
82 int NbThreads() const { return myThreads.Size() + 1; }
83
84 //! Return maximum number of threads to be locked by a single Launcher object by default;
85 //! the entire thread pool size is returned by default.
86 int NbDefaultThreadsToLaunch() const { return myNbDefThreads; }
87
88 //! Set maximum number of threads to be locked by a single Launcher object by default.
89 //! Should be set BEFORE first usage.
90 void SetNbDefaultThreadsToLaunch (int theNbThreads) { myNbDefThreads = theNbThreads; }
91
92 //! Checks if thread pools has active consumers.
93 Standard_EXPORT bool IsInUse();
94
95 //! Reinitialize the thread pool with a different number of threads.
96 //! Should be called only with no active jobs, or exception Standard_ProgramError will be thrown!
97 Standard_EXPORT void Init (int theNbThreads);
98
99protected:
100
//! Thread function interface: an abstract job that is executed by a thread.
class JobInterface
{
public:

  //! Virtual destructor: the class is a polymorphic base,
  //! so destroying a job through a JobInterface pointer must run the derived destructor.
  virtual ~JobInterface() {}

  //! Execute the job.
  //! @param theThreadIndex index of the thread executing the job
  virtual void Perform (int theThreadIndex) = 0;
};
107
108 //! Thread with back reference to thread pool and thread index in it.
109 class EnumeratedThread : public OSD_Thread
110 {
111 friend class OSD_ThreadPool;
112 public:
ca0962a1 113 //! Main constructor.
6f498847 114 EnumeratedThread (bool theIsSelfThread = false)
115 : myPool (NULL), myJob (NULL), myWakeEvent (false),
116 myIdleEvent (false), myThreadIndex (0), myUsageCounter(0),
117 myIsStarted (false), myToCatchFpe (false),
118 myIsSelfThread (theIsSelfThread) {}
119
120 //! Occupy this thread for thread pool launcher.
121 //! @return TRUE on success, or FALSE if thread has been already occupied
122 Standard_EXPORT bool Lock();
123
124 //! Release this thread for thread pool launcher; should be called only after successful OccupyThread().
125 Standard_EXPORT void Free();
126
127 //! Wake up the thread.
128 Standard_EXPORT void WakeUp (JobInterface* theJob, bool theToCatchFpe);
129
130 //! Wait the thread going into Idle state (finished jobs).
131 Standard_EXPORT void WaitIdle();
132
ca0962a1 133 public:
134
135 //! Copy constructor.
136 EnumeratedThread (const EnumeratedThread& theCopy)
137 : OSD_Thread(),
138 myPool (NULL), myJob (NULL), myWakeEvent (false),
139 myIdleEvent (false), myThreadIndex (0), myUsageCounter(0),
140 myIsStarted (false), myToCatchFpe (false),
141 myIsSelfThread (false) { Assign (theCopy); }
142
143 //! Assignment operator.
144 EnumeratedThread& operator= (const EnumeratedThread& theCopy)
145 {
146 Assign (theCopy);
147 return *this;
148 }
149
150 //! Assignment operator.
151 void Assign (const EnumeratedThread& theCopy)
152 {
153 OSD_Thread::Assign (theCopy);
154 myPool = theCopy.myPool;
155 myJob = theCopy.myJob;
156 myThreadIndex = theCopy.myThreadIndex;
157 myToCatchFpe = theCopy.myToCatchFpe;
158 myIsSelfThread = theCopy.myIsSelfThread;
159 }
160
6f498847 161 private:
162
163 //! Method is executed in the context of thread.
164 void performThread();
165
166 //! Method is executed in the context of thread.
167 static Standard_Address runThread (Standard_Address theTask);
168
169 private:
170 OSD_ThreadPool* myPool;
171 JobInterface* myJob;
172 Handle(Standard_Failure) myFailure;
173 Standard_Condition myWakeEvent;
174 Standard_Condition myIdleEvent;
175 int myThreadIndex;
176 volatile int myUsageCounter;
177 bool myIsStarted;
178 bool myToCatchFpe;
179 bool myIsSelfThread;
180 };
181
182public:
183
184 //! Launcher object locking a subset of threads (or all threads)
185 //! in a thread pool to perform parallel execution of the job.
186 class Launcher
187 {
188 public:
189 //! Lock specified number of threads from the thread pool.
190 //! If thread pool is already locked by another user,
191 //! Launcher will lock as many threads as possible
192 //! (if none will be locked, then single threaded execution will be done).
193 //! @param thePool thread pool to lock the threads
194 //! @param theMaxThreads number of threads to lock;
195 //! -1 specifies that default number of threads
196 //! to be used OSD_ThreadPool::NbDefaultThreadsToLaunch()
197 Standard_EXPORT Launcher (OSD_ThreadPool& thePool, int theMaxThreads = -1);
198
199 //! Release threads.
200 ~Launcher() { Release(); }
201
202 //! Return TRUE if at least 2 threads have been locked for parallel execution (including self-thread);
203 //! otherwise, the functor will be executed within the caller thread.
204 bool HasThreads() const { return myNbThreads >= 2; }
205
206 //! Return amount of locked threads; >= 1.
207 int NbThreads() const { return myNbThreads; }
208
209 //! Return the lower thread index.
210 int LowerThreadIndex() const { return 0; }
211
212 //! Return the upper thread index (last index is reserved for the self-thread).
213 int UpperThreadIndex() const { return LowerThreadIndex() + myNbThreads - 1; }
214
215 //! Simple primitive for parallelization of "for" loops, e.g.:
216 //! @code
217 //! for (int anIter = theBegin; anIter < theEnd; ++anIter) {}
218 //! @endcode
219 //! @param theBegin the first data index (inclusive)
220 //! @param theEnd the last data index (exclusive)
221 //! @param theFunctor functor providing an interface
222 //! "void operator(int theThreadIndex, int theDataIndex){}" performing task for specified index
223 template<typename Functor>
224 void Perform (int theBegin, int theEnd, const Functor& theFunctor)
225 {
226 JobRange aData (theBegin, theEnd);
227 Job<Functor> aJob (theFunctor, aData);
228 perform (aJob);
229 }
230
231 //! Release threads before Launcher destruction.
232 Standard_EXPORT void Release();
233
234 protected:
235
236 //! Execute job.
237 Standard_EXPORT void perform (JobInterface& theJob);
238
239 //! Initialize job and start threads.
240 Standard_EXPORT void run (JobInterface& theJob);
241
242 //! Wait threads execution.
243 Standard_EXPORT void wait();
244
245 private:
246 Launcher (const Launcher& theCopy);
247 Launcher& operator=(const Launcher& theCopy);
248
249 private:
250 NCollection_Array1<EnumeratedThread*> myThreads; //!< array of locked threads (including self-thread)
251 EnumeratedThread mySelfThread;
252 int myNbThreads; //!< amount of locked threads
253 };
254
255protected:
256
257 //! Auxiliary class which ensures exclusive access to iterators of processed data pool.
258 class JobRange
259 {
260 public:
261
262 //! Constructor
263 JobRange (const int& theBegin, const int& theEnd) : myBegin(theBegin), myEnd (theEnd), myIt (theBegin) {}
264
265 //! Returns const link on the first element.
266 const int& Begin() const { return myBegin; }
267
268 //! Returns const link on the last element.
269 const int& End() const { return myEnd; }
270
271 //! Returns first non processed element or end.
272 //! Thread-safe method.
273 int It() const { return Standard_Atomic_Increment (reinterpret_cast<volatile int*>(&myIt)) - 1; }
274
275 private:
276 JobRange (const JobRange& theCopy);
277 JobRange& operator=(const JobRange& theCopy);
278
279 private:
280 const int& myBegin; //!< First element of range
281 const int& myEnd; //!< Last element of range
282 mutable int myIt; //!< First non processed element of range
283 };
284
285 //! Auxiliary wrapper class for thread function.
286 template<typename FunctorT> class Job : public JobInterface
287 {
288 public:
289
290 //! Constructor.
291 Job (const FunctorT& thePerformer, JobRange& theRange)
292 : myPerformer (thePerformer), myRange (theRange) {}
293
294 //! Method is executed in the context of thread.
295 virtual void Perform (int theThreadIndex) Standard_OVERRIDE
296 {
297 for (Standard_Integer anIter = myRange.It(); anIter < myRange.End(); anIter = myRange.It())
298 {
299 myPerformer (theThreadIndex, anIter);
300 }
301 }
302
303 private:
304 Job (const Job& theCopy);
305 Job& operator=(const Job& theCopy);
306
307 private: //! @name private fields
308 const FunctorT& myPerformer; //!< Link on functor
309 const JobRange& myRange; //!< Link on processed data block
310 };
311
312 //! Release threads.
313 void release();
314
315 //! Perform the job and catch exceptions.
316 static void performJob (Handle(Standard_Failure)& theFailure,
317 OSD_ThreadPool::JobInterface* theJob,
318 int theThreadIndex);
319
ca0962a1 320private:
321 //! This method should not be called (prohibited).
322 OSD_ThreadPool (const OSD_ThreadPool& theCopy);
323 //! This method should not be called (prohibited).
324 OSD_ThreadPool& operator= (const OSD_ThreadPool& theCopy);
325
6f498847 326private:
327
328 NCollection_Array1<EnumeratedThread> myThreads; //!< array of defined threads (excluding self-thread)
329 int myNbDefThreads; //!< maximum number of threads to be locked by a single Launcher by default
330 bool myShutDown; //!< flag to shut down (destroy) the thread pool
331
332};
333
334#endif // _OSD_ThreadPool_HeaderFile