Skip to content
Snippets Groups Projects
Commit 785df76a authored by Jeremie Deray's avatar Jeremie Deray
Browse files

mv addCapture/executeImpl to cpp & add latest changes

parent fedeaed4
No related branches found
No related tags found
1 merge request!90[WIP] ProcessorBase multi-threading
...@@ -30,6 +30,101 @@ ProcessorBase::~ProcessorBase() ...@@ -30,6 +30,101 @@ ProcessorBase::~ProcessorBase()
std::cout << "destructed -p" << id() << std::endl; std::cout << "destructed -p" << id() << std::endl;
} }
bool ProcessorBase::addCapture(CaptureBasePtr _capture_ptr)
{
// Defer locking to the appropriate policy
std::unique_lock<std::mutex> lock(capture_mut_, std::defer_lock);
switch (proc_capture_policy_)
{
case ProcessingCapturePolicy::SOFT:
{
// Try locking, if fails, the capture is lost
// @TODO: try_lock_for(max_period/?)
if (!lock.try_lock()) return false;
break;
}
case ProcessingCapturePolicy::STRICT:
{
// block until it can lock
lock.lock();
WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " +
std::to_string(processed_.load(std::memory_order_acquire))
+ " addCapture wait ****");
// wait to lock the mutex & data is processed
cv_.wait(lock, [this]{return processed_.load(std::memory_order_acquire) ||
!run_.load(std::memory_order_acquire);});
break;
}
default: // is ProcessingCapturePolicy::STRICT
{
// block until it can lock
lock.lock();
// wait to lock the mutex & data is processed
cv_.wait(lock, [this]{return processed_.load(std::memory_order_acquire) ||
!run_.load(std::memory_order_acquire);});
break;
}
}
if (!run_.load(std::memory_order_acquire)) return false;
WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " +
std::to_string(processed_.load(std::memory_order_acquire)) +
" addCapture do ****");
assert(processed_.load(std::memory_order_acquire) && "addCapture processed_");
assert(current_capture_ != _capture_ptr && "addCapture ptr");
// Copy the capture
// Todo: avoid copy
*current_capture_ = *_capture_ptr;
processed_.store(false, std::memory_order_release);
update_.store(true, std::memory_order_release);
lock.unlock();
cv_.notify_one();
return true;
}
void ProcessorBase::executeImpl()
{
// This basically put the thread asleep until someone
// sets 'update_' or '!run_' from outside
std::unique_lock<std::mutex> lock(capture_mut_);
WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " +
std::to_string(update_.load(std::memory_order_acquire)) +
" executeImpl wait ****");
cv_.wait(lock, [this]{return update_.load(std::memory_order_acquire) ||
!run_.load(std::memory_order_acquire);});
if (!run_.load(std::memory_order_acquire)) return;
WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " +
std::to_string(update_.load(std::memory_order_acquire)) +
" executeImpl do ****");
assert(update_.load(std::memory_order_acquire) && "executeImpl update_");
// Process the capture
process(current_capture_);
processed_.store(true, std::memory_order_release);
update_.store(false, std::memory_order_release);
lock.unlock();
cv_.notify_one();
}
bool ProcessorBase::permittedKeyFrame() bool ProcessorBase::permittedKeyFrame()
{ {
return getProblem()->permitKeyFrame(shared_from_this()); return getProblem()->permitKeyFrame(shared_from_this());
......
...@@ -110,89 +110,6 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st ...@@ -110,89 +110,6 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st
namespace wolf { namespace wolf {
inline void ProcessorBase::addCapture(CaptureBasePtr _capture_ptr)
{
WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " addCapture wait **************");
std::unique_lock<std::mutex> lock;
switch (proc_capture_policy_)
{
case ProcessingCapturePolicy::SOFT:
{
lock = std::unique_lock<std::mutex>(capture_mut_, std::try_to_lock);
// If try_lock fails, the capture is lost
if (!lock.owns_lock()) return;
}
case ProcessingCapturePolicy::STRICT:
{
// wait to lock the mutex
lock = std::unique_lock<std::mutex>(capture_mut_);
cv_.wait(lock, [this]{return !capturing_.load(std::memory_order_acquire) ||
!run_.load(std::memory_order_acquire);});
break;
}
default:
{
// wait to lock the mutex
lock = std::unique_lock<std::mutex>(capture_mut_);
cv_.wait(lock, [this]{return !capturing_.load(std::memory_order_acquire) ||
!run_.load(std::memory_order_acquire);});
break;
}
}
if (!run_.load(std::memory_order_acquire)) return;
capturing_.store(true, std::memory_order_release);
WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " addCapture do **************");
// If a copy id needed, it should
// be performed in process(CaptureBasePtr).
current_capture_ = _capture_ptr;
update_.store(true, std::memory_order_release);
cv_.notify_one();
lock.unlock();
}
inline void ProcessorBase::executeImpl()
{
WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " executeImpl wait **************");
// This basically put the thread asleep until someone
// sets 'update_' or '!run_' from outside
std::unique_lock<std::mutex> lock(capture_mut_);
cv_.wait(lock, [this]{return update_.load(std::memory_order_acquire) ||
!run_.load(std::memory_order_acquire);});
if (!run_.load(std::memory_order_acquire))
{
// WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) +
// " executeImpl asked to stop **************");
return;
}
WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " executeImpl do **************");
capturing_.store(false, std::memory_order_release);
// WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) +
// " executeImpl actual processing **************\n",
// "\t\t\t in thread : ", std::to_string(core::get_thread_id()));
// Process the capture
process(current_capture_);
update_.store(false, std::memory_order_release);
cv_.notify_one();
lock.unlock();
}
inline wolf::ProblemPtr ProcessorBase::getProblem() inline wolf::ProblemPtr ProcessorBase::getProblem()
{ {
ProblemPtr prb = problem_ptr_.lock(); ProblemPtr prb = problem_ptr_.lock();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment