diff --git a/src/processor_base.cpp b/src/processor_base.cpp index 3d6ccab48f1334da6e305af90c7cac923d309567..1f268c8ab9d2a05ff871a02da5f6306c44249e3f 100644 --- a/src/processor_base.cpp +++ b/src/processor_base.cpp @@ -30,6 +30,101 @@ ProcessorBase::~ProcessorBase() std::cout << "destructed -p" << id() << std::endl; } +bool ProcessorBase::addCapture(CaptureBasePtr _capture_ptr) +{ + // Defer locking to the appropriate policy + std::unique_lock<std::mutex> lock(capture_mut_, std::defer_lock); + + switch (proc_capture_policy_) + { + case ProcessingCapturePolicy::SOFT: + { + // Try locking, if fails, the capture is lost + // @TODO: try_lock_for(max_period/?) + if (!lock.try_lock()) return false; + + break; + } + case ProcessingCapturePolicy::STRICT: + { + // block until it can lock + lock.lock(); + + WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " + + std::to_string(processed_.load(std::memory_order_acquire)) + + " addCapture wait ****"); + + // wait to lock the mutex & data is processed + cv_.wait(lock, [this]{return processed_.load(std::memory_order_acquire) || + !run_.load(std::memory_order_acquire);}); + break; + } + default: // is ProcessingCapturePolicy::STRICT + { + // block until it can lock + lock.lock(); + + // wait to lock the mutex & data is processed + cv_.wait(lock, [this]{return processed_.load(std::memory_order_acquire) || + !run_.load(std::memory_order_acquire);}); + break; + } + } + + if (!run_.load(std::memory_order_acquire)) return false; + + WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " + + std::to_string(processed_.load(std::memory_order_acquire)) + + " addCapture do ****"); + + assert(processed_.load(std::memory_order_acquire) && "addCapture processed_"); + + assert(current_capture_ != _capture_ptr && "addCapture ptr"); + + // Copy the capture + // Todo: avoid copy + *current_capture_ = *_capture_ptr; + + processed_.store(false, std::memory_order_release); + update_.store(true, std::memory_order_release); + + lock.unlock(); + cv_.notify_one(); + + return true; +} + +void ProcessorBase::executeImpl() +{ + // This basically put the thread asleep until someone + // sets 'update_' or '!run_' from outside + std::unique_lock<std::mutex> lock(capture_mut_); + + WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " + + std::to_string(update_.load(std::memory_order_acquire)) + + " executeImpl wait ****"); + + cv_.wait(lock, [this]{return update_.load(std::memory_order_acquire) || + !run_.load(std::memory_order_acquire);}); + + if (!run_.load(std::memory_order_acquire)) return; + + WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " + + std::to_string(update_.load(std::memory_order_acquire)) + + " executeImpl do ****"); + + assert(update_.load(std::memory_order_acquire) && "executeImpl update_"); + + // Process the capture + process(current_capture_); + + processed_.store(true, std::memory_order_release); + update_.store(false, std::memory_order_release); + + lock.unlock(); + cv_.notify_one(); +} + bool ProcessorBase::permittedKeyFrame() { return getProblem()->permitKeyFrame(shared_from_this()); diff --git a/src/processor_base.h b/src/processor_base.h index 6095401fa110cbfa81b9ef2218c4a545784836f1..9b3accf0371e6b6fee42d4321194ba3d81eeb7a3 100644 --- a/src/processor_base.h +++ b/src/processor_base.h @@ -110,89 +110,6 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st namespace wolf { -inline void ProcessorBase::addCapture(CaptureBasePtr _capture_ptr) -{ - WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " addCapture wait **************"); - - std::unique_lock<std::mutex> lock; - - switch (proc_capture_policy_) - { - case ProcessingCapturePolicy::SOFT: - { - lock = std::unique_lock<std::mutex>(capture_mut_, std::try_to_lock); - - // If try_lock fails, the capture is lost - if (!lock.owns_lock()) return; - } - case ProcessingCapturePolicy::STRICT: - { - // wait to lock the mutex - lock = std::unique_lock<std::mutex>(capture_mut_); - cv_.wait(lock, [this]{return !capturing_.load(std::memory_order_acquire) || - !run_.load(std::memory_order_acquire);}); - break; - } - default: - { - // wait to lock the mutex - lock = std::unique_lock<std::mutex>(capture_mut_); - cv_.wait(lock, [this]{return !capturing_.load(std::memory_order_acquire) || - !run_.load(std::memory_order_acquire);}); - break; - } - } - - if (!run_.load(std::memory_order_acquire)) return; - - capturing_.store(true, std::memory_order_release); - - WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " addCapture do **************"); - - // If a copy id needed, it should - // be performed in process(CaptureBasePtr). - current_capture_ = _capture_ptr; - - update_.store(true, std::memory_order_release); - - cv_.notify_one(); - lock.unlock(); -} - -inline void ProcessorBase::executeImpl() -{ - WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " executeImpl wait **************"); - - // This basically put the thread asleep until someone - // sets 'update_' or '!run_' from outside - std::unique_lock<std::mutex> lock(capture_mut_); - cv_.wait(lock, [this]{return update_.load(std::memory_order_acquire) || - !run_.load(std::memory_order_acquire);}); - - if (!run_.load(std::memory_order_acquire)) - { -// WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + -// " executeImpl asked to stop **************"); - return; - } - - WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " executeImpl do **************"); - - capturing_.store(false, std::memory_order_release); - -// WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + -// " executeImpl actual processing **************\n", -// "\t\t\t in thread : ", std::to_string(core::get_thread_id())); - - // Process the capture - process(current_capture_); - - update_.store(false, std::memory_order_release); - - cv_.notify_one(); - lock.unlock(); -} - inline wolf::ProblemPtr ProcessorBase::getProblem() { ProblemPtr prb = problem_ptr_.lock();