0030618: Modeling Algorithms, BOPTools_Parallel - avoid using map for thread-local...
[occt.git] / src / OSD / OSD_ThreadPool.cxx
1 // Created by: Kirill Gavrilov
2 // Copyright (c) 2017 OPEN CASCADE SAS
3 //
4 // This file is part of commercial software by OPEN CASCADE SAS.
5 //
6 // This software is furnished in accordance with the terms and conditions
7 // of the contract and with the inclusion of this copyright notice.
8 // This software or any other copy thereof may not be provided or otherwise
9 // be made available to any third party.
10 // No ownership title to the software is transferred hereby.
11 //
12 // OPEN CASCADE SAS makes no representation or warranties with respect to the
13 // performance of this software, and specifically disclaims any responsibility
14 // for any damages, special or consequential, connected with its use.
15
16 #include <OSD_ThreadPool.hxx>
17
18 #include <OSD.hxx>
19 #include <OSD_Parallel.hxx>
20 #include <Standard_Atomic.hxx>
21 #include <TCollection_AsciiString.hxx>
22
23 IMPLEMENT_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient)
24
25 // =======================================================================
26 // function : Lock
27 // purpose  :
28 // =======================================================================
29 bool OSD_ThreadPool::EnumeratedThread::Lock()
30 {
31   return Standard_Atomic_CompareAndSwap (&myUsageCounter, 0, 1);
32 }
33
34 // =======================================================================
35 // function : Free
36 // purpose  :
37 // =======================================================================
38 void OSD_ThreadPool::EnumeratedThread::Free()
39 {
40   Standard_Atomic_CompareAndSwap (&myUsageCounter, 1, 0);
41 }
42
43 // =======================================================================
44 // function : WakeUp
45 // purpose  :
46 // =======================================================================
47 void OSD_ThreadPool::EnumeratedThread::WakeUp (JobInterface* theJob, bool theToCatchFpe)
48 {
49   myJob = theJob;
50   myToCatchFpe = theToCatchFpe;
51   if (myIsSelfThread)
52   {
53     if (theJob != NULL)
54     {
55       OSD_ThreadPool::performJob (myFailure, myJob, myThreadIndex);
56     }
57     return;
58   }
59
60   myWakeEvent.Set();
61   if (theJob != NULL && !myIsStarted)
62   {
63     myIsStarted = true;
64     Run (this);
65   }
66 }
67
68 // =======================================================================
69 // function : WaitIdle
70 // purpose  :
71 // =======================================================================
72 void OSD_ThreadPool::EnumeratedThread::WaitIdle()
73 {
74   if (!myIsSelfThread)
75   {
76     myIdleEvent.Wait();
77     myIdleEvent.Reset();
78   }
79 }
80
81 // =======================================================================
82 // function : DefaultPool
83 // purpose  :
84 // =======================================================================
85 const Handle(OSD_ThreadPool)& OSD_ThreadPool::DefaultPool (int theNbThreads)
86 {
87   static const Handle(OSD_ThreadPool) THE_GLOBAL_POOL = new OSD_ThreadPool (theNbThreads);
88   return THE_GLOBAL_POOL;
89 }
90
91 // =======================================================================
92 // function : OSD_ThreadPool
93 // purpose  :
94 // =======================================================================
95 OSD_ThreadPool::OSD_ThreadPool (int theNbThreads)
96 : myNbDefThreads (0),
97   myShutDown (false)
98 {
99   Init (theNbThreads);
100   myNbDefThreads = NbThreads();
101 }
102
103 // =======================================================================
104 // function : IsInUse
105 // purpose  :
106 // =======================================================================
107 bool OSD_ThreadPool::IsInUse()
108 {
109   for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
110        aThreadIter.More(); aThreadIter.Next())
111   {
112     EnumeratedThread& aThread = aThreadIter.ChangeValue();
113     if (!aThread.Lock())
114     {
115       return true;
116     }
117     aThread.Free();
118   }
119   return false;
120 }
121
122 // =======================================================================
123 // function : Init
124 // purpose  :
125 // =======================================================================
126 void OSD_ThreadPool::Init (int theNbThreads)
127 {
128   const int aNbThreads = Max (0, (theNbThreads > 0 ? theNbThreads : OSD_Parallel::NbLogicalProcessors()) - 1);
129   if (myThreads.Size() == aNbThreads)
130   {
131     return;
132   }
133
134   // release old threads
135   if (!myThreads.IsEmpty())
136   {
137     NCollection_Array1<EnumeratedThread*> aLockThreads (myThreads.Lower(), myThreads.Upper());
138     aLockThreads.Init (NULL);
139     int aThreadIndex = myThreads.Lower();
140     for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
141          aThreadIter.More(); aThreadIter.Next())
142     {
143       EnumeratedThread& aThread = aThreadIter.ChangeValue();
144       if (!aThread.Lock())
145       {
146         for (NCollection_Array1<EnumeratedThread*>::Iterator aLockThreadIter (aLockThreads);
147              aLockThreadIter.More() && aLockThreadIter.Value() != NULL; aLockThreadIter.Next())
148         {
149           aLockThreadIter.ChangeValue()->Free();
150         }
151         throw Standard_ProgramError ("Error: active ThreadPool is reinitialized");
152       }
153       aLockThreads.SetValue (aThreadIndex++, &aThread);
154     }
155   }
156   release();
157
158   myShutDown = false;
159   if (aNbThreads > 0)
160   {
161     myThreads.Resize (0, aNbThreads - 1, false);
162     int aLastThreadIndex = 0;
163     for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
164          aThreadIter.More(); aThreadIter.Next())
165     {
166       EnumeratedThread& aThread = aThreadIter.ChangeValue();
167       aThread.myPool        = this;
168       aThread.myThreadIndex = aLastThreadIndex++;
169       aThread.SetFunction (&OSD_ThreadPool::EnumeratedThread::runThread);
170     }
171   }
172   else
173   {
174     NCollection_Array1<EnumeratedThread> anEmpty;
175     myThreads.Move (anEmpty);
176   }
177 }
178
179 // =======================================================================
180 // function : ~OSD_ThreadPool
181 // purpose  :
182 // =======================================================================
183 OSD_ThreadPool::~OSD_ThreadPool()
184 {
185   release();
186 }
187
188 // =======================================================================
189 // function : release
190 // purpose  :
191 // =======================================================================
192 void OSD_ThreadPool::release()
193 {
194   if (myThreads.IsEmpty())
195   {
196     return;
197   }
198
199   myShutDown = true;
200   for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
201        aThreadIter.More(); aThreadIter.Next())
202   {
203     aThreadIter.ChangeValue().WakeUp (NULL, false);
204     aThreadIter.ChangeValue().Wait();
205   }
206 }
207
208 // =======================================================================
209 // function : perform
210 // purpose  :
211 // =======================================================================
212 void OSD_ThreadPool::Launcher::perform (JobInterface& theJob)
213 {
214   run (theJob);
215   wait();
216 }
217
218 // =======================================================================
219 // function : run
220 // purpose  :
221 // =======================================================================
222 void OSD_ThreadPool::Launcher::run (JobInterface& theJob)
223 {
224   bool toCatchFpe = OSD::ToCatchFloatingSignals();
225   for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
226        aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
227   {
228     aThreadIter.ChangeValue()->WakeUp (&theJob, toCatchFpe);
229   }
230 }
231
232 // =======================================================================
233 // function : wait
234 // purpose  :
235 // =======================================================================
236 void OSD_ThreadPool::Launcher::wait()
237 {
238   int aNbFailures = 0;
239   for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
240        aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
241   {
242     aThreadIter.ChangeValue()->WaitIdle();
243     if (!aThreadIter.Value()->myFailure.IsNull())
244     {
245       ++aNbFailures;
246     }
247   }
248   if (aNbFailures == 0)
249   {
250     return;
251   }
252
253   TCollection_AsciiString aFailures;
254   for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
255        aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
256   {
257     if (!aThreadIter.Value()->myFailure.IsNull())
258     {
259       if (aNbFailures == 1)
260       {
261         aThreadIter.Value()->myFailure->Reraise();
262       }
263
264       if (!aFailures.IsEmpty())
265       {
266         aFailures += "\n";
267       }
268       aFailures += aThreadIter.Value()->myFailure->GetMessageString();
269     }
270   }
271
272   aFailures = TCollection_AsciiString("Multiple exceptions:\n") + aFailures;
273   throw Standard_ProgramError (aFailures.ToCString());
274 }
275
276 // =======================================================================
277 // function : performJob
278 // purpose  :
279 // =======================================================================
280 void OSD_ThreadPool::performJob (Handle(Standard_Failure)& theFailure,
281                                  OSD_ThreadPool::JobInterface* theJob,
282                                  int theThreadIndex)
283 {
284   try
285   {
286     OCC_CATCH_SIGNALS
287     theJob->Perform (theThreadIndex);
288   }
289   catch (Standard_Failure const& aFailure)
290   {
291     TCollection_AsciiString aMsg = TCollection_AsciiString (aFailure.DynamicType()->Name())
292                                  + ": " + aFailure.GetMessageString();
293     theFailure = new Standard_ProgramError (aMsg.ToCString());
294   }
295   catch (std::exception& anStdException)
296   {
297     TCollection_AsciiString aMsg = TCollection_AsciiString (typeid(anStdException).name())
298                                  + ": " + anStdException.what();
299     theFailure = new Standard_ProgramError (aMsg.ToCString());
300   }
301   catch (...)
302   {
303     theFailure = new Standard_ProgramError ("Error: Unknown exception");
304   }
305 }
306
307 // =======================================================================
308 // function : performThread
309 // purpose  :
310 // =======================================================================
311 void OSD_ThreadPool::EnumeratedThread::performThread()
312 {
313   OSD::SetSignal (false);
314   for (;;)
315   {
316     myWakeEvent.Wait();
317     myWakeEvent.Reset();
318     if (myPool->myShutDown)
319     {
320       return;
321     }
322
323     myFailure.Nullify();
324     if (myJob != NULL)
325     {
326       OSD::SetSignal (myToCatchFpe);
327       OSD_ThreadPool::performJob (myFailure, myJob, myThreadIndex);
328       myJob = NULL;
329     }
330     myIdleEvent.Set();
331   }
332 }
333
334 // =======================================================================
335 // function : runThread
336 // purpose  :
337 // =======================================================================
338 Standard_Address OSD_ThreadPool::EnumeratedThread::runThread (Standard_Address theTask)
339 {
340   EnumeratedThread* aThread = static_cast<EnumeratedThread*>(theTask);
341   aThread->performThread();
342   return NULL;
343 }
344
345 // =======================================================================
346 // function : Launcher
347 // purpose  :
348 // =======================================================================
349 OSD_ThreadPool::Launcher::Launcher (OSD_ThreadPool& thePool, Standard_Integer theMaxThreads)
350 : mySelfThread (true),
351   myNbThreads (0)
352 {
353   const int aNbThreads = theMaxThreads > 0
354                        ? Min (theMaxThreads, thePool.NbThreads())
355                        : (theMaxThreads < 0
356                         ? Max (thePool.NbDefaultThreadsToLaunch(), 1)
357                         : 1);
358   myThreads.Resize (0, aNbThreads - 1, false);
359   myThreads.Init (NULL);
360   if (aNbThreads > 1)
361   {
362     for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (thePool.myThreads);
363          aThreadIter.More(); aThreadIter.Next())
364     {
365       if (aThreadIter.ChangeValue().Lock())
366       {
367         myThreads.SetValue (myNbThreads, &aThreadIter.ChangeValue());
368         // make thread index to fit into myThreads range
369         aThreadIter.ChangeValue().myThreadIndex = myNbThreads;
370         if (++myNbThreads == aNbThreads - 1)
371         {
372           break;
373         }
374       }
375     }
376   }
377
378   // self thread should be executed last
379   myThreads.SetValue (myNbThreads, &mySelfThread);
380   mySelfThread.myThreadIndex = myNbThreads;
381   ++myNbThreads;
382 }
383
384 // =======================================================================
385 // function : Release
386 // purpose  :
387 // =======================================================================
388 void OSD_ThreadPool::Launcher::Release()
389 {
390   for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
391        aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
392   {
393     if (aThreadIter.Value() != &mySelfThread)
394     {
395       aThreadIter.Value()->Free();
396     }
397   }
398
399   NCollection_Array1<EnumeratedThread*> anEmpty;
400   myThreads.Move (anEmpty);
401   myNbThreads = 0;
402 }