Skip to content
Snippets Groups Projects

[WIP] ProcessorBase multi-threading

Closed Jeremie Deray requested to merge multi_threading into master
2 files
+ 95
83
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 95
0
@@ -30,6 +30,101 @@ ProcessorBase::~ProcessorBase()
@@ -30,6 +30,101 @@ ProcessorBase::~ProcessorBase()
std::cout << "destructed -p" << id() << std::endl;
std::cout << "destructed -p" << id() << std::endl;
}
}
 
bool ProcessorBase::addCapture(CaptureBasePtr _capture_ptr)
 
{
 
// Defer locking to the appropriate policy
 
std::unique_lock<std::mutex> lock(capture_mut_, std::defer_lock);
 
 
switch (proc_capture_policy_)
 
{
 
case ProcessingCapturePolicy::SOFT:
 
{
 
// Try locking, if fails, the capture is lost
 
// @TODO: try_lock_for(max_period/?)
 
if (!lock.try_lock()) return false;
 
 
break;
 
}
 
case ProcessingCapturePolicy::STRICT:
 
{
 
// block until it can lock
 
lock.lock();
 
 
WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " +
 
std::to_string(processed_.load(std::memory_order_acquire))
 
+ " addCapture wait ****");
 
 
// wait to lock the mutex & data is processed
 
cv_.wait(lock, [this]{return processed_.load(std::memory_order_acquire) ||
 
!run_.load(std::memory_order_acquire);});
 
break;
 
}
 
default: // is ProcessingCapturePolicy::STRICT
 
{
 
// block until it can lock
 
lock.lock();
 
 
// wait to lock the mutex & data is processed
 
cv_.wait(lock, [this]{return processed_.load(std::memory_order_acquire) ||
 
!run_.load(std::memory_order_acquire);});
 
break;
 
}
 
}
 
 
if (!run_.load(std::memory_order_acquire)) return false;
 
 
WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " +
 
std::to_string(processed_.load(std::memory_order_acquire)) +
 
" addCapture do ****");
 
 
assert(processed_.load(std::memory_order_acquire) && "addCapture processed_");
 
 
assert(current_capture_ != _capture_ptr && "addCapture ptr");
 
 
// Copy the capture
 
// Todo: avoid copy
 
*current_capture_ = *_capture_ptr;
 
 
processed_.store(false, std::memory_order_release);
 
update_.store(true, std::memory_order_release);
 
 
lock.unlock();
 
cv_.notify_one();
 
 
return true;
 
}
 
 
void ProcessorBase::executeImpl()
 
{
 
// This basically put the thread asleep until someone
 
// sets 'update_' or '!run_' from outside
 
std::unique_lock<std::mutex> lock(capture_mut_);
 
 
WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " +
 
std::to_string(update_.load(std::memory_order_acquire)) +
 
" executeImpl wait ****");
 
 
cv_.wait(lock, [this]{return update_.load(std::memory_order_acquire) ||
 
!run_.load(std::memory_order_acquire);});
 
 
if (!run_.load(std::memory_order_acquire)) return;
 
 
WOLF_DEBUG("**** ProcessorBase " + std::to_string(id()) + " " +
 
std::to_string(update_.load(std::memory_order_acquire)) +
 
" executeImpl do ****");
 
 
assert(update_.load(std::memory_order_acquire) && "executeImpl update_");
 
 
// Process the capture
 
process(current_capture_);
 
 
processed_.store(true, std::memory_order_release);
 
update_.store(false, std::memory_order_release);
 
 
lock.unlock();
 
cv_.notify_one();
 
}
 
bool ProcessorBase::permittedKeyFrame()
bool ProcessorBase::permittedKeyFrame()
{
{
return getProblem()->permitKeyFrame(shared_from_this());
return getProblem()->permitKeyFrame(shared_from_this());
Loading