1 // Created by: Kirill Gavrilov
2 // Copyright (c) 2017 OPEN CASCADE SAS
4 // This file is part of commercial software by OPEN CASCADE SAS.
6 // This software is furnished in accordance with the terms and conditions
7 // of the contract and with the inclusion of this copyright notice.
8 // This software or any other copy thereof may not be provided or otherwise
9 // be made available to any third party.
10 // No ownership title to the software is transferred hereby.
12 // OPEN CASCADE SAS makes no representation or warranties with respect to the
13 // performance of this software, and specifically disclaims any responsibility
14 // for any damages, special or consequential, connected with its use.
16 #include <OSD_ThreadPool.hxx>
19 #include <OSD_Parallel.hxx>
20 #include <Standard_Atomic.hxx>
21 #include <TCollection_AsciiString.hxx>
// Run-time type information boilerplate for OSD_ThreadPool; the macro arguments name
// the class itself and Standard_Transient.
// NOTE(review): presumably Standard_Transient is the direct base class - confirm against the header.
23 IMPLEMENT_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient)
25 // =======================================================================
28 // =======================================================================
// Attempts to reserve this thread: performs a compare-and-swap on myUsageCounter
// with the values (0, 1) and returns whatever Standard_Atomic_CompareAndSwap() reports.
// NOTE(review): presumably "true" means the counter was 0 (thread free) and is now 1 (thread taken) - confirm.
29 bool OSD_ThreadPool::EnumeratedThread::Lock()
31 return Standard_Atomic_CompareAndSwap (&myUsageCounter, 0, 1);
34 // =======================================================================
37 // =======================================================================
// Counterpart of Lock(): performs the reverse compare-and-swap on myUsageCounter
// with the values (1, 0). The result of the swap is deliberately not inspected.
38 void OSD_ThreadPool::EnumeratedThread::Free()
40 Standard_Atomic_CompareAndSwap (&myUsageCounter, 1, 0);
43 // =======================================================================
46 // =======================================================================
// Hands a job (possibly NULL) to this thread together with the FPE-catching flag.
// theJob        - job to execute; NULL is passed by OSD_ThreadPool::release()
// theToCatchFpe - value later forwarded to OSD::SetSignal() in performThread()
// NOTE(review): the conditions selecting between the two paths below are not visible here - confirm.
47 void OSD_ThreadPool::EnumeratedThread::WakeUp (JobInterface* theJob, bool theToCatchFpe)
// remember the signal-handling mode requested by the caller
50 myToCatchFpe = theToCatchFpe;
// direct execution path: the job runs through the static helper, which stores any failure in myFailure
55 OSD_ThreadPool::performJob (myFailure, myJob, myThreadIndex);
// a real job was given and this thread has not been started yet
61 if (theJob != NULL && !myIsStarted)
68 // =======================================================================
69 // function : WaitIdle
71 // =======================================================================
// NOTE(review): Launcher::wait() calls this on every launched thread before reading
// myFailure, so it presumably blocks until the thread has finished its job - confirm against the body.
72 void OSD_ThreadPool::EnumeratedThread::WaitIdle()
81 // =======================================================================
82 // function : DefaultPool
84 // =======================================================================
// Returns the process-wide shared pool.
// The pool is a function-local static, so it is constructed only once, on the first call;
// theNbThreads is therefore used only by that first call and has no effect afterwards.
85 const Handle(OSD_ThreadPool)& OSD_ThreadPool::DefaultPool (int theNbThreads)
87 static const Handle(OSD_ThreadPool) THE_GLOBAL_POOL = new OSD_ThreadPool (theNbThreads);
88 return THE_GLOBAL_POOL;
91 // =======================================================================
92 // function : OSD_ThreadPool
94 // =======================================================================
// Constructs a pool for the requested number of threads.
// NOTE(review): the initializer list and the call that creates the threads are not visible here - confirm.
95 OSD_ThreadPool::OSD_ThreadPool (int theNbThreads)
// the default number of threads starts out equal to the pool's own NbThreads()
100 myNbDefThreads = NbThreads();
103 // =======================================================================
104 // function : IsInUse
106 // =======================================================================
// Walks over every thread of the pool, taking a mutable reference to each one.
// NOTE(review): the per-thread check and the returned values are not visible here;
// presumably reports whether any thread is currently locked - confirm.
107 bool OSD_ThreadPool::IsInUse()
109 for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
110 aThreadIter.More(); aThreadIter.Next())
112 EnumeratedThread& aThread = aThreadIter.ChangeValue();
122 // =======================================================================
125 // =======================================================================
// (Re)initializes the array of pool threads for the requested thread count.
// theNbThreads - desired number of threads; a non-positive value selects
//                OSD_Parallel::NbLogicalProcessors() instead
// Throws Standard_ProgramError ("Error: active ThreadPool is reinitialized") from the
// old-thread release loop below.
126 void OSD_ThreadPool::Init (int theNbThreads)
// array size = effective thread count minus one, clamped at zero
// NOTE(review): the "- 1" presumably leaves room for the calling thread, which Launcher
// adds as its own mySelfThread - confirm.
128 const int aNbThreads = Max (0, (theNbThreads > 0 ? theNbThreads : OSD_Parallel::NbLogicalProcessors()) - 1);
// the array already has the target size
// NOTE(review): the action taken is not visible here, presumably an early return - confirm.
129 if (myThreads.Size() == aNbThreads)
134 // release old threads
135 if (!myThreads.IsEmpty())
// scratch array over the same index range, NULL-filled; it records the threads handled so far
137 NCollection_Array1<EnumeratedThread*> aLockThreads (myThreads.Lower(), myThreads.Upper());
138 aLockThreads.Init (NULL);
139 int aThreadIndex = myThreads.Lower();
140 for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
141 aThreadIter.More(); aThreadIter.Next())
143 EnumeratedThread& aThread = aThreadIter.ChangeValue();
// roll-back: Free() every thread recorded so far (the scan stops at the first NULL slot), then report misuse
// NOTE(review): the condition guarding this path is not visible here, presumably a failed Lock() - confirm.
146 for (NCollection_Array1<EnumeratedThread*>::Iterator aLockThreadIter (aLockThreads);
147 aLockThreadIter.More() && aLockThreadIter.Value() != NULL; aLockThreadIter.Next())
149 aLockThreadIter.ChangeValue()->Free();
151 throw Standard_ProgramError ("Error: active ThreadPool is reinitialized");
// remember this thread so that the roll-back above can release it
153 aLockThreads.SetValue (aThreadIndex++, &aThread);
// re-dimension the array to the index range [0, aNbThreads - 1]
// NOTE(review): the "false" argument presumably disables copying of the old elements - confirm.
161 myThreads.Resize (0, aNbThreads - 1, false);
// give every slot a back-pointer to the pool, a sequential index and the thread entry function
162 int aLastThreadIndex = 0;
163 for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
164 aThreadIter.More(); aThreadIter.Next())
166 EnumeratedThread& aThread = aThreadIter.ChangeValue();
167 aThread.myPool = this;
168 aThread.myThreadIndex = aLastThreadIndex++;
169 aThread.SetFunction (&OSD_ThreadPool::EnumeratedThread::runThread);
// hand myThreads over to Move() together with a default-constructed array
// NOTE(review): presumably the branch for aNbThreads == 0, leaving myThreads empty - confirm.
174 NCollection_Array1<EnumeratedThread> anEmpty;
175 myThreads.Move (anEmpty);
179 // =======================================================================
180 // function : ~OSD_ThreadPool
182 // =======================================================================
// Destructor.
// NOTE(review): presumably shuts the worker threads down via release() - confirm against the body.
183 OSD_ThreadPool::~OSD_ThreadPool()
188 // =======================================================================
189 // function : release
191 // =======================================================================
// Shuts the pool threads down: each thread is woken up without a job and then waited for.
192 void OSD_ThreadPool::release()
// NOTE(review): the action for an empty pool is not visible here, presumably an early return - confirm.
194 if (myThreads.IsEmpty())
200 for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
201 aThreadIter.More(); aThreadIter.Next())
// a NULL job with FPE catching disabled, followed by Wait() on the same thread
// NOTE(review): Wait() presumably joins the underlying OS thread - confirm.
203 aThreadIter.ChangeValue().WakeUp (NULL, false);
204 aThreadIter.ChangeValue().Wait();
208 // =======================================================================
209 // function : perform
211 // =======================================================================
// Executes theJob on the threads held by this launcher.
// NOTE(review): presumably calls run() followed by wait() - confirm against the body.
212 void OSD_ThreadPool::Launcher::perform (JobInterface& theJob)
218 // =======================================================================
221 // =======================================================================
// Starts theJob on every thread held by this launcher.
222 void OSD_ThreadPool::Launcher::run (JobInterface& theJob)
// read the FPE-catching mode once and hand the same value to every thread
224 bool toCatchFpe = OSD::ToCatchFloatingSignals();
// occupied slots come first: the walk ends at the first NULL entry
225 for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
226 aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
228 aThreadIter.ChangeValue()->WakeUp (&theJob, toCatchFpe);
232 // =======================================================================
235 // =======================================================================
// Waits for every launched thread and converts the failures they recorded into an exception.
// A single failure is re-raised as is; otherwise a Standard_ProgramError is thrown whose text
// starts with "Multiple exceptions:\n" followed by the collected failure messages.
236 void OSD_ThreadPool::Launcher::wait()
// first pass: wait for each thread (up to the first NULL slot) and look at its myFailure
// NOTE(review): aNbFailures is declared and incremented on lines not visible here - confirm.
239 for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
240 aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
242 aThreadIter.ChangeValue()->WaitIdle();
243 if (!aThreadIter.Value()->myFailure.IsNull())
// NOTE(review): the action for zero failures is not visible here, presumably a plain return - confirm.
248 if (aNbFailures == 0)
// second pass: visit the threads that recorded a failure
253 TCollection_AsciiString aFailures;
254 for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
255 aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
257 if (!aThreadIter.Value()->myFailure.IsNull())
// exactly one failure: re-raise the stored failure object itself
259 if (aNbFailures == 1)
261 aThreadIter.Value()->myFailure->Reraise();
// several failures: append this message to the accumulated text
// NOTE(review): the statement executed when aFailures is not empty is not visible here, presumably a separator - confirm.
264 if (!aFailures.IsEmpty())
268 aFailures += aThreadIter.Value()->myFailure->GetMessageString();
// report all collected messages through one exception
272 aFailures = TCollection_AsciiString("Multiple exceptions:\n") + aFailures;
273 throw Standard_ProgramError (aFailures.ToCString());
276 // =======================================================================
277 // function : performJob
279 // =======================================================================
// Runs theJob->Perform() for the given thread index and stores any caught exception in
// theFailure as a newly created Standard_ProgramError.
// theFailure - receives the failure description; assigned only by the handlers below
// theJob     - job whose Perform() is invoked with theThreadIndex
// NOTE(review): the declaration of theThreadIndex is on a line not visible here.
280 void OSD_ThreadPool::performJob (Handle(Standard_Failure)& theFailure,
281 OSD_ThreadPool::JobInterface* theJob,
287 theJob->Perform (theThreadIndex);
// OCCT failure: message becomes "<dynamic type name>: <failure message>"
289 catch (Standard_Failure const& aFailure)
291 TCollection_AsciiString aMsg = TCollection_AsciiString (aFailure.DynamicType()->Name())
292 + ": " + aFailure.GetMessageString();
293 theFailure = new Standard_ProgramError (aMsg.ToCString());
// standard C++ exception: message becomes "<typeid name>: <what()>"
295 catch (std::exception& anStdException)
297 TCollection_AsciiString aMsg = TCollection_AsciiString (typeid(anStdException).name())
298 + ": " + anStdException.what();
299 theFailure = new Standard_ProgramError (aMsg.ToCString());
// fallback with a fixed message
// NOTE(review): presumably inside a catch (...) handler that is not visible here - confirm.
303 theFailure = new Standard_ProgramError ("Error: Unknown exception");
307 // =======================================================================
308 // function : performThread
310 // =======================================================================
// Body executed by a pool thread; called from runThread().
// NOTE(review): the loop and the event handling around the visible statements are not shown here - confirm.
311 void OSD_ThreadPool::EnumeratedThread::performThread()
// start with OSD::SetSignal (false); the job-specific mode is applied below
313 OSD::SetSignal (false);
// NOTE(review): the action on shutdown is not visible here, presumably leaving the thread function - confirm.
318 if (myPool->myShutDown)
// apply the mode stored by WakeUp(), then run the job; any failure ends up in myFailure
326 OSD::SetSignal (myToCatchFpe);
327 OSD_ThreadPool::performJob (myFailure, myJob, myThreadIndex);
334 // =======================================================================
335 // function : runThread
337 // =======================================================================
// Thread entry point registered through SetFunction() in OSD_ThreadPool::Init().
// theTask - opaque pointer that is cast back to the EnumeratedThread to run
// NOTE(review): the return statement is not visible here.
338 Standard_Address OSD_ThreadPool::EnumeratedThread::runThread (Standard_Address theTask)
340 EnumeratedThread* aThread = static_cast<EnumeratedThread*>(theTask);
341 aThread->performThread();
345 // =======================================================================
346 // function : Launcher
348 // =======================================================================
// Reserves threads of thePool for this launcher and appends the launcher's own mySelfThread.
// thePool       - pool whose threads are locked one by one via Lock()
// theMaxThreads - when positive, upper bound for the thread count (capped by thePool.NbThreads())
// NOTE(review): part of the initializer list and of the aNbThreads expression is not visible here.
349 OSD_ThreadPool::Launcher::Launcher (OSD_ThreadPool& thePool, Standard_Integer theMaxThreads)
350 : mySelfThread (true),
// thread count: explicit limit when positive, otherwise derived from NbDefaultThreadsToLaunch() (at least 1)
353 const int aNbThreads = theMaxThreads > 0
354 ? Min (theMaxThreads, thePool.NbThreads())
356 ? Max (thePool.NbDefaultThreadsToLaunch(), 1)
// one slot per thread, all initially NULL
358 myThreads.Resize (0, aNbThreads - 1, false);
359 myThreads.Init (NULL);
// take every pool thread that can be locked until all but the last slot are filled
362 for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (thePool.myThreads);
363 aThreadIter.More(); aThreadIter.Next())
365 if (aThreadIter.ChangeValue().Lock())
367 myThreads.SetValue (myNbThreads, &aThreadIter.ChangeValue());
368 // make thread index to fit into myThreads range
369 aThreadIter.ChangeValue().myThreadIndex = myNbThreads;
// stop one short of aNbThreads: the remaining slot is used for mySelfThread below
370 if (++myNbThreads == aNbThreads - 1)
378 // self thread should be executed last
379 myThreads.SetValue (myNbThreads, &mySelfThread);
380 mySelfThread.myThreadIndex = myNbThreads;
384 // =======================================================================
385 // function : Release
387 // =======================================================================
// Gives the reserved threads back: Free() is called on every entry except mySelfThread,
// after which the launcher's thread array is replaced through Move() with an empty one.
388 void OSD_ThreadPool::Launcher::Release()
// occupied slots come first: the walk ends at the first NULL entry
390 for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
391 aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
// mySelfThread is a member of the launcher and was never locked in the constructor loop
393 if (aThreadIter.Value() != &mySelfThread)
395 aThreadIter.Value()->Free();
399 NCollection_Array1<EnumeratedThread*> anEmpty;
400 myThreads.Move (anEmpty);