0030924: Coding - incorrect header in file OSD_ThreadPool.hxx
[occt.git] / src / OSD / OSD_ThreadPool.hxx
1 // Created by: Kirill Gavrilov
2 // Copyright (c) 2017-2019 OPEN CASCADE SAS
3 //
4 // This file is part of Open CASCADE Technology software library.
5 //
6 // This library is free software; you can redistribute it and/or modify it under
7 // the terms of the GNU Lesser General Public License version 2.1 as published
8 // by the Free Software Foundation, with special exception defined in the file
9 // OCCT_LGPL_EXCEPTION.txt. Consult the file LICENSE_LGPL_21.txt included in OCCT
10 // distribution for complete text of the license and disclaimer of any warranty.
11 //
12 // Alternatively, this file may be used under the terms of Open CASCADE
13 // commercial license or contractual agreement.
14
15 #ifndef _OSD_ThreadPool_HeaderFile
16 #define _OSD_ThreadPool_HeaderFile
17
18 #include <NCollection_Array1.hxx>
19 #include <OSD_Thread.hxx>
20 #include <Standard_Atomic.hxx>
21 #include <Standard_Condition.hxx>
22 #include <Standard_Mutex.hxx>
23
24 //! Class defining a thread pool for executing algorithms in multi-threaded mode.
25 //! Thread pool allocates requested amount of threads and keep them alive
26 //! (in sleep mode when unused) during thread pool lifetime.
27 //! The same pool can be used by multiple consumers,
28 //! including nested multi-threading algorithms and concurrent threads:
29 //! - Thread pool can be used either by multi-threaded algorithm by creating OSD_ThreadPool::Launcher.
30 //!   The functor performing a job takes two parameters - Thread Index and Data Index:
31 //!     void operator(int theThreadIndex, int theDataIndex){}
32 //!   Multi-threaded algorithm may rely on Thread Index for allocating thread-local variables in array form,
33 //!   since the Thread Index is guaranteed to be within range OSD_ThreadPool::Lower() and OSD_ThreadPool::Upper().
34 //! - Default thread pool (OSD_ThreadPool::DefaultPool()) can be used in general case,
35 //!   but application may prefer creating a dedicated pool for better control.
36 //! - Default thread pool allocates the amount of threads considering concurrency
37 //!   level of the system (amount of logical processors).
38 //!   This can be overridden during OSD_ThreadPool construction or by calling OSD_ThreadPool::Init()
39 //!   (the pool should not be used!).
40 //! - OSD_ThreadPool::Launcher reserves specific amount of threads from the pool for executing multi-threaded Job.
41 //!   Normally, single Launcher instance will occupy all threads available in thread pool,
42 //!   so that nested multi-threaded algorithms (within the same thread)
43 //!   and concurrent threads trying to use the same thread pool will run sequentially.
44 //!   This behavior is affected by OSD_ThreadPool::NbDefaultThreadsToLaunch() parameter
45 //!   and Launcher constructor, so that single Launcher instance will occupy not all threads
46 //!   in the pool allowing other threads to be used concurrently.
47 //! - OSD_ThreadPool::Launcher locks thread one-by-one from thread pool in a thread-safe way.
48 //! - Each working thread catches exceptions occurred during job execution, and Launcher will
49 //!   throw Standard_Failure in a caller thread on completed execution.
50 class OSD_ThreadPool : public Standard_Transient
51 {
52   DEFINE_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient)
53 public:
54
55   //! Return (or create) a default thread pool.
56   //! Number of threads argument will be considered only when called first time.
57   Standard_EXPORT static const Handle(OSD_ThreadPool)& DefaultPool (int theNbThreads = -1);
58
59 public:
60
61   //! Main constructor.
62   //! Application may consider specifying more threads than actually
63   //! available (OSD_Parallel::NbLogicalProcessors()) and set up NbDefaultThreadsToLaunch() to a smaller value
64   //! so that concurrent threads will be able using single Thread Pool instance more efficiently.
65   //! @param theNbThreads threads number to be created by pool
66   //!                     (if -1 is specified then OSD_Parallel::NbLogicalProcessors() will be used)
67   Standard_EXPORT OSD_ThreadPool (int theNbThreads = -1);
68
69   //! Destructor.
70   Standard_EXPORT virtual ~OSD_ThreadPool();
71
72   //! Return TRUE if at least 2 threads are available (including self-thread).
73   bool HasThreads() const { return NbThreads() >= 2; }
74
75   //! Return the lower thread index.
76   int LowerThreadIndex() const { return 0; }
77
78   //! Return the upper thread index (last index is reserved for self-thread).
79   int UpperThreadIndex() const { return LowerThreadIndex() + myThreads.Size(); }
80
81   //! Return the number of threads; >= 1.
82   int NbThreads() const { return myThreads.Size() + 1; }
83
84   //! Return maximum number of threads to be locked by a single Launcher object by default;
85   //! the entire thread pool size is returned by default.
86   int NbDefaultThreadsToLaunch() const { return myNbDefThreads; }
87
88   //! Set maximum number of threads to be locked by a single Launcher object by default.
89   //! Should be set BEFORE first usage.
90   void SetNbDefaultThreadsToLaunch (int theNbThreads) { myNbDefThreads = theNbThreads; }
91
92   //! Checks if thread pools has active consumers.
93   Standard_EXPORT bool IsInUse();
94
95   //! Reinitialize the thread pool with a different number of threads.
96   //! Should be called only with no active jobs, or exception Standard_ProgramError will be thrown!
97   Standard_EXPORT void Init (int theNbThreads);
98
99 protected:
100
101   //! Thread function interface.
102   class JobInterface
103   {
104   public:
105     virtual void Perform (int theThreadIndex) = 0;
106   };
107
108   //! Thread with back reference to thread pool and thread index in it.
109   class EnumeratedThread : public OSD_Thread
110   {
111     friend class OSD_ThreadPool;
112   public:
113     //! Main constructor.
114     EnumeratedThread (bool theIsSelfThread = false)
115     : myPool (NULL), myJob (NULL), myWakeEvent (false),
116       myIdleEvent (false), myThreadIndex (0), myUsageCounter(0),
117       myIsStarted (false), myToCatchFpe (false),
118       myIsSelfThread (theIsSelfThread) {}
119
120     //! Occupy this thread for thread pool launcher.
121     //! @return TRUE on success, or FALSE if thread has been already occupied
122     Standard_EXPORT bool Lock();
123
124     //! Release this thread for thread pool launcher; should be called only after successful OccupyThread().
125     Standard_EXPORT void Free();
126
127     //! Wake up the thread.
128     Standard_EXPORT void WakeUp (JobInterface* theJob, bool theToCatchFpe);
129
130     //! Wait the thread going into Idle state (finished jobs).
131     Standard_EXPORT void WaitIdle();
132
133   public:
134
135     //! Copy constructor.
136     EnumeratedThread (const EnumeratedThread& theCopy)
137     : OSD_Thread(),
138       myPool (NULL), myJob (NULL), myWakeEvent (false),
139       myIdleEvent (false), myThreadIndex (0), myUsageCounter(0),
140       myIsStarted (false), myToCatchFpe (false),
141       myIsSelfThread (false) { Assign (theCopy); }
142
143     //! Assignment operator.
144     EnumeratedThread& operator= (const EnumeratedThread& theCopy)
145     {
146       Assign (theCopy);
147       return *this;
148     }
149
150     //! Assignment operator.
151     void Assign (const EnumeratedThread& theCopy)
152     {
153       OSD_Thread::Assign (theCopy);
154       myPool         = theCopy.myPool;
155       myJob          = theCopy.myJob;
156       myThreadIndex  = theCopy.myThreadIndex;
157       myToCatchFpe   = theCopy.myToCatchFpe;
158       myIsSelfThread = theCopy.myIsSelfThread;
159     }
160
161   private:
162
163     //! Method is executed in the context of thread.
164     void performThread();
165
166     //! Method is executed in the context of thread.
167     static Standard_Address runThread (Standard_Address theTask);
168
169   private:
170     OSD_ThreadPool* myPool;
171     JobInterface* myJob;
172     Handle(Standard_Failure) myFailure;
173     Standard_Condition myWakeEvent;
174     Standard_Condition myIdleEvent;
175     int myThreadIndex;
176     volatile int myUsageCounter;
177     bool myIsStarted;
178     bool myToCatchFpe;
179     bool myIsSelfThread;
180   };
181
182 public:
183
184   //! Launcher object locking a subset of threads (or all threads)
185   //! in a thread pool to perform parallel execution of the job.
186   class Launcher
187   {
188   public:
189     //! Lock specified number of threads from the thread pool.
190     //! If thread pool is already locked by another user,
191     //! Launcher will lock as many threads as possible
192     //! (if none will be locked, then single threaded execution will be done).
193     //! @param thePool       thread pool to lock the threads
194     //! @param theMaxThreads number of threads to lock;
195     //!                      -1 specifies that default number of threads
196     //!                      to be used OSD_ThreadPool::NbDefaultThreadsToLaunch()
197     Standard_EXPORT Launcher (OSD_ThreadPool& thePool, int theMaxThreads = -1);
198
199     //! Release threads.
200     ~Launcher() { Release(); }
201
202     //! Return TRUE if at least 2 threads have been locked for parallel execution (including self-thread);
203     //! otherwise, the functor will be executed within the caller thread.
204     bool HasThreads() const { return myNbThreads >= 2; }
205
206     //! Return amount of locked threads; >= 1.
207     int NbThreads() const { return myNbThreads; }
208
209     //! Return the lower thread index.
210     int LowerThreadIndex() const { return 0; }
211
212     //! Return the upper thread index (last index is reserved for the self-thread).
213     int UpperThreadIndex() const { return LowerThreadIndex() + myNbThreads - 1; }
214
215     //! Simple primitive for parallelization of "for" loops, e.g.:
216     //! @code
217     //!   for (int anIter = theBegin; anIter < theEnd; ++anIter) {}
218     //! @endcode
219     //! @param theBegin   the first data index (inclusive)
220     //! @param theEnd     the last  data index (exclusive)
221     //! @param theFunctor functor providing an interface
222     //!                   "void operator(int theThreadIndex, int theDataIndex){}" performing task for specified index
223     template<typename Functor>
224     void Perform (int theBegin, int theEnd, const Functor& theFunctor)
225     {
226       JobRange aData (theBegin, theEnd);
227       Job<Functor> aJob (theFunctor, aData);
228       perform (aJob);
229     }
230
231     //! Release threads before Launcher destruction.
232     Standard_EXPORT void Release();
233
234   protected:
235
236     //! Execute job.
237     Standard_EXPORT void perform (JobInterface& theJob);
238
239     //! Initialize job and start threads.
240     Standard_EXPORT void run (JobInterface& theJob);
241
242     //! Wait threads execution.
243     Standard_EXPORT void wait();
244
245   private:
246     Launcher           (const Launcher& theCopy);
247     Launcher& operator=(const Launcher& theCopy);
248
249   private:
250     NCollection_Array1<EnumeratedThread*> myThreads; //!< array of locked threads (including self-thread)
251     EnumeratedThread mySelfThread;
252     int myNbThreads; //!< amount of locked threads
253   };
254
255 protected:
256
257   //! Auxiliary class which ensures exclusive access to iterators of processed data pool.
258   class JobRange
259   {
260   public:
261
262     //! Constructor
263     JobRange (const int& theBegin, const int& theEnd) : myBegin(theBegin), myEnd (theEnd), myIt (theBegin) {}
264
265     //! Returns const link on the first element.
266     const int& Begin() const { return myBegin; }
267
268     //! Returns const link on the last element.
269     const int& End() const { return myEnd; }
270
271     //! Returns first non processed element or end.
272     //! Thread-safe method.
273     int It() const { return Standard_Atomic_Increment (reinterpret_cast<volatile int*>(&myIt)) - 1; }
274
275   private:
276     JobRange           (const JobRange& theCopy);
277     JobRange& operator=(const JobRange& theCopy);
278
279   private:
280     const   int& myBegin; //!< First element of range
281     const   int& myEnd;   //!< Last  element of range
282     mutable int  myIt;    //!< First non processed element of range
283   };
284
285   //! Auxiliary wrapper class for thread function.
286   template<typename FunctorT> class Job : public JobInterface
287   {
288   public:
289
290     //! Constructor.
291     Job (const FunctorT& thePerformer, JobRange& theRange)
292     : myPerformer (thePerformer), myRange (theRange) {}
293
294     //! Method is executed in the context of thread.
295     virtual void Perform (int theThreadIndex) Standard_OVERRIDE
296     {
297       for (Standard_Integer anIter = myRange.It(); anIter < myRange.End(); anIter = myRange.It())
298       {
299         myPerformer (theThreadIndex, anIter);
300       }
301     }
302
303   private:
304     Job           (const Job& theCopy);
305     Job& operator=(const Job& theCopy);
306
307   private: //! @name private fields
308     const FunctorT& myPerformer; //!< Link on functor
309     const JobRange& myRange;     //!< Link on processed data block
310   };
311
312   //! Release threads.
313   void release();
314
315   //! Perform the job and catch exceptions.
316   static void performJob (Handle(Standard_Failure)& theFailure,
317                           OSD_ThreadPool::JobInterface* theJob,
318                           int theThreadIndex);
319
320 private:
321   //! This method should not be called (prohibited).
322   OSD_ThreadPool (const OSD_ThreadPool& theCopy);
323   //! This method should not be called (prohibited).
324   OSD_ThreadPool& operator= (const OSD_ThreadPool& theCopy);
325
326 private:
327
328   NCollection_Array1<EnumeratedThread> myThreads; //!< array of defined threads (excluding self-thread)
329   int  myNbDefThreads; //!< maximum number of threads to be locked by a single Launcher by default
330   bool myShutDown;     //!< flag to shut down (destroy) the thread pool
331
332 };
333
334 #endif // _OSD_ThreadPool_HeaderFile