6f498847 |
1 | // Created by: Kirill Gavrilov |
2 | // Copyright (c) 2017 OPEN CASCADE SAS |
3 | // |
4 | // This file is part of commercial software by OPEN CASCADE SAS. |
5 | // |
6 | // This software is furnished in accordance with the terms and conditions |
7 | // of the contract and with the inclusion of this copyright notice. |
8 | // This software or any other copy thereof may not be provided or otherwise |
9 | // be made available to any third party. |
10 | // No ownership title to the software is transferred hereby. |
11 | // |
12 | // OPEN CASCADE SAS makes no representation or warranties with respect to the |
13 | // performance of this software, and specifically disclaims any responsibility |
14 | // for any damages, special or consequential, connected with its use. |
15 | |
16 | #ifndef _OSD_ThreadPool_HeaderFile |
17 | #define _OSD_ThreadPool_HeaderFile |
18 | |
19 | #include <NCollection_Array1.hxx> |
20 | #include <OSD_Thread.hxx> |
6f498847 |
21 | #include <Standard_Atomic.hxx> |
22 | #include <Standard_Condition.hxx> |
23 | #include <Standard_Mutex.hxx> |
24 | |
25 | //! Class defining a thread pool for executing algorithms in multi-threaded mode. |
26 | //! Thread pool allocates requested amount of threads and keep them alive |
27 | //! (in sleep mode when unused) during thread pool lifetime. |
28 | //! The same pool can be used by multiple consumers, |
29 | //! including nested multi-threading algorithms and concurrent threads: |
30 | //! - Thread pool can be used either by multi-threaded algorithm by creating OSD_ThreadPool::Launcher. |
31 | //! The functor performing a job takes two parameters - Thread Index and Data Index: |
32 | //! void operator(int theThreadIndex, int theDataIndex){} |
33 | //! Multi-threaded algorithm may rely on Thread Index for allocating thread-local variables in array form, |
34 | //! since the Thread Index is guaranteed to be within range OSD_ThreadPool::Lower() and OSD_ThreadPool::Upper(). |
35 | //! - Default thread pool (OSD_ThreadPool::DefaultPool()) can be used in general case, |
36 | //! but application may prefer creating a dedicated pool for better control. |
37 | //! - Default thread pool allocates the amount of threads considering concurrency |
38 | //! level of the system (amount of logical processors). |
39 | //! This can be overridden during OSD_ThreadPool construction or by calling OSD_ThreadPool::Init() |
40 | //! (the pool should not be used!). |
41 | //! - OSD_ThreadPool::Launcher reserves specific amount of threads from the pool for executing multi-threaded Job. |
42 | //! Normally, single Launcher instance will occupy all threads available in thread pool, |
43 | //! so that nested multi-threaded algorithms (within the same thread) |
44 | //! and concurrent threads trying to use the same thread pool will run sequentially. |
45 | //! This behavior is affected by OSD_ThreadPool::NbDefaultThreadsToLaunch() parameter |
46 | //! and Launcher constructor, so that single Launcher instance will occupy not all threads |
47 | //! in the pool allowing other threads to be used concurrently. |
48 | //! - OSD_ThreadPool::Launcher locks thread one-by-one from thread pool in a thread-safe way. |
49 | //! - Each working thread catches exceptions occurred during job execution, and Launcher will |
50 | //! throw Standard_Failure in a caller thread on completed execution. |
51 | class OSD_ThreadPool : public Standard_Transient |
52 | { |
53 | DEFINE_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient) |
54 | public: |
55 | |
56 | //! Return (or create) a default thread pool. |
57 | //! Number of threads argument will be considered only when called first time. |
58 | Standard_EXPORT static const Handle(OSD_ThreadPool)& DefaultPool (int theNbThreads = -1); |
59 | |
60 | public: |
61 | |
62 | //! Main constructor. |
63 | //! Application may consider specifying more threads than actually |
64 | //! available (OSD_Parallel::NbLogicalProcessors()) and set up NbDefaultThreadsToLaunch() to a smaller value |
65 | //! so that concurrent threads will be able using single Thread Pool instance more efficiently. |
66 | //! @param theNbThreads threads number to be created by pool |
67 | //! (if -1 is specified then OSD_Parallel::NbLogicalProcessors() will be used) |
68 | Standard_EXPORT OSD_ThreadPool (int theNbThreads = -1); |
69 | |
70 | //! Destructor. |
71 | Standard_EXPORT virtual ~OSD_ThreadPool(); |
72 | |
73 | //! Return TRUE if at least 2 threads are available (including self-thread). |
74 | bool HasThreads() const { return NbThreads() >= 2; } |
75 | |
76 | //! Return the lower thread index. |
77 | int LowerThreadIndex() const { return 0; } |
78 | |
79 | //! Return the upper thread index (last index is reserved for self-thread). |
80 | int UpperThreadIndex() const { return LowerThreadIndex() + myThreads.Size(); } |
81 | |
82 | //! Return the number of threads; >= 1. |
83 | int NbThreads() const { return myThreads.Size() + 1; } |
84 | |
85 | //! Return maximum number of threads to be locked by a single Launcher object by default; |
86 | //! the entire thread pool size is returned by default. |
87 | int NbDefaultThreadsToLaunch() const { return myNbDefThreads; } |
88 | |
89 | //! Set maximum number of threads to be locked by a single Launcher object by default. |
90 | //! Should be set BEFORE first usage. |
91 | void SetNbDefaultThreadsToLaunch (int theNbThreads) { myNbDefThreads = theNbThreads; } |
92 | |
93 | //! Checks if thread pools has active consumers. |
94 | Standard_EXPORT bool IsInUse(); |
95 | |
96 | //! Reinitialize the thread pool with a different number of threads. |
97 | //! Should be called only with no active jobs, or exception Standard_ProgramError will be thrown! |
98 | Standard_EXPORT void Init (int theNbThreads); |
99 | |
100 | protected: |
101 | |
102 | //! Thread function interface. |
103 | class JobInterface |
104 | { |
105 | public: |
106 | virtual void Perform (int theThreadIndex) = 0; |
107 | }; |
108 | |
109 | //! Thread with back reference to thread pool and thread index in it. |
110 | class EnumeratedThread : public OSD_Thread |
111 | { |
112 | friend class OSD_ThreadPool; |
113 | public: |
ca0962a1 |
114 | //! Main constructor. |
6f498847 |
115 | EnumeratedThread (bool theIsSelfThread = false) |
116 | : myPool (NULL), myJob (NULL), myWakeEvent (false), |
117 | myIdleEvent (false), myThreadIndex (0), myUsageCounter(0), |
118 | myIsStarted (false), myToCatchFpe (false), |
119 | myIsSelfThread (theIsSelfThread) {} |
120 | |
121 | //! Occupy this thread for thread pool launcher. |
122 | //! @return TRUE on success, or FALSE if thread has been already occupied |
123 | Standard_EXPORT bool Lock(); |
124 | |
125 | //! Release this thread for thread pool launcher; should be called only after successful OccupyThread(). |
126 | Standard_EXPORT void Free(); |
127 | |
128 | //! Wake up the thread. |
129 | Standard_EXPORT void WakeUp (JobInterface* theJob, bool theToCatchFpe); |
130 | |
131 | //! Wait the thread going into Idle state (finished jobs). |
132 | Standard_EXPORT void WaitIdle(); |
133 | |
ca0962a1 |
134 | public: |
135 | |
136 | //! Copy constructor. |
137 | EnumeratedThread (const EnumeratedThread& theCopy) |
138 | : OSD_Thread(), |
139 | myPool (NULL), myJob (NULL), myWakeEvent (false), |
140 | myIdleEvent (false), myThreadIndex (0), myUsageCounter(0), |
141 | myIsStarted (false), myToCatchFpe (false), |
142 | myIsSelfThread (false) { Assign (theCopy); } |
143 | |
144 | //! Assignment operator. |
145 | EnumeratedThread& operator= (const EnumeratedThread& theCopy) |
146 | { |
147 | Assign (theCopy); |
148 | return *this; |
149 | } |
150 | |
151 | //! Assignment operator. |
152 | void Assign (const EnumeratedThread& theCopy) |
153 | { |
154 | OSD_Thread::Assign (theCopy); |
155 | myPool = theCopy.myPool; |
156 | myJob = theCopy.myJob; |
157 | myThreadIndex = theCopy.myThreadIndex; |
158 | myToCatchFpe = theCopy.myToCatchFpe; |
159 | myIsSelfThread = theCopy.myIsSelfThread; |
160 | } |
161 | |
6f498847 |
162 | private: |
163 | |
164 | //! Method is executed in the context of thread. |
165 | void performThread(); |
166 | |
167 | //! Method is executed in the context of thread. |
168 | static Standard_Address runThread (Standard_Address theTask); |
169 | |
170 | private: |
171 | OSD_ThreadPool* myPool; |
172 | JobInterface* myJob; |
173 | Handle(Standard_Failure) myFailure; |
174 | Standard_Condition myWakeEvent; |
175 | Standard_Condition myIdleEvent; |
176 | int myThreadIndex; |
177 | volatile int myUsageCounter; |
178 | bool myIsStarted; |
179 | bool myToCatchFpe; |
180 | bool myIsSelfThread; |
181 | }; |
182 | |
183 | public: |
184 | |
185 | //! Launcher object locking a subset of threads (or all threads) |
186 | //! in a thread pool to perform parallel execution of the job. |
187 | class Launcher |
188 | { |
189 | public: |
190 | //! Lock specified number of threads from the thread pool. |
191 | //! If thread pool is already locked by another user, |
192 | //! Launcher will lock as many threads as possible |
193 | //! (if none will be locked, then single threaded execution will be done). |
194 | //! @param thePool thread pool to lock the threads |
195 | //! @param theMaxThreads number of threads to lock; |
196 | //! -1 specifies that default number of threads |
197 | //! to be used OSD_ThreadPool::NbDefaultThreadsToLaunch() |
198 | Standard_EXPORT Launcher (OSD_ThreadPool& thePool, int theMaxThreads = -1); |
199 | |
200 | //! Release threads. |
201 | ~Launcher() { Release(); } |
202 | |
203 | //! Return TRUE if at least 2 threads have been locked for parallel execution (including self-thread); |
204 | //! otherwise, the functor will be executed within the caller thread. |
205 | bool HasThreads() const { return myNbThreads >= 2; } |
206 | |
207 | //! Return amount of locked threads; >= 1. |
208 | int NbThreads() const { return myNbThreads; } |
209 | |
210 | //! Return the lower thread index. |
211 | int LowerThreadIndex() const { return 0; } |
212 | |
213 | //! Return the upper thread index (last index is reserved for the self-thread). |
214 | int UpperThreadIndex() const { return LowerThreadIndex() + myNbThreads - 1; } |
215 | |
216 | //! Simple primitive for parallelization of "for" loops, e.g.: |
217 | //! @code |
218 | //! for (int anIter = theBegin; anIter < theEnd; ++anIter) {} |
219 | //! @endcode |
220 | //! @param theBegin the first data index (inclusive) |
221 | //! @param theEnd the last data index (exclusive) |
222 | //! @param theFunctor functor providing an interface |
223 | //! "void operator(int theThreadIndex, int theDataIndex){}" performing task for specified index |
224 | template<typename Functor> |
225 | void Perform (int theBegin, int theEnd, const Functor& theFunctor) |
226 | { |
227 | JobRange aData (theBegin, theEnd); |
228 | Job<Functor> aJob (theFunctor, aData); |
229 | perform (aJob); |
230 | } |
231 | |
232 | //! Release threads before Launcher destruction. |
233 | Standard_EXPORT void Release(); |
234 | |
235 | protected: |
236 | |
237 | //! Execute job. |
238 | Standard_EXPORT void perform (JobInterface& theJob); |
239 | |
240 | //! Initialize job and start threads. |
241 | Standard_EXPORT void run (JobInterface& theJob); |
242 | |
243 | //! Wait threads execution. |
244 | Standard_EXPORT void wait(); |
245 | |
246 | private: |
247 | Launcher (const Launcher& theCopy); |
248 | Launcher& operator=(const Launcher& theCopy); |
249 | |
250 | private: |
251 | NCollection_Array1<EnumeratedThread*> myThreads; //!< array of locked threads (including self-thread) |
252 | EnumeratedThread mySelfThread; |
253 | int myNbThreads; //!< amount of locked threads |
254 | }; |
255 | |
256 | protected: |
257 | |
258 | //! Auxiliary class which ensures exclusive access to iterators of processed data pool. |
259 | class JobRange |
260 | { |
261 | public: |
262 | |
263 | //! Constructor |
264 | JobRange (const int& theBegin, const int& theEnd) : myBegin(theBegin), myEnd (theEnd), myIt (theBegin) {} |
265 | |
266 | //! Returns const link on the first element. |
267 | const int& Begin() const { return myBegin; } |
268 | |
269 | //! Returns const link on the last element. |
270 | const int& End() const { return myEnd; } |
271 | |
272 | //! Returns first non processed element or end. |
273 | //! Thread-safe method. |
274 | int It() const { return Standard_Atomic_Increment (reinterpret_cast<volatile int*>(&myIt)) - 1; } |
275 | |
276 | private: |
277 | JobRange (const JobRange& theCopy); |
278 | JobRange& operator=(const JobRange& theCopy); |
279 | |
280 | private: |
281 | const int& myBegin; //!< First element of range |
282 | const int& myEnd; //!< Last element of range |
283 | mutable int myIt; //!< First non processed element of range |
284 | }; |
285 | |
286 | //! Auxiliary wrapper class for thread function. |
287 | template<typename FunctorT> class Job : public JobInterface |
288 | { |
289 | public: |
290 | |
291 | //! Constructor. |
292 | Job (const FunctorT& thePerformer, JobRange& theRange) |
293 | : myPerformer (thePerformer), myRange (theRange) {} |
294 | |
295 | //! Method is executed in the context of thread. |
296 | virtual void Perform (int theThreadIndex) Standard_OVERRIDE |
297 | { |
298 | for (Standard_Integer anIter = myRange.It(); anIter < myRange.End(); anIter = myRange.It()) |
299 | { |
300 | myPerformer (theThreadIndex, anIter); |
301 | } |
302 | } |
303 | |
304 | private: |
305 | Job (const Job& theCopy); |
306 | Job& operator=(const Job& theCopy); |
307 | |
308 | private: //! @name private fields |
309 | const FunctorT& myPerformer; //!< Link on functor |
310 | const JobRange& myRange; //!< Link on processed data block |
311 | }; |
312 | |
313 | //! Release threads. |
314 | void release(); |
315 | |
316 | //! Perform the job and catch exceptions. |
317 | static void performJob (Handle(Standard_Failure)& theFailure, |
318 | OSD_ThreadPool::JobInterface* theJob, |
319 | int theThreadIndex); |
320 | |
ca0962a1 |
321 | private: |
322 | //! This method should not be called (prohibited). |
323 | OSD_ThreadPool (const OSD_ThreadPool& theCopy); |
324 | //! This method should not be called (prohibited). |
325 | OSD_ThreadPool& operator= (const OSD_ThreadPool& theCopy); |
326 | |
6f498847 |
327 | private: |
328 | |
329 | NCollection_Array1<EnumeratedThread> myThreads; //!< array of defined threads (excluding self-thread) |
330 | int myNbDefThreads; //!< maximum number of threads to be locked by a single Launcher by default |
331 | bool myShutDown; //!< flag to shut down (destroy) the thread pool |
332 | |
333 | }; |
334 | |
335 | #endif // _OSD_ThreadPool_HeaderFile |