1 // Created by: Kirill Gavrilov
2 // Copyright (c) 2017-2019 OPEN CASCADE SAS
4 // This file is part of Open CASCADE Technology software library.
6 // This library is free software; you can redistribute it and/or modify it under
7 // the terms of the GNU Lesser General Public License version 2.1 as published
8 // by the Free Software Foundation, with special exception defined in the file
9 // OCCT_LGPL_EXCEPTION.txt. Consult the file LICENSE_LGPL_21.txt included in OCCT
10 // distribution for complete text of the license and disclaimer of any warranty.
12 // Alternatively, this file may be used under the terms of Open CASCADE
13 // commercial license or contractual agreement.
15 #include <OSD_ThreadPool.hxx>
18 #include <OSD_Parallel.hxx>
19 #include <Standard_Atomic.hxx>
20 #include <TCollection_AsciiString.hxx>
22 IMPLEMENT_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient)
24 // =======================================================================
27 // =======================================================================
// Tries to mark this thread as taken: atomically replaces myUsageCounter
// with 1 if it currently holds 0.
// Returns the result of that compare-and-swap call.
28 bool OSD_ThreadPool::EnumeratedThread::Lock()
30 return Standard_Atomic_CompareAndSwap (&myUsageCounter, 0, 1);
33 // =======================================================================
36 // =======================================================================
// Counterpart of Lock(): atomically replaces myUsageCounter with 0 if it
// currently holds 1. The result of the compare-and-swap is ignored.
37 void OSD_ThreadPool::EnumeratedThread::Free()
39 Standard_Atomic_CompareAndSwap (&myUsageCounter, 1, 0);
42 // =======================================================================
45 // =======================================================================
// Hands a job to this thread.
// theJob        - job to process; callers in this file also pass NULL (see release()).
// theToCatchFpe - flag stored in myToCatchFpe and later passed to OSD::SetSignal()
//                 by performThread().
// NOTE(review): several statements of this body are not visible in this excerpt
// (job assignment, the condition guarding the direct call below, and the body of
// the final check); the comments below cover only the visible lines.
46 void OSD_ThreadPool::EnumeratedThread::WakeUp (JobInterface* theJob, bool theToCatchFpe)
49 myToCatchFpe = theToCatchFpe;
// runs myJob directly on the calling thread, storing any failure in myFailure
// (presumably only for the launcher's own "self" thread - TODO confirm the guard)
54 OSD_ThreadPool::performJob (myFailure, myJob, myThreadIndex);
// a real job was given while myIsStarted is still false
// (presumably the place where the thread gets started - body not visible)
60 if (theJob != NULL && !myIsStarted)
67 // =======================================================================
68 // function : WaitIdle
70 // =======================================================================
// Called by Launcher::wait() on every launched thread before its myFailure is inspected.
// NOTE(review): the body is not visible in this excerpt; judging by the name and
// that usage it presumably blocks until the thread has finished its job - TODO confirm.
71 void OSD_ThreadPool::EnumeratedThread::WaitIdle()
80 // =======================================================================
81 // function : DefaultPool
83 // =======================================================================
// Returns the process-wide shared thread pool.
// theNbThreads - thread count forwarded to the OSD_ThreadPool constructor.
// The pool is a function-local static, so it is constructed on the first call only;
// theNbThreads passed to any later call has no effect on the already created pool.
84 const Handle(OSD_ThreadPool)& OSD_ThreadPool::DefaultPool (int theNbThreads)
86 static const Handle(OSD_ThreadPool) THE_GLOBAL_POOL = new OSD_ThreadPool (theNbThreads);
87 return THE_GLOBAL_POOL;
90 // =======================================================================
91 // function : OSD_ThreadPool
93 // =======================================================================
// Constructs the pool.
// theNbThreads - requested number of threads.
// NOTE(review): the initializer list and the statements preceding the line below
// are not visible in this excerpt (presumably Init (theNbThreads) is called - TODO confirm).
94 OSD_ThreadPool::OSD_ThreadPool (int theNbThreads)
// the default number of threads to launch is whatever NbThreads() reports at this point
99 myNbDefThreads = NbThreads();
102 // =======================================================================
103 // function : IsInUse
105 // =======================================================================
// Reports whether the pool is currently in use.
// Visits every entry of myThreads through a mutable reference.
// NOTE(review): the per-thread check and the return statements are not visible in
// this excerpt; presumably each thread is probed with Lock()/Free() - TODO confirm.
106 bool OSD_ThreadPool::IsInUse()
108 for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
109 aThreadIter.More(); aThreadIter.Next())
111 EnumeratedThread& aThread = aThreadIter.ChangeValue();
121 // =======================================================================
124 // =======================================================================
// (Re)initializes the pool for the requested number of threads.
// theNbThreads - requested total thread count; a non-positive value selects
//                OSD_Parallel::NbLogicalProcessors() instead.
// Throws Standard_ProgramError ("Error: active ThreadPool is reinitialized") from the
// old-thread release phase below.
// NOTE(review): several conditions and braces of this body are not visible in this
// excerpt; comments marked "presumably" describe the most likely missing guard.
125 void OSD_ThreadPool::Init (int theNbThreads)
// number of pooled worker threads is one less than the requested total, never negative
// (Launcher adds its own "self" thread entry on top of the pooled ones)
127 const int aNbThreads = Max (0, (theNbThreads > 0 ? theNbThreads : OSD_Parallel::NbLogicalProcessors()) - 1);
// pool already has the wanted size (body not visible - presumably nothing to do)
128 if (myThreads.Size() == aNbThreads)
133 // release old threads
134 if (!myThreads.IsEmpty())
// aLockThreads collects pointers to the threads handled so far; unused slots stay NULL
136 NCollection_Array1<EnumeratedThread*> aLockThreads (myThreads.Lower(), myThreads.Upper());
137 aLockThreads.Init (NULL);
138 int aThreadIndex = myThreads.Lower();
139 for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
140 aThreadIter.More(); aThreadIter.Next())
142 EnumeratedThread& aThread = aThreadIter.ChangeValue();
// rollback path (guard not visible - presumably a failed aThread.Lock()):
// Free() every thread recorded so far, stopping at the first NULL slot, then throw
145 for (NCollection_Array1<EnumeratedThread*>::Iterator aLockThreadIter (aLockThreads);
146 aLockThreadIter.More() && aLockThreadIter.Value() != NULL; aLockThreadIter.Next())
148 aLockThreadIter.ChangeValue()->Free();
150 throw Standard_ProgramError ("Error: active ThreadPool is reinitialized");
// remember this thread so that it can be freed by the rollback loop above
152 aLockThreads.SetValue (aThreadIndex++, &aThread);
// allocate the new set of threads with indices [0, aNbThreads - 1]
// (guard not visible - presumably only when aNbThreads > 0)
160 myThreads.Resize (0, aNbThreads - 1, false);
161 int aLastThreadIndex = 0;
// bind every thread to this pool, number it sequentially from 0
// and set runThread() as its thread function
162 for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
163 aThreadIter.More(); aThreadIter.Next())
165 EnumeratedThread& aThread = aThreadIter.ChangeValue();
166 aThread.myPool = this;
167 aThread.myThreadIndex = aLastThreadIndex++;
168 aThread.SetFunction (&OSD_ThreadPool::EnumeratedThread::runThread);
// otherwise (presumably when no worker threads are wanted) leave myThreads empty
// by moving a default-constructed array into it
173 NCollection_Array1<EnumeratedThread> anEmpty;
174 myThreads.Move (anEmpty);
178 // =======================================================================
179 // function : ~OSD_ThreadPool
181 // =======================================================================
// Destructor.
// NOTE(review): the body is not visible in this excerpt; presumably it calls
// release() to stop the pooled threads - TODO confirm.
182 OSD_ThreadPool::~OSD_ThreadPool()
187 // =======================================================================
188 // function : release
190 // =======================================================================
// Stops the pooled threads.
// NOTE(review): the statements between the emptiness check and the loop are not
// visible in this excerpt (presumably an early return and raising of myShutDown,
// which performThread() tests - TODO confirm).
191 void OSD_ThreadPool::release()
// nothing to stop when the pool holds no threads (body not visible)
193 if (myThreads.IsEmpty())
// wake every thread with no job (NULL) and FPE catching disabled,
// then Wait() on that thread before moving on to the next one
199 for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
200 aThreadIter.More(); aThreadIter.Next())
202 aThreadIter.ChangeValue().WakeUp (NULL, false);
203 aThreadIter.ChangeValue().Wait();
207 // =======================================================================
208 // function : perform
210 // =======================================================================
// Executes theJob using the threads held by this launcher.
// NOTE(review): the body is not visible in this excerpt; presumably it calls
// run (theJob) followed by wait() - TODO confirm.
211 void OSD_ThreadPool::Launcher::perform (JobInterface& theJob)
217 // =======================================================================
220 // =======================================================================
// Starts theJob on every thread held by this launcher.
// theJob - job whose address is handed to each thread via WakeUp().
221 void OSD_ThreadPool::Launcher::run (JobInterface& theJob)
// the value of OSD::ToCatchFloatingSignals() is read once on the calling thread
// and passed on to every woken thread
223 bool toCatchFpe = OSD::ToCatchFloatingSignals();
// myThreads is filled from the front; iteration stops at the first NULL slot
224 for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
225 aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
227 aThreadIter.ChangeValue()->WakeUp (&theJob, toCatchFpe);
231 // =======================================================================
234 // =======================================================================
// Waits for all launched threads and reports failures stored in their myFailure.
// With exactly one failure it is re-raised as is; with several, their messages are
// combined into a single Standard_ProgramError prefixed with "Multiple exceptions:\n".
// NOTE(review): the declaration and increment of aNbFailures, the body of the
// "no failures" check and the separator appended between messages are not visible
// in this excerpt.
235 void OSD_ThreadPool::Launcher::wait()
// first pass: call WaitIdle() on every thread (up to the first NULL slot)
// and check whether it stored a failure
238 for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
239 aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
241 aThreadIter.ChangeValue()->WaitIdle();
242 if (!aThreadIter.Value()->myFailure.IsNull())
// no thread failed (body not visible - presumably returns)
247 if (aNbFailures == 0)
// second pass: report the collected failures
252 TCollection_AsciiString aFailures;
253 for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
254 aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
256 if (!aThreadIter.Value()->myFailure.IsNull())
// a single failure is re-raised directly
258 if (aNbFailures == 1)
260 aThreadIter.Value()->myFailure->Reraise();
// otherwise accumulate the messages (separator statement for non-empty text not visible)
263 if (!aFailures.IsEmpty())
267 aFailures += aThreadIter.Value()->myFailure->GetMessageString();
271 aFailures = TCollection_AsciiString("Multiple exceptions:\n") + aFailures;
272 throw Standard_ProgramError (aFailures.ToCString());
275 // =======================================================================
276 // function : performJob
278 // =======================================================================
// Runs theJob->Perform() and converts an exception caught by the handlers below into a
// Standard_ProgramError stored in theFailure.
// theFailure - receives a newly allocated Standard_ProgramError on failure.
// theJob     - job to execute; dereferenced without a visible NULL check.
// The thread index passed to Perform() is a third parameter (theThreadIndex) whose
// declaration line is not visible in this excerpt.
// NOTE(review): the try statement and the last catch clause are also not visible;
// the final assignment presumably sits in a catch (...) handler - TODO confirm.
279 void OSD_ThreadPool::performJob (Handle(Standard_Failure)& theFailure,
280 OSD_ThreadPool::JobInterface* theJob,
286 theJob->Perform (theThreadIndex);
// OCCT exception: message becomes "<dynamic type name>: <original message>"
288 catch (Standard_Failure const& aFailure)
290 TCollection_AsciiString aMsg = TCollection_AsciiString (aFailure.DynamicType()->Name())
291 + ": " + aFailure.GetMessageString();
292 theFailure = new Standard_ProgramError (aMsg.ToCString());
// standard C++ exception: message becomes "<typeid name>: <what()>"
294 catch (std::exception& anStdException)
296 TCollection_AsciiString aMsg = TCollection_AsciiString (typeid(anStdException).name())
297 + ": " + anStdException.what();
298 theFailure = new Standard_ProgramError (aMsg.ToCString());
// fallback with a fixed message
302 theFailure = new Standard_ProgramError ("Error: Unknown exception");
306 // =======================================================================
307 // function : performThread
309 // =======================================================================
// Body of a pooled thread (invoked from runThread()).
// NOTE(review): the surrounding loop, the wait for a wake-up and the statements
// following the job execution are not visible in this excerpt.
310 void OSD_ThreadPool::EnumeratedThread::performThread()
// initial signal setup with the argument false
312 OSD::SetSignal (false);
// the owning pool's myShutDown flag is checked (body not visible - presumably
// leaves the thread function)
317 if (myPool->myShutDown)
// apply the FPE setting stored by WakeUp(), then execute the current job;
// a failure, if any, ends up in myFailure
325 OSD::SetSignal (myToCatchFpe);
326 OSD_ThreadPool::performJob (myFailure, myJob, myThreadIndex);
333 // =======================================================================
334 // function : runThread
336 // =======================================================================
// Thread entry point; Init() registers it on each pooled thread with SetFunction().
// theTask - opaque pointer that is cast back to the EnumeratedThread to run.
// NOTE(review): the return statement is not visible in this excerpt.
337 Standard_Address OSD_ThreadPool::EnumeratedThread::runThread (Standard_Address theTask)
339 EnumeratedThread* aThread = static_cast<EnumeratedThread*>(theTask);
340 aThread->performThread();
344 // =======================================================================
345 // function : Launcher
347 // =======================================================================
// Locks free threads of thePool for use by this launcher and appends the launcher's
// own mySelfThread entry after them.
// thePool       - pool whose threads are locked via EnumeratedThread::Lock().
// theMaxThreads - upper limit on the number of threads when positive; otherwise
//                 a default is derived from the pool (see below).
// NOTE(review): the remaining member initializers, the inner condition of the
// thread-count expression and the loop exit statement are not visible in this excerpt.
348 OSD_ThreadPool::Launcher::Launcher (OSD_ThreadPool& thePool, Standard_Integer theMaxThreads)
349 : mySelfThread (true),
// positive limit: capped by thePool.NbThreads();
// otherwise a nested choice (condition not visible) whose visible branch yields
// thePool.NbDefaultThreadsToLaunch(), but at least 1
352 const int aNbThreads = theMaxThreads > 0
353 ? Min (theMaxThreads, thePool.NbThreads())
355 ? Max (thePool.NbDefaultThreadsToLaunch(), 1)
// slots [0, aNbThreads - 1], all NULL until a thread is assigned
357 myThreads.Resize (0, aNbThreads - 1, false);
358 myThreads.Init (NULL);
// take every pool thread that can be locked, storing it in the next free slot
361 for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (thePool.myThreads);
362 aThreadIter.More(); aThreadIter.Next())
364 if (aThreadIter.ChangeValue().Lock())
366 myThreads.SetValue (myNbThreads, &aThreadIter.ChangeValue());
367 // make thread index to fit into myThreads range
368 aThreadIter.ChangeValue().myThreadIndex = myNbThreads;
// aNbThreads - 1 pooled threads collected: one slot remains for the self thread
// (body not visible - presumably leaves the loop)
369 if (++myNbThreads == aNbThreads - 1)
377 // self thread should be executed last
378 myThreads.SetValue (myNbThreads, &mySelfThread);
379 mySelfThread.myThreadIndex = myNbThreads;
383 // =======================================================================
384 // function : Release
386 // =======================================================================
// Gives the locked pool threads back and empties the launcher's thread list.
387 void OSD_ThreadPool::Launcher::Release()
// walk the assigned slots (up to the first NULL one)
389 for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
390 aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
// mySelfThread is skipped: only the other entries get Free()
392 if (aThreadIter.Value() != &mySelfThread)
394 aThreadIter.Value()->Free();
// clear myThreads by moving a default-constructed array into it
398 NCollection_Array1<EnumeratedThread*> anEmpty;
399 myThreads.Move (anEmpty);