diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3ced6ee426efb16e8a078e50344337c79ba7b72b..b6e550f926044bd44b1f5b63f8e1335b375e7656 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -183,6 +183,8 @@ SET(HDRS state_quaternion.h time_stamp.h trajectory_base.h + threaded_base_class.h + multi_threading_utils.h ) @@ -247,6 +249,7 @@ SET(SRCS sensor_odom_3D.cpp time_stamp.cpp trajectory_base.cpp + threaded_base_class.cpp data_association/association_solver.cpp data_association/association_node.cpp data_association/association_tree.cpp diff --git a/src/processor_base.cpp b/src/processor_base.cpp index beb4018b1931a7d0d99deca4155408c40c49d396..8eb098bad638118a04aa3dad1cef58f58d8c8e56 100644 --- a/src/processor_base.cpp +++ b/src/processor_base.cpp @@ -6,8 +6,10 @@ namespace wolf { unsigned int ProcessorBase::processor_id_count_ = 0; -ProcessorBase::ProcessorBase(ProcessorType _tp, const std::string& _type, const Scalar& _time_tolerance) : +ProcessorBase::ProcessorBase(ProcessorType _tp, const std::string& _type, + const Scalar& _time_tolerance, const Scalar _processing_rate) : NodeBase("PROCESSOR"), + ThreadedBaseClass(_processing_rate), sensor_ptr_(), processor_id_(++processor_id_count_), type_id_(_tp), diff --git a/src/processor_base.h b/src/processor_base.h index 5900ed583fba3d5911e85d736933704ee330d628..9931d333d581ef40fa08489732bc9465705abc6d 100644 --- a/src/processor_base.h +++ b/src/processor_base.h @@ -9,6 +9,7 @@ class SensorBase; //Wolf includes #include "wolf.h" #include "node_base.h" +#include "threaded_base_class.h" // std #include <memory> @@ -28,12 +29,15 @@ struct ProcessorParamsBase }; //class ProcessorBase -class ProcessorBase : public NodeBase, public std::enable_shared_from_this<ProcessorBase> +class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public std::enable_shared_from_this<ProcessorBase> { private: SensorBaseWPtr sensor_ptr_; public: - ProcessorBase(ProcessorType _tp, const std::string& _type, const Scalar& _time_tolerance = 0); + + ProcessorBase(ProcessorType _tp, const std::string& _type, + const Scalar& _time_tolerance = 0, const Scalar _processing_rate = 2000); + virtual ~ProcessorBase(); void remove(); @@ -41,6 +45,8 @@ class ProcessorBase : public NodeBase, public std::enable_shared_from_this<Proce virtual void process(CaptureBasePtr _capture_ptr) = 0; + virtual void addCapture(CaptureBasePtr _capture_ptr); + /** \brief Vote for KeyFrame generation * * If a KeyFrame criterion is validated, this function returns true, @@ -74,6 +80,13 @@ class ProcessorBase : public NodeBase, public std::enable_shared_from_this<Proce unsigned int processor_id_; ProcessorType type_id_; Scalar time_tolerance_; ///< self time tolerance for adding a capture into a frame + + std::mutex capture_mut_; + std::atomic<bool> update_; + + CaptureBasePtr current_capture_; + + virtual void executeImpl(); }; } @@ -81,8 +94,75 @@ class ProcessorBase : public NodeBase, public std::enable_shared_from_this<Proce #include "sensor_base.h" #include "constraint_base.h" +#include "multi_threading_utils.h" + namespace wolf { +// WOLF_INFO below are their for debugging purpose only + +inline void ProcessorBase::addCapture(CaptureBasePtr _capture_ptr) +{ + // wait to lock the mutex + std::unique_lock<std::mutex> lock(capture_mut_); + + if (!run_.load(std::memory_order_acquire)) + { + WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + + " asked to stop **************\n"); + return; + } + + // If a copy id needed, it should + // be performed in process(CaptureBasePtr). + current_capture_ = _capture_ptr; + + lock.unlock(); + + update_.store(true, std::memory_order_release); + + cv_.notify_one(); +} + +inline void ProcessorBase::executeImpl() +{ + WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + " executeImpl **************"); + + // This basically put the thread asleep until someone + // sets 'update_' or '!run_' from outside + std::unique_lock<std::mutex> lock(capture_mut_); + cv_.wait(lock, [this]{return update_.load(std::memory_order_acquire) || + !run_.load(std::memory_order_acquire);}); + + if (!run_.load(std::memory_order_acquire)) + { + WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + + " executeImpl asked to stop **************"); + return; + } + else if (!update_.load(std::memory_order_acquire)) + { + WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + + " executeImpl NO update **************"); + return; + } + + WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + + " executeImpl actual processing **************\n", + "\t\t\t in thread : ", std::to_string(core::get_thread_id())); + + // Process the capture + process(current_capture_); + + lock.unlock(); + + update_.store(false, std::memory_order_release); + + cv_.notify_one(); + + WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + + " executeImpl End **************"); +} + inline wolf::ProblemPtr ProcessorBase::getProblem() { ProblemPtr prb = problem_ptr_.lock();