0029935: Foundation Classes - introduce OSD_ThreadPool class defining a thread pool
[occt.git] / src / OSD / OSD_ThreadPool.hxx
1 // Created by: Kirill Gavrilov
2 // Copyright (c) 2017 OPEN CASCADE SAS
3 //
4 // This file is part of commercial software by OPEN CASCADE SAS.
5 //
6 // This software is furnished in accordance with the terms and conditions
7 // of the contract and with the inclusion of this copyright notice.
8 // This software or any other copy thereof may not be provided or otherwise
9 // be made available to any third party.
10 // No ownership title to the software is transferred hereby.
11 //
12 // OPEN CASCADE SAS makes no representation or warranties with respect to the
13 // performance of this software, and specifically disclaims any responsibility
14 // for any damages, special or consequential, connected with its use.
15
16 #ifndef _OSD_ThreadPool_HeaderFile
17 #define _OSD_ThreadPool_HeaderFile
18
19 #include <NCollection_Array1.hxx>
20 #include <OSD_Thread.hxx>
21 #include <OSD_Parallel.hxx>
22 #include <Standard_Atomic.hxx>
23 #include <Standard_Condition.hxx>
24 #include <Standard_Mutex.hxx>
25
26 //! Class defining a thread pool for executing algorithms in multi-threaded mode.
27 //! Thread pool allocates requested amount of threads and keep them alive
28 //! (in sleep mode when unused) during thread pool lifetime.
29 //! The same pool can be used by multiple consumers,
30 //! including nested multi-threading algorithms and concurrent threads:
31 //! - Thread pool can be used either by multi-threaded algorithm by creating OSD_ThreadPool::Launcher.
32 //!   The functor performing a job takes two parameters - Thread Index and Data Index:
33 //!     void operator(int theThreadIndex, int theDataIndex){}
34 //!   Multi-threaded algorithm may rely on Thread Index for allocating thread-local variables in array form,
35 //!   since the Thread Index is guaranteed to be within range OSD_ThreadPool::Lower() and OSD_ThreadPool::Upper().
36 //! - Default thread pool (OSD_ThreadPool::DefaultPool()) can be used in general case,
37 //!   but application may prefer creating a dedicated pool for better control.
38 //! - Default thread pool allocates the amount of threads considering concurrency
39 //!   level of the system (amount of logical processors).
40 //!   This can be overridden during OSD_ThreadPool construction or by calling OSD_ThreadPool::Init()
41 //!   (the pool should not be used!).
42 //! - OSD_ThreadPool::Launcher reserves specific amount of threads from the pool for executing multi-threaded Job.
43 //!   Normally, single Launcher instance will occupy all threads available in thread pool,
44 //!   so that nested multi-threaded algorithms (within the same thread)
45 //!   and concurrent threads trying to use the same thread pool will run sequentially.
46 //!   This behavior is affected by OSD_ThreadPool::NbDefaultThreadsToLaunch() parameter
47 //!   and Launcher constructor, so that single Launcher instance will occupy not all threads
48 //!   in the pool allowing other threads to be used concurrently.
49 //! - OSD_ThreadPool::Launcher locks thread one-by-one from thread pool in a thread-safe way.
50 //! - Each working thread catches exceptions occurred during job execution, and Launcher will
51 //!   throw Standard_Failure in a caller thread on completed execution.
52 class OSD_ThreadPool : public Standard_Transient
53 {
54   DEFINE_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient)
55 public:
56
57   //! Return (or create) a default thread pool.
58   //! Number of threads argument will be considered only when called first time.
59   Standard_EXPORT static const Handle(OSD_ThreadPool)& DefaultPool (int theNbThreads = -1);
60
61 public:
62
63   //! Main constructor.
64   //! Application may consider specifying more threads than actually
65   //! available (OSD_Parallel::NbLogicalProcessors()) and set up NbDefaultThreadsToLaunch() to a smaller value
66   //! so that concurrent threads will be able using single Thread Pool instance more efficiently.
67   //! @param theNbThreads threads number to be created by pool
68   //!                     (if -1 is specified then OSD_Parallel::NbLogicalProcessors() will be used)
69   Standard_EXPORT OSD_ThreadPool (int theNbThreads = -1);
70
71   //! Destructor.
72   Standard_EXPORT virtual ~OSD_ThreadPool();
73
74   //! Return TRUE if at least 2 threads are available (including self-thread).
75   bool HasThreads() const { return NbThreads() >= 2; }
76
77   //! Return the lower thread index.
78   int LowerThreadIndex() const { return 0; }
79
80   //! Return the upper thread index (last index is reserved for self-thread).
81   int UpperThreadIndex() const { return LowerThreadIndex() + myThreads.Size(); }
82
83   //! Return the number of threads; >= 1.
84   int NbThreads() const { return myThreads.Size() + 1; }
85
86   //! Return maximum number of threads to be locked by a single Launcher object by default;
87   //! the entire thread pool size is returned by default.
88   int NbDefaultThreadsToLaunch() const { return myNbDefThreads; }
89
90   //! Set maximum number of threads to be locked by a single Launcher object by default.
91   //! Should be set BEFORE first usage.
92   void SetNbDefaultThreadsToLaunch (int theNbThreads) { myNbDefThreads = theNbThreads; }
93
94   //! Checks if thread pools has active consumers.
95   Standard_EXPORT bool IsInUse();
96
97   //! Reinitialize the thread pool with a different number of threads.
98   //! Should be called only with no active jobs, or exception Standard_ProgramError will be thrown!
99   Standard_EXPORT void Init (int theNbThreads);
100
101 protected:
102
103   //! Thread function interface.
104   class JobInterface
105   {
106   public:
107     virtual void Perform (int theThreadIndex) = 0;
108   };
109
110   //! Thread with back reference to thread pool and thread index in it.
111   class EnumeratedThread : public OSD_Thread
112   {
113     friend class OSD_ThreadPool;
114   public:
115     EnumeratedThread (bool theIsSelfThread = false)
116     : myPool (NULL), myJob (NULL), myWakeEvent (false),
117       myIdleEvent (false), myThreadIndex (0), myUsageCounter(0),
118       myIsStarted (false), myToCatchFpe (false),
119       myIsSelfThread (theIsSelfThread) {}
120
121     //! Occupy this thread for thread pool launcher.
122     //! @return TRUE on success, or FALSE if thread has been already occupied
123     Standard_EXPORT bool Lock();
124
125     //! Release this thread for thread pool launcher; should be called only after successful OccupyThread().
126     Standard_EXPORT void Free();
127
128     //! Wake up the thread.
129     Standard_EXPORT void WakeUp (JobInterface* theJob, bool theToCatchFpe);
130
131     //! Wait the thread going into Idle state (finished jobs).
132     Standard_EXPORT void WaitIdle();
133
134   private:
135
136     //! Method is executed in the context of thread.
137     void performThread();
138
139     //! Method is executed in the context of thread.
140     static Standard_Address runThread (Standard_Address theTask);
141
142   private:
143     OSD_ThreadPool* myPool;
144     JobInterface* myJob;
145     Handle(Standard_Failure) myFailure;
146     Standard_Condition myWakeEvent;
147     Standard_Condition myIdleEvent;
148     int myThreadIndex;
149     volatile int myUsageCounter;
150     bool myIsStarted;
151     bool myToCatchFpe;
152     bool myIsSelfThread;
153   };
154
155 public:
156
157   //! Launcher object locking a subset of threads (or all threads)
158   //! in a thread pool to perform parallel execution of the job.
159   class Launcher
160   {
161   public:
162     //! Lock specified number of threads from the thread pool.
163     //! If thread pool is already locked by another user,
164     //! Launcher will lock as many threads as possible
165     //! (if none will be locked, then single threaded execution will be done).
166     //! @param thePool       thread pool to lock the threads
167     //! @param theMaxThreads number of threads to lock;
168     //!                      -1 specifies that default number of threads
169     //!                      to be used OSD_ThreadPool::NbDefaultThreadsToLaunch()
170     Standard_EXPORT Launcher (OSD_ThreadPool& thePool, int theMaxThreads = -1);
171
172     //! Release threads.
173     ~Launcher() { Release(); }
174
175     //! Return TRUE if at least 2 threads have been locked for parallel execution (including self-thread);
176     //! otherwise, the functor will be executed within the caller thread.
177     bool HasThreads() const { return myNbThreads >= 2; }
178
179     //! Return amount of locked threads; >= 1.
180     int NbThreads() const { return myNbThreads; }
181
182     //! Return the lower thread index.
183     int LowerThreadIndex() const { return 0; }
184
185     //! Return the upper thread index (last index is reserved for the self-thread).
186     int UpperThreadIndex() const { return LowerThreadIndex() + myNbThreads - 1; }
187
188     //! Simple primitive for parallelization of "for" loops, e.g.:
189     //! @code
190     //!   for (int anIter = theBegin; anIter < theEnd; ++anIter) {}
191     //! @endcode
192     //! @param theBegin   the first data index (inclusive)
193     //! @param theEnd     the last  data index (exclusive)
194     //! @param theFunctor functor providing an interface
195     //!                   "void operator(int theThreadIndex, int theDataIndex){}" performing task for specified index
196     template<typename Functor>
197     void Perform (int theBegin, int theEnd, const Functor& theFunctor)
198     {
199       JobRange aData (theBegin, theEnd);
200       Job<Functor> aJob (theFunctor, aData);
201       perform (aJob);
202     }
203
204     //! Release threads before Launcher destruction.
205     Standard_EXPORT void Release();
206
207   protected:
208
209     //! Execute job.
210     Standard_EXPORT void perform (JobInterface& theJob);
211
212     //! Initialize job and start threads.
213     Standard_EXPORT void run (JobInterface& theJob);
214
215     //! Wait threads execution.
216     Standard_EXPORT void wait();
217
218   private:
219     Launcher           (const Launcher& theCopy);
220     Launcher& operator=(const Launcher& theCopy);
221
222   private:
223     NCollection_Array1<EnumeratedThread*> myThreads; //!< array of locked threads (including self-thread)
224     EnumeratedThread mySelfThread;
225     int myNbThreads; //!< amount of locked threads
226   };
227
228 protected:
229
230   //! Auxiliary class which ensures exclusive access to iterators of processed data pool.
231   class JobRange
232   {
233   public:
234
235     //! Constructor
236     JobRange (const int& theBegin, const int& theEnd) : myBegin(theBegin), myEnd (theEnd), myIt (theBegin) {}
237
238     //! Returns const link on the first element.
239     const int& Begin() const { return myBegin; }
240
241     //! Returns const link on the last element.
242     const int& End() const { return myEnd; }
243
244     //! Returns first non processed element or end.
245     //! Thread-safe method.
246     int It() const { return Standard_Atomic_Increment (reinterpret_cast<volatile int*>(&myIt)) - 1; }
247
248   private:
249     JobRange           (const JobRange& theCopy);
250     JobRange& operator=(const JobRange& theCopy);
251
252   private:
253     const   int& myBegin; //!< First element of range
254     const   int& myEnd;   //!< Last  element of range
255     mutable int  myIt;    //!< First non processed element of range
256   };
257
258   //! Auxiliary wrapper class for thread function.
259   template<typename FunctorT> class Job : public JobInterface
260   {
261   public:
262
263     //! Constructor.
264     Job (const FunctorT& thePerformer, JobRange& theRange)
265     : myPerformer (thePerformer), myRange (theRange) {}
266
267     //! Method is executed in the context of thread.
268     virtual void Perform (int theThreadIndex) Standard_OVERRIDE
269     {
270       for (Standard_Integer anIter = myRange.It(); anIter < myRange.End(); anIter = myRange.It())
271       {
272         myPerformer (theThreadIndex, anIter);
273       }
274     }
275
276   private:
277     Job           (const Job& theCopy);
278     Job& operator=(const Job& theCopy);
279
280   private: //! @name private fields
281     const FunctorT& myPerformer; //!< Link on functor
282     const JobRange& myRange;     //!< Link on processed data block
283   };
284
285   //! Release threads.
286   void release();
287
288   //! Perform the job and catch exceptions.
289   static void performJob (Handle(Standard_Failure)& theFailure,
290                           OSD_ThreadPool::JobInterface* theJob,
291                           int theThreadIndex);
292
293 private:
294
295   NCollection_Array1<EnumeratedThread> myThreads; //!< array of defined threads (excluding self-thread)
296   int  myNbDefThreads; //!< maximum number of threads to be locked by a single Launcher by default
297   bool myShutDown;     //!< flag to shut down (destroy) the thread pool
298
299 };
300
301 #endif // _OSD_ThreadPool_HeaderFile