1 // Copyright (c) 2013-2014 OPEN CASCADE SAS
3 // This file is part of Open CASCADE Technology software library.
5 // This library is free software; you can redistribute it and/or modify it under
6 // the terms of the GNU Lesser General Public License version 2.1 as published
7 // by the Free Software Foundation, with special exception defined in the file
8 // OCCT_LGPL_EXCEPTION.txt. Consult the file LICENSE_LGPL_21.txt included in OCCT
9 // distribution for complete text of the license and disclaimer of any warranty.
11 // Alternatively, this file may be used under the terms of Open CASCADE
12 // commercial license or contractual agreement.
14 #ifndef OSD_Parallel_HeaderFile
15 #define OSD_Parallel_HeaderFile
17 #include <OSD_Thread.hxx>
18 #include <Standard_Mutex.hxx>
19 #include <Standard_NotImplemented.hxx>
20 #include <Standard_Atomic.hxx>
21 #include <NCollection_Array1.hxx>
24 #include <tbb/parallel_for.h>
25 #include <tbb/parallel_for_each.h>
26 #include <tbb/blocked_range.h>
29 //! @class OSD_Parallel
30 //! @brief Simplifies code parallelization.
32 //! The Class provides an interface of parallel processing "for" and "foreach" loops.
33 //! These primitives encapsulates complete logic for creating and managing parallel context of loops.
34 //! Moreover the primitives may be a wrapper for some primitives from 3rd-party library - TBB.
35 //! To use it is necessary to implement TBB like interface which is based on functors.
41 //! void operator() ([proccesing instance]) const
48 //! In the body of the operator () should be implemented thread-safe logic of computations that can be performed in parallel context.
49 //! If parallelized loop iterates on the collections with direct access by index (such as Vector, Array),
50 //! it is more efficient to use the primitive ParallelFor (because it has no critical section).
53 //! Auxiliary class which ensures exclusive
54 //! access to iterators of processed data pool.
55 template <typename Value>
58 public: //! @name public methods
60 typedef Value Iterator;
63 Range(const Value& theBegin, const Value& theEnd)
70 //! Returns const link on the first element.
71 inline const Value& Begin() const
76 //! Returns const link on the last element.
77 inline const Value& End() const
82 //! Returns first non processed element or end.
83 //! Thread-safe method.
84 inline Iterator It() const
86 Standard_Mutex::Sentry aMutex( myMutex );
87 return ( myIt != myEnd ) ? myIt++ : myEnd;
90 private: //! @name private methods
92 //! Empty copy constructor
93 Range(const Range& theCopy);
95 //! Empty copy operator.
96 Range& operator=(const Range& theCopy);
98 private: //! @name private fields
100 const Value& myBegin; //!< Fisrt element of range.
101 const Value& myEnd; //!< Last element of range.
102 mutable Value myIt; //!< First non processed element of range.
103 mutable Standard_Mutex myMutex; //!< Access controller for the first non processed element.
106 //! Auxiliary wrapper class for thread function.
107 template <typename Functor, typename InputIterator>
110 public: //! @name public methods
113 Task(const Functor& thePerformer, Range<InputIterator>& theRange)
114 : myPerformer(thePerformer),
119 //! Method is executed in the context of thread,
120 //! so this method defines the main calculations.
121 static Standard_Address RunWithIterator(Standard_Address theTask)
123 Task<Functor, InputIterator>& aTask =
124 *( static_cast< Task<Functor, InputIterator>* >(theTask) );
126 const Range<InputIterator>& aData( aTask.myRange );
127 typename Range<InputIterator>::Iterator i = aData.It();
129 for ( ; i != aData.End(); i = aData.It() )
131 aTask.myPerformer(*i);
137 //! Method is executed in the context of thread,
138 //! so this method defines the main calculations.
139 static Standard_Address RunWithIndex(Standard_Address theTask)
141 Task<Functor, InputIterator>& aTask =
142 *( static_cast< Task<Functor, Standard_Integer>* >(theTask) );
144 const Range<Standard_Integer>& aData( aTask.myRange );
145 Standard_Integer i = aData.It();
147 for ( ; i < aData.End(); i = aData.It())
149 aTask.myPerformer(i);
155 private: //! @name private methods
157 //! Empty copy constructor.
158 Task(const Task& theCopy);
160 //! Empty copy operator.
161 Task& operator=(const Task& theCopy);
163 private: //! @name private fields
165 const Functor& myPerformer; //!< Link on functor.
166 const Range<InputIterator>& myRange; //!< Link on processed data block.
169 public: //! @name public methods
171 //! Returns number of logical proccesrs.
172 Standard_EXPORT static Standard_Integer NbLogicalProcessors();
174 //! Simple primitive for parallelization of "foreach" loops.
175 template <typename InputIterator, typename Functor>
176 static void ForEach( InputIterator theBegin,
177 InputIterator theEnd,
178 const Functor& theFunctor,
179 const Standard_Boolean isForceSingleThreadExecution
182 //! Simple primitive for parallelization of "for" loops.
183 template <typename Functor>
184 static void For( const Standard_Integer theBegin,
185 const Standard_Integer theEnd,
186 const Functor& theFunctor,
187 const Standard_Boolean isForceSingleThreadExecution
191 //=======================================================================
192 //function : OSD_Parallel::Range::It
193 //purpose : Template concretization.
194 //=======================================================================
195 template<> inline Standard_Integer OSD_Parallel::Range<Standard_Integer>::It() const
197 return Standard_Atomic_Increment( reinterpret_cast<volatile int*>(&myIt) ) - 1;
200 //=======================================================================
201 //function : ParallelForEach
203 //=======================================================================
204 template <typename InputIterator, typename Functor>
205 void OSD_Parallel::ForEach( InputIterator theBegin,
206 InputIterator theEnd,
207 const Functor& theFunctor,
208 const Standard_Boolean isForceSingleThreadExecution )
210 if ( isForceSingleThreadExecution )
212 for ( InputIterator it(theBegin); it != theEnd; it++ )
221 tbb::parallel_for_each(theBegin, theEnd, theFunctor);
223 catch ( tbb::captured_exception& anException )
225 Standard_NotImplemented::Raise(anException.what());
230 Range<InputIterator> aData(theBegin, theEnd);
231 Task<Functor, InputIterator> aTask(theFunctor, aData);
233 const Standard_Integer aNbThreads = OSD_Parallel::NbLogicalProcessors();
234 NCollection_Array1<OSD_Thread> aThreads(0, aNbThreads - 1);
236 for ( Standard_Integer i = 0; i < aNbThreads; ++i )
238 OSD_Thread& aThread = aThreads(i);
239 aThread.SetFunction(&Task<Functor, InputIterator>::RunWithIterator);
243 for ( Standard_Integer i = 0; i < aNbThreads; ++i )
249 //=======================================================================
250 //function : ParallelFor
252 //=======================================================================
253 template <typename Functor>
254 void OSD_Parallel::For( const Standard_Integer theBegin,
255 const Standard_Integer theEnd,
256 const Functor& theFunctor,
257 const Standard_Boolean isForceSingleThreadExecution )
259 if ( isForceSingleThreadExecution )
261 for ( Standard_Integer i = theBegin; i < theEnd; ++i )
270 tbb::parallel_for( theBegin, theEnd, theFunctor );
272 catch ( tbb::captured_exception& anException )
274 Standard_NotImplemented::Raise(anException.what());
279 Range<Standard_Integer> aData(theBegin, theEnd);
280 Task<Functor, Standard_Integer> aTask(theFunctor, aData);
282 const Standard_Integer aNbThreads = OSD_Parallel::NbLogicalProcessors();
283 NCollection_Array1<OSD_Thread> aThreads(0, aNbThreads - 1);
285 for ( Standard_Integer i = 0; i < aNbThreads; ++i )
287 OSD_Thread& aThread = aThreads(i);
288 aThread.SetFunction(&Task<Functor, Standard_Integer>::RunWithIndex);
292 for ( Standard_Integer i = 0; i < aNbThreads; ++i )