diff --git a/src/multi_threading_utils.h b/src/multi_threading_utils.h index eaadf6d4b03ff3dc9f7ccdd65412b5aba671a95b..61c2f93ea7b66376d89a7ed8195e45cc117a9780 100644 --- a/src/multi_threading_utils.h +++ b/src/multi_threading_utils.h @@ -166,6 +166,16 @@ inline void locked_cout(const CoutColor& color, const Args&... rest) #define WOLF_ERROR_COND(...) \ if (cond) locked_cout(cond, wolf::io::CoutColor::RED, __VA_ARGS__) +// I need to define some custom debugging +// flag (e.g. WOLFDEBUG) that I can pass to +// gcc through cmake. +#if 0 + #define WOLF_DEBUG(...) +#else + #define WOLF_DEBUG(...) \ + locked_cout(wolf::io::CoutColor::CYAN, __VA_ARGS__) +#endif + } // namespace wolf #endif /* WOLF_MULTI_THREADING_UTILS_H_ */ diff --git a/src/processor_base.cpp b/src/processor_base.cpp index 8eb098bad638118a04aa3dad1cef58f58d8c8e56..3d6ccab48f1334da6e305af90c7cac923d309567 100644 --- a/src/processor_base.cpp +++ b/src/processor_base.cpp @@ -7,13 +7,18 @@ namespace wolf { unsigned int ProcessorBase::processor_id_count_ = 0; ProcessorBase::ProcessorBase(ProcessorType _tp, const std::string& _type, - const Scalar& _time_tolerance, const Scalar _processing_rate) : + const Scalar& _time_tolerance, const Scalar _processing_rate, + const ProcessingCapturePolicy policy) : NodeBase("PROCESSOR"), ThreadedBaseClass(_processing_rate), sensor_ptr_(), processor_id_(++processor_id_count_), type_id_(_tp), - time_tolerance_(_time_tolerance) + time_tolerance_(_time_tolerance), + proc_capture_policy_(policy), + update_(false), + processed_(false), + capturing_(false) { std::cout << "constructed +p" << id() << std::endl; diff --git a/src/processor_base.h b/src/processor_base.h index 9931d333d581ef40fa08489732bc9465705abc6d..6095401fa110cbfa81b9ef2218c4a545784836f1 100644 --- a/src/processor_base.h +++ b/src/processor_base.h @@ -28,6 +28,12 @@ struct ProcessorParamsBase std::string name; }; +enum class ProcessingCapturePolicy +{ + STRICT, + SOFT +}; + //class ProcessorBase class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public std::enable_shared_from_this<ProcessorBase> { @@ -36,7 +42,8 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st public: ProcessorBase(ProcessorType _tp, const std::string& _type, - const Scalar& _time_tolerance = 0, const Scalar _processing_rate = 2000); + const Scalar& _time_tolerance = 0, const Scalar _capturing_rate = 2000, + const ProcessingCapturePolicy policy = ProcessingCapturePolicy::STRICT); virtual ~ProcessorBase(); void remove(); @@ -81,8 +88,13 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st ProcessorType type_id_; Scalar time_tolerance_; ///< self time tolerance for adding a capture into a frame + ProcessingCapturePolicy proc_capture_policy_; + std::mutex capture_mut_; std::atomic<bool> update_; + std::atomic<bool> processed_; + + std::atomic<bool> capturing_; CaptureBasePtr current_capture_; @@ -98,34 +110,58 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st namespace wolf { -// WOLF_INFO below are their for debugging purpose only - inline void ProcessorBase::addCapture(CaptureBasePtr _capture_ptr) { - // wait to lock the mutex - std::unique_lock<std::mutex> lock(capture_mut_); + WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " addCapture wait **************"); - if (!run_.load(std::memory_order_acquire)) + std::unique_lock<std::mutex> lock; + + switch (proc_capture_policy_) { - WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + - " asked to stop **************\n"); - return; + case ProcessingCapturePolicy::SOFT: + { + lock = std::unique_lock<std::mutex>(capture_mut_, std::try_to_lock); + + // If try_lock fails, the capture is lost + if (!lock.owns_lock()) return; + } + case ProcessingCapturePolicy::STRICT: + { + // wait to lock the mutex + lock = std::unique_lock<std::mutex>(capture_mut_); + cv_.wait(lock, [this]{return !capturing_.load(std::memory_order_acquire) || + !run_.load(std::memory_order_acquire);}); + break; + } + default: + { + // wait to lock the mutex + lock = std::unique_lock<std::mutex>(capture_mut_); + cv_.wait(lock, [this]{return !capturing_.load(std::memory_order_acquire) || + !run_.load(std::memory_order_acquire);}); + break; } + } + + if (!run_.load(std::memory_order_acquire)) return; + + capturing_.store(true, std::memory_order_release); + + WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " addCapture do **************"); // If a copy id needed, it should // be performed in process(CaptureBasePtr). current_capture_ = _capture_ptr; - lock.unlock(); - update_.store(true, std::memory_order_release); cv_.notify_one(); + lock.unlock(); } inline void ProcessorBase::executeImpl() { - WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + " executeImpl **************"); + WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " executeImpl wait **************"); // This basically put the thread asleep until someone // sets 'update_' or '!run_' from outside @@ -135,32 +171,26 @@ inline void ProcessorBase::executeImpl() if (!run_.load(std::memory_order_acquire)) { - WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + - " executeImpl asked to stop **************"); - return; - } - else if (!update_.load(std::memory_order_acquire)) - { - WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + - " executeImpl NO update **************"); +// WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + +// " executeImpl asked to stop **************"); return; } - WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + - " executeImpl actual processing **************\n", - "\t\t\t in thread : ", std::to_string(core::get_thread_id())); + WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " executeImpl do **************"); + + capturing_.store(false, std::memory_order_release); + +// WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + +// " executeImpl actual processing **************\n", +// "\t\t\t in thread : ", std::to_string(core::get_thread_id())); // Process the capture process(current_capture_); - lock.unlock(); - update_.store(false, std::memory_order_release); cv_.notify_one(); - - WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + - " executeImpl End **************"); + lock.unlock(); } inline wolf::ProblemPtr ProcessorBase::getProblem()