#ifndef OSD_Parallel_HeaderFile
#define OSD_Parallel_HeaderFile
-#include <OSD_Thread.hxx>
-#include <Standard_Mutex.hxx>
-#include <Standard_NotImplemented.hxx>
-#include <Standard_Atomic.hxx>
-#include <NCollection_Array1.hxx>
-
-#ifdef HAVE_TBB
-#include <tbb/parallel_for.h>
-#include <tbb/parallel_for_each.h>
-#include <tbb/blocked_range.h>
-#endif
+#include <Standard_Type.hxx>
+#include <memory>
+#include <type_traits>
-//! @class OSD_Parallel
-//! @brief Simplifies code parallelization.
+//! @brief Simple tool for code parallelization.
+//!
+//! OSD_Parallel class provides simple interface for parallel processing of
+//! tasks that can be formulated in terms of "for" or "foreach" loops.
+//!
+//! To use this tool it is necessary to:
+//! - organize the data to be processed in a collection accessible by
+//! iteration (usually array or vector);
+//! - implement a functor class providing operator () accepting iterator
+//! (or index in array) that does the job;
+//! - call either For() or ForEach() providing begin and end iterators and
+//! a functor object.
//!
-//! The Class provides an interface of parallel processing "for" and "foreach" loops.
-//! These primitives encapsulates complete logic for creating and managing parallel context of loops.
-//! Moreover the primitives may be a wrapper for some primitives from 3rd-party library - TBB.
-//! To use it is necessary to implement TBB like interface which is based on functors.
+//! Iterators should satisfy requirements of STL forward iterator.
+//! Functor
//!
//! @code
//! class Functor
//! };
//! @endcode
//!
-//! In the body of the operator () should be implemented thread-safe logic of computations that can be performed in parallel context.
-//! If parallelized loop iterates on the collections with direct access by index (such as Vector, Array),
-//! it is more efficient to use the primitive ParallelFor (because it has no critical section).
+//! The operator () should be implemented in a thread-safe way so that
+//! the same functor object can process different data items in parallel threads.
+//!
+//! Iteration by index (For) is expected to be more efficient than using iterators
+//! (ForEach).
+//!
+//! Implementation uses TBB if OCCT is built with support of TBB; otherwise it
+//! uses ad-hoc parallelization tool. In general, if TBB is available, it is
+//! more efficient to use it directly instead of using OSD_Parallel.
+
class OSD_Parallel
{
- //! Auxiliary class which ensures exclusive
- //! access to iterators of processed data pool.
- template <typename Value>
- class Range
+private:
+
+ //! Interface class defining API for polymorphic wrappers over iterators.
+ //! Intended to add polymorphic behaviour to For and ForEach functionality
+ //! for arbitrary objects and eliminate dependency on template parameters.
+ class IteratorInterface
{
- public: //! @name public methods
+ public:
+ virtual ~IteratorInterface() {}
- typedef Value Iterator;
+ //! Returns true if iterators wrapped by this and theOther are equal
+ virtual bool IsEqual (const IteratorInterface& theOther) const = 0;
- //! Constructor
- Range(const Value& theBegin, const Value& theEnd)
- : myBegin(theBegin),
- myEnd (theEnd),
- myIt (theBegin)
- {
- }
+ //! Increments wrapped iterator
+ virtual void Increment () = 0;
+
+ //! Returns new instance of the wrapper containing copy
+ //! of the wrapped iterator.
+ virtual IteratorInterface* Clone() const = 0;
+ };
- //! Returns const link on the first element.
- inline const Value& Begin() const
+ //! Implementation of polymorphic iterator wrapper suitable for basic
+ //! types as well as for std iterators.
+ //! Wraps instance of actual iterator type Type.
+ template<class Type>
+ class IteratorWrapper : public IteratorInterface
+ {
+ public:
+ IteratorWrapper() {}
+ IteratorWrapper(const Type& theValue) : myValue(theValue) {}
+
+ virtual bool IsEqual (const IteratorInterface& theOther) const Standard_OVERRIDE
{
- return myBegin;
+ return myValue == dynamic_cast<const IteratorWrapper<Type>&>(theOther).myValue;
}
- //! Returns const link on the last element.
- inline const Value& End() const
+ virtual void Increment () Standard_OVERRIDE
{
- return myEnd;
+ ++myValue;
}
- //! Returns first non processed element or end.
- //! Thread-safe method.
- inline Iterator It() const
+ virtual IteratorInterface* Clone() const Standard_OVERRIDE
{
- Standard_Mutex::Sentry aMutex( myMutex );
- return ( myIt != myEnd ) ? myIt++ : myEnd;
+ return new IteratorWrapper<Type>(myValue);
}
- private: //! @name private methods
-
- //! Empty copy constructor
- Range(const Range& theCopy);
+ const Type& Value() const { return myValue; }
- //! Empty copy operator.
- Range& operator=(const Range& theCopy);
-
- private: //! @name private fields
-
- const Value& myBegin; //!< Fisrt element of range.
- const Value& myEnd; //!< Last element of range.
- mutable Value myIt; //!< First non processed element of range.
- mutable Standard_Mutex myMutex; //!< Access controller for the first non processed element.
+ private:
+ Type myValue;
};
- //! Auxiliary wrapper class for thread function.
- template <typename Functor, typename InputIterator>
- class Task
+protected:
+ // Note: UniversalIterator and FunctorInterface are made protected to be
+ // accessible from specialization using threads (non-TBB).
+
+ //! Fixed-type iterator, implementing STL forward iterator interface, used for
+ //! iteration over objects subject to parallel processing.
+ //! It stores pointer to instance of polymorphic iterator inheriting from
+ //! IteratorInterface, which contains actual type-specific iterator.
+ class UniversalIterator :
+ // Note that TBB requires that value_type of iterator be copyable,
+ // thus we use its own type for that
+ public std::iterator<std::forward_iterator_tag, UniversalIterator, ptrdiff_t, UniversalIterator*, UniversalIterator&>
{
- public: //! @name public methods
+ public:
+ UniversalIterator() {}
- //! Constructor.
- Task(const Functor& thePerformer, Range<InputIterator>& theRange)
- : myPerformer(thePerformer),
- myRange (theRange)
+ UniversalIterator(IteratorInterface* theOther)
+ : myPtr(theOther)
{
}
- //! Method is executed in the context of thread,
- //! so this method defines the main calculations.
- static Standard_Address RunWithIterator(Standard_Address theTask)
+ UniversalIterator(const UniversalIterator& theOther)
+ : myPtr (theOther.myPtr->Clone())
{
- Task<Functor, InputIterator>& aTask =
- *( static_cast< Task<Functor, InputIterator>* >(theTask) );
+ }
- const Range<InputIterator>& aData( aTask.myRange );
- typename Range<InputIterator>::Iterator i = aData.It();
+ UniversalIterator& operator= (const UniversalIterator& theOther)
+ {
+ myPtr.reset (theOther.myPtr->Clone());
+ return *this;
+ }
- for ( ; i != aData.End(); i = aData.It() )
- {
- aTask.myPerformer(*i);
- }
+ bool operator!= (const UniversalIterator& theOther) const
+ {
+ return ! myPtr->IsEqual (*theOther.myPtr);
+ }
- return NULL;
+ bool operator== (const UniversalIterator& theOther) const
+ {
+ return myPtr->IsEqual (*theOther.myPtr);
}
- //! Method is executed in the context of thread,
- //! so this method defines the main calculations.
- static Standard_Address RunWithIndex(Standard_Address theTask)
+ UniversalIterator& operator++()
{
- Task<Functor, InputIterator>& aTask =
- *( static_cast< Task<Functor, Standard_Integer>* >(theTask) );
+ myPtr->Increment();
+ return *this;
+ }
+
+ UniversalIterator operator++(int)
+ {
+ UniversalIterator aValue(*this);
+ myPtr->Increment();
+ return aValue;
+ }
- const Range<Standard_Integer>& aData( aTask.myRange );
- Standard_Integer i = aData.It();
+ const UniversalIterator& operator* () const { return *this; }
+ UniversalIterator& operator* () { return *this; }
- for ( ; i < aData.End(); i = aData.It())
- {
- aTask.myPerformer(i);
- }
+ const UniversalIterator* operator->() const { return this; }
+ UniversalIterator* operator->() { return this; }
- return NULL;
+ // type cast to actual iterator
+ template <typename Iterator>
+ const Iterator& DownCast () const
+ {
+ return dynamic_cast<OSD_Parallel::IteratorWrapper<Iterator>*>(myPtr.get())->Value();
}
- private: //! @name private methods
+ private:
+#if (defined(_MSC_VER) && (_MSC_VER < 1600))
+ std::auto_ptr<IteratorInterface> myPtr;
+#else
+ std::unique_ptr<IteratorInterface> myPtr;
+#endif
+ };
- //! Empty copy constructor.
- Task(const Task& theCopy);
+ //! Interface class representing functor object.
+ //! Intended to add polymorphic behavour to For and ForEach functionality
+ //! enabling execution of arbitrary function in parallel mode.
+ class FunctorInterface
+ {
+ public:
+ virtual ~FunctorInterface() {}
- //! Empty copy operator.
- Task& operator=(const Task& theCopy);
+ virtual void operator () (UniversalIterator& theIterator) const = 0;
+ };
- private: //! @name private fields
+private:
- const Functor& myPerformer; //!< Link on functor.
- const Range<InputIterator>& myRange; //!< Link on processed data block.
+ //! Wrapper for functors manipulating on std iterators.
+ template<class Iterator, class Functor>
+ class FunctorWrapperIter : public FunctorInterface
+ {
+ public:
+ FunctorWrapperIter (const Functor& theFunctor)
+ : myFunctor(theFunctor)
+ {
+ }
+
+ virtual void operator() (UniversalIterator& theIterator) const Standard_OVERRIDE
+ {
+ const Iterator& anIt = theIterator.DownCast<Iterator>();
+ myFunctor(*anIt);
+ }
+
+ private:
+ FunctorWrapperIter (const FunctorWrapperIter&);
+ void operator = (const FunctorWrapperIter&);
+ const Functor& myFunctor;
};
-public: //! @name public methods
+ //! Wrapper for functors manipulating on integer index.
+ template<class Functor>
+ class FunctorWrapperInt : public FunctorInterface
+ {
+ public:
+ FunctorWrapperInt (const Functor& theFunctor)
+ : myFunctor(theFunctor)
+ {
+ }
- //! Returns number of logical proccesrs.
- Standard_EXPORT static Standard_Integer NbLogicalProcessors();
+ virtual void operator() (UniversalIterator& theIterator) const Standard_OVERRIDE
+ {
+ Standard_Integer anIndex = theIterator.DownCast<Standard_Integer>();
+ myFunctor(anIndex);
+ }
+
+ private:
+ FunctorWrapperInt (const FunctorWrapperInt&);
+ void operator = (const FunctorWrapperInt&);
+ const Functor& myFunctor;
+ };
+
+private:
//! Simple primitive for parallelization of "foreach" loops, e.g.:
//! @code
//! for (std::iterator anIter = theBegin; anIter != theEnd; ++anIter) {}
//! @endcode
+ //! Implementation of framework-dependent functionality should be provided by
+ //! forEach_impl function defined in opencascade::parallel namespace.
//! @param theBegin the first index (incusive)
//! @param theEnd the last index (exclusive)
- //! @param theFunctor functor providing an interface "void operator(InputIterator theIter){}" performing task for specified iterator position
- //! @param isForceSingleThreadExecution if true, then no threads will be created
- template <typename InputIterator, typename Functor>
- static void ForEach( InputIterator theBegin,
- InputIterator theEnd,
- const Functor& theFunctor,
- const Standard_Boolean isForceSingleThreadExecution
- = Standard_False );
+ //! @param theFunctor functor providing an interface "void operator(InputIterator theIter){}"
+ //! performing task for the specified iterator position
+ Standard_EXPORT static void forEach (UniversalIterator& theBegin,
+ UniversalIterator& theEnd,
+ const FunctorInterface& theFunctor);
+
+public: //! @name public methods
+
+ //! Returns number of logical proccesrs.
+ Standard_EXPORT static Standard_Integer NbLogicalProcessors();
- //! Simple primitive for parallelization of "for" loops, e.g.:
+ //! Simple primitive for parallelization of "foreach" loops, equivalent to:
//! @code
- //! for (int anIter = theBegin; anIter < theEnd; ++anIter) {}
+ //! for (auto anIter = theBegin; anIter != theEnd; ++anIter) {
+ //! theFunctor(*anIter);
+ //! }
//! @endcode
//! @param theBegin the first index (incusive)
//! @param theEnd the last index (exclusive)
- //! @param theFunctor functor providing an interface "void operator(int theIndex){}" performing task for specified index
+ //! @param theFunctor functor providing an interface "void operator(InputIterator theIter){}"
+ //! performing task for specified iterator position
//! @param isForceSingleThreadExecution if true, then no threads will be created
- template <typename Functor>
- static void For( const Standard_Integer theBegin,
- const Standard_Integer theEnd,
- const Functor& theFunctor,
- const Standard_Boolean isForceSingleThreadExecution
- = Standard_False );
-};
-
-//=======================================================================
-//function : OSD_Parallel::Range::It
-//purpose : Template concretization.
-//=======================================================================
-template<> inline Standard_Integer OSD_Parallel::Range<Standard_Integer>::It() const
-{
- return Standard_Atomic_Increment( reinterpret_cast<volatile int*>(&myIt) ) - 1;
-}
-
-//=======================================================================
-//function : ParallelForEach
-//purpose :
-//=======================================================================
-template <typename InputIterator, typename Functor>
-void OSD_Parallel::ForEach( InputIterator theBegin,
- InputIterator theEnd,
- const Functor& theFunctor,
- const Standard_Boolean isForceSingleThreadExecution )
-{
- if ( isForceSingleThreadExecution )
- {
- for ( InputIterator it(theBegin); it != theEnd; ++it )
- theFunctor(*it);
-
- return;
- }
- #ifdef HAVE_TBB
+ template <typename InputIterator, typename Functor>
+ static void ForEach(InputIterator theBegin,
+ InputIterator theEnd,
+ const Functor& theFunctor,
+ const Standard_Boolean isForceSingleThreadExecution = Standard_False)
{
- try
+ if (isForceSingleThreadExecution)
{
- tbb::parallel_for_each(theBegin, theEnd, theFunctor);
+ for (InputIterator it(theBegin); it != theEnd; ++it)
+ theFunctor(*it);
}
- catch ( tbb::captured_exception& anException )
+ else
{
- throw Standard_NotImplemented(anException.what());
+ UniversalIterator aBegin(new IteratorWrapper<InputIterator>(theBegin));
+ UniversalIterator aEnd (new IteratorWrapper<InputIterator>(theEnd));
+ FunctorWrapperIter<InputIterator,Functor> aFunctor (theFunctor);
+ forEach(aBegin, aEnd, aFunctor);
}
}
- #else
- {
- Range<InputIterator> aData(theBegin, theEnd);
- Task<Functor, InputIterator> aTask(theFunctor, aData);
-
- const Standard_Integer aNbThreads = OSD_Parallel::NbLogicalProcessors();
- NCollection_Array1<OSD_Thread> aThreads(0, aNbThreads - 1);
- for ( Standard_Integer i = 0; i < aNbThreads; ++i )
- {
- OSD_Thread& aThread = aThreads(i);
- aThread.SetFunction(&Task<Functor, InputIterator>::RunWithIterator);
- aThread.Run(&aTask);
- }
-
- for ( Standard_Integer i = 0; i < aNbThreads; ++i )
- aThreads(i).Wait();
- }
- #endif
-}
-
-//=======================================================================
-//function : ParallelFor
-//purpose :
-//=======================================================================
-template <typename Functor>
-void OSD_Parallel::For( const Standard_Integer theBegin,
- const Standard_Integer theEnd,
- const Functor& theFunctor,
- const Standard_Boolean isForceSingleThreadExecution )
-{
- if ( isForceSingleThreadExecution )
- {
- for ( Standard_Integer i = theBegin; i < theEnd; ++i )
- theFunctor(i);
-
- return;
- }
- #ifdef HAVE_TBB
+ //! Simple primitive for parallelization of "for" loops, equivalent to:
+ //! @code
+ //! for (int anIter = theBegin; anIter != theEnd; ++anIter) {
+ //! theFunctor(anIter);
+ //! }
+ //! @endcode
+ //! @param theBegin the first index (incusive)
+ //! @param theEnd the last index (exclusive)
+ //! @param theFunctor functor providing an interface "void operator(int theIndex){}"
+ //! performing task for specified index
+ //! @param isForceSingleThreadExecution if true, then no threads will be created
+ template <typename Functor>
+ static void For(const Standard_Integer theBegin,
+ const Standard_Integer theEnd,
+ const Functor& theFunctor,
+ const Standard_Boolean isForceSingleThreadExecution = Standard_False)
{
- try
+ if (isForceSingleThreadExecution)
{
- tbb::parallel_for( theBegin, theEnd, theFunctor );
+ for (Standard_Integer it (theBegin); it != theEnd; ++it)
+ theFunctor(it);
}
- catch ( tbb::captured_exception& anException )
+ else
{
- throw Standard_NotImplemented(anException.what());
+ UniversalIterator aBegin(new IteratorWrapper<Standard_Integer>(theBegin));
+ UniversalIterator aEnd (new IteratorWrapper<Standard_Integer>(theEnd));
+ FunctorWrapperInt<Functor> aFunctor (theFunctor);
+ forEach(aBegin, aEnd, aFunctor);
}
}
- #else
- {
- Range<Standard_Integer> aData(theBegin, theEnd);
- Task<Functor, Standard_Integer> aTask(theFunctor, aData);
-
- const Standard_Integer aNbThreads = OSD_Parallel::NbLogicalProcessors();
- NCollection_Array1<OSD_Thread> aThreads(0, aNbThreads - 1);
- for ( Standard_Integer i = 0; i < aNbThreads; ++i )
- {
- OSD_Thread& aThread = aThreads(i);
- aThread.SetFunction(&Task<Functor, Standard_Integer>::RunWithIndex);
- aThread.Run(&aTask);
- }
-
- for ( Standard_Integer i = 0; i < aNbThreads; ++i )
- aThreads(i).Wait();
- }
- #endif
-}
+};
#endif
--- /dev/null
+// Created on: 2014-08-19
+// Created by: Alexander Zaikin
+// Copyright (c) 1996-1999 Matra Datavision
+// Copyright (c) 2013-2014 OPEN CASCADE SAS
+//
+// This file is part of Open CASCADE Technology software library.
+//
+// This library is free software; you can redistribute it and/or modify it under
+// the terms of the GNU Lesser General Public License version 2.1 as published
+// by the Free Software Foundation, with special exception defined in the file
+// OCCT_LGPL_EXCEPTION.txt. Consult the file LICENSE_LGPL_21.txt included in OCCT
+// distribution for complete text of the license and disclaimer of any warranty.
+//
+// Alternatively, this file may be used under the terms of Open CASCADE
+// commercial license or contractual agreement.
+
+// Version of parallel executor used when TBB is not available
+#ifndef HAVE_TBB
+
+#include <OSD_Parallel.hxx>
+
+#include <NCollection_Array1.hxx>
+#include <Standard_Mutex.hxx>
+#include <OSD_Thread.hxx>
+
+namespace
+{
+ //! Class implementing tools for parallel processing
+ //! using threads (when TBB is not available);
+ //! it is derived from OSD_Parallel to get access to
+ //! Iterator and FunctorInterface nested types.
+ class OSD_Parallel_Threads : public OSD_Parallel
+ {
+ public:
+ //! Auxiliary class which ensures exclusive
+ //! access to iterators of processed data pool.
+ class Range
+ {
+ public: //! @name public methods
+
+ //! Constructor
+ Range(const OSD_Parallel::UniversalIterator& theBegin,
+ const OSD_Parallel::UniversalIterator& theEnd)
+ : myBegin(theBegin),
+ myEnd(theEnd),
+ myIt(theBegin)
+ {
+ }
+
+ //! Returns const link on the first element.
+ inline const OSD_Parallel::UniversalIterator& Begin() const
+ {
+ return myBegin;
+ }
+
+ //! Returns const link on the last element.
+ inline const OSD_Parallel::UniversalIterator& End() const
+ {
+ return myEnd;
+ }
+
+ //! Returns first non processed element or end.
+ //! Thread-safe method.
+ inline OSD_Parallel::UniversalIterator It() const
+ {
+ Standard_Mutex::Sentry aMutex(myMutex);
+ return (myIt != myEnd) ? myIt++ : myEnd;
+ }
+
+ private: //! @name private methods
+
+ //! Empty copy constructor
+ Range(const Range& theCopy);
+
+ //! Empty copy operator.
+ Range& operator=(const Range& theCopy);
+
+ private: //! @name private fields
+
+ const OSD_Parallel::UniversalIterator& myBegin; //!< Fisrt element of range.
+ const OSD_Parallel::UniversalIterator& myEnd; //!< Last element of range.
+ mutable OSD_Parallel::UniversalIterator myIt; //!< First non processed element of range.
+ mutable Standard_Mutex myMutex; //!< Access controller for the first non processed element.
+ };
+
+ //! Auxiliary wrapper class for thread function.
+ class Task
+ {
+ public: //! @name public methods
+
+ //! Constructor.
+ Task(const OSD_Parallel::FunctorInterface& thePerformer, Range& theRange)
+ : myPerformer(thePerformer),
+ myRange(theRange)
+ {
+ }
+
+ //! Method is executed in the context of thread,
+ //! so this method defines the main calculations.
+ static Standard_Address Run(Standard_Address theTask)
+ {
+ Task& aTask = *(static_cast<Task*>(theTask));
+
+ const Range& aData(aTask.myRange);
+ for (OSD_Parallel::UniversalIterator i = aData.It(); i != aData.End(); i = aData.It())
+ aTask.myPerformer(i);
+
+ return NULL;
+ }
+
+ private: //! @name private methods
+
+ //! Empty copy constructor.
+ Task(const Task& theCopy);
+
+ //! Empty copy operator.
+ Task& operator=(const Task& theCopy);
+
+ private: //! @name private fields
+
+ const OSD_Parallel::FunctorInterface& myPerformer; //!< Link on functor.
+ const Range& myRange; //!< Link on processed data block.
+ };
+ };
+}
+
+//=======================================================================
+//function : forEach
+//purpose :
+//=======================================================================
+void OSD_Parallel::forEach (UniversalIterator& theBegin,
+ UniversalIterator& theEnd,
+ const FunctorInterface& theFunctor)
+{
+ OSD_Parallel_Threads::Range aData(theBegin, theEnd);
+ OSD_Parallel_Threads::Task aTask(theFunctor, aData);
+
+ const Standard_Integer aNbThreads = OSD_Parallel::NbLogicalProcessors();
+ NCollection_Array1<OSD_Thread> aThreads(0, aNbThreads - 1);
+ for (Standard_Integer i = 0; i < aNbThreads; ++i)
+ {
+ OSD_Thread& aThread = aThreads(i);
+ aThread.SetFunction(&OSD_Parallel_Threads::Task::Run);
+ aThread.Run(&aTask);
+ }
+
+ for (Standard_Integer i = 0; i < aNbThreads; ++i)
+ aThreads(i).Wait();
+}
+
+#endif /* ! HAVE_TBB */
\ No newline at end of file