Skip to content
Snippets Groups Projects
Commit f95badd3 authored by Jeremie Deray's avatar Jeremie Deray
Browse files

wip ProcessingCapturePolicy & thread priority

parent c94114c6
No related branches found
No related tags found
1 merge request!90[WIP] ProcessorBase multi-threading
This commit is part of merge request !90. Comments created here will be created in the context of that merge request.
...@@ -166,6 +166,16 @@ inline void locked_cout(const CoutColor& color, const Args&... rest) ...@@ -166,6 +166,16 @@ inline void locked_cout(const CoutColor& color, const Args&... rest)
#define WOLF_ERROR_COND(...) \ #define WOLF_ERROR_COND(...) \
if (cond) locked_cout(cond, wolf::io::CoutColor::RED, __VA_ARGS__) if (cond) locked_cout(cond, wolf::io::CoutColor::RED, __VA_ARGS__)
// I need to define some custom debugging
// flag (e.g. WOLFDEBUG) that I can pass to
// gcc through cmake.
#if 0
#define WOLF_DEBUG(...)
#else
#define WOLF_DEBUG(...) \
locked_cout(wolf::io::CoutColor::CYAN, __VA_ARGS__)
#endif
} // namespace wolf } // namespace wolf
#endif /* WOLF_MULTI_THREADING_UTILS_H_ */ #endif /* WOLF_MULTI_THREADING_UTILS_H_ */
...@@ -7,13 +7,18 @@ namespace wolf { ...@@ -7,13 +7,18 @@ namespace wolf {
unsigned int ProcessorBase::processor_id_count_ = 0; unsigned int ProcessorBase::processor_id_count_ = 0;
ProcessorBase::ProcessorBase(ProcessorType _tp, const std::string& _type, ProcessorBase::ProcessorBase(ProcessorType _tp, const std::string& _type,
const Scalar& _time_tolerance, const Scalar _processing_rate) : const Scalar& _time_tolerance, const Scalar _processing_rate,
const ProcessingCapturePolicy policy) :
NodeBase("PROCESSOR"), NodeBase("PROCESSOR"),
ThreadedBaseClass(_processing_rate), ThreadedBaseClass(_processing_rate),
sensor_ptr_(), sensor_ptr_(),
processor_id_(++processor_id_count_), processor_id_(++processor_id_count_),
type_id_(_tp), type_id_(_tp),
time_tolerance_(_time_tolerance) time_tolerance_(_time_tolerance),
proc_capture_policy_(policy),
update_(false),
processed_(false),
capturing_(false)
{ {
std::cout << "constructed +p" << id() << std::endl; std::cout << "constructed +p" << id() << std::endl;
......
...@@ -28,6 +28,12 @@ struct ProcessorParamsBase ...@@ -28,6 +28,12 @@ struct ProcessorParamsBase
std::string name; std::string name;
}; };
enum class ProcessingCapturePolicy
{
STRICT,
SOFT
};
//class ProcessorBase //class ProcessorBase
class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public std::enable_shared_from_this<ProcessorBase> class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public std::enable_shared_from_this<ProcessorBase>
{ {
...@@ -36,7 +42,8 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st ...@@ -36,7 +42,8 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st
public: public:
ProcessorBase(ProcessorType _tp, const std::string& _type, ProcessorBase(ProcessorType _tp, const std::string& _type,
const Scalar& _time_tolerance = 0, const Scalar _processing_rate = 2000); const Scalar& _time_tolerance = 0, const Scalar _capturing_rate = 2000,
const ProcessingCapturePolicy policy = ProcessingCapturePolicy::STRICT);
virtual ~ProcessorBase(); virtual ~ProcessorBase();
void remove(); void remove();
...@@ -81,8 +88,13 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st ...@@ -81,8 +88,13 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st
ProcessorType type_id_; ProcessorType type_id_;
Scalar time_tolerance_; ///< self time tolerance for adding a capture into a frame Scalar time_tolerance_; ///< self time tolerance for adding a capture into a frame
ProcessingCapturePolicy proc_capture_policy_;
std::mutex capture_mut_; std::mutex capture_mut_;
std::atomic<bool> update_; std::atomic<bool> update_;
std::atomic<bool> processed_;
std::atomic<bool> capturing_;
CaptureBasePtr current_capture_; CaptureBasePtr current_capture_;
...@@ -98,34 +110,58 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st ...@@ -98,34 +110,58 @@ class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public st
namespace wolf { namespace wolf {
// WOLF_INFO below are their for debugging purpose only
inline void ProcessorBase::addCapture(CaptureBasePtr _capture_ptr) inline void ProcessorBase::addCapture(CaptureBasePtr _capture_ptr)
{ {
// wait to lock the mutex WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " addCapture wait **************");
std::unique_lock<std::mutex> lock(capture_mut_);
if (!run_.load(std::memory_order_acquire)) std::unique_lock<std::mutex> lock;
switch (proc_capture_policy_)
{ {
WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + case ProcessingCapturePolicy::SOFT:
" asked to stop **************\n"); {
return; lock = std::unique_lock<std::mutex>(capture_mut_, std::try_to_lock);
// If try_lock fails, the capture is lost
if (!lock.owns_lock()) return;
}
case ProcessingCapturePolicy::STRICT:
{
// wait to lock the mutex
lock = std::unique_lock<std::mutex>(capture_mut_);
cv_.wait(lock, [this]{return !capturing_.load(std::memory_order_acquire) ||
!run_.load(std::memory_order_acquire);});
break;
}
default:
{
// wait to lock the mutex
lock = std::unique_lock<std::mutex>(capture_mut_);
cv_.wait(lock, [this]{return !capturing_.load(std::memory_order_acquire) ||
!run_.load(std::memory_order_acquire);});
break;
} }
}
if (!run_.load(std::memory_order_acquire)) return;
capturing_.store(true, std::memory_order_release);
WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " addCapture do **************");
// If a copy id needed, it should // If a copy id needed, it should
// be performed in process(CaptureBasePtr). // be performed in process(CaptureBasePtr).
current_capture_ = _capture_ptr; current_capture_ = _capture_ptr;
lock.unlock();
update_.store(true, std::memory_order_release); update_.store(true, std::memory_order_release);
cv_.notify_one(); cv_.notify_one();
lock.unlock();
} }
inline void ProcessorBase::executeImpl() inline void ProcessorBase::executeImpl()
{ {
WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + " executeImpl **************"); WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " executeImpl wait **************");
// This basically put the thread asleep until someone // This basically put the thread asleep until someone
// sets 'update_' or '!run_' from outside // sets 'update_' or '!run_' from outside
...@@ -135,32 +171,26 @@ inline void ProcessorBase::executeImpl() ...@@ -135,32 +171,26 @@ inline void ProcessorBase::executeImpl()
if (!run_.load(std::memory_order_acquire)) if (!run_.load(std::memory_order_acquire))
{ {
WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + // WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) +
" executeImpl asked to stop **************"); // " executeImpl asked to stop **************");
return;
}
else if (!update_.load(std::memory_order_acquire))
{
WOLF_INFO("************** ProcessorBase " + std::to_string(id()) +
" executeImpl NO update **************");
return; return;
} }
WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) + " executeImpl do **************");
" executeImpl actual processing **************\n",
"\t\t\t in thread : ", std::to_string(core::get_thread_id())); capturing_.store(false, std::memory_order_release);
// WOLF_DEBUG("************** ProcessorBase " + std::to_string(id()) +
// " executeImpl actual processing **************\n",
// "\t\t\t in thread : ", std::to_string(core::get_thread_id()));
// Process the capture // Process the capture
process(current_capture_); process(current_capture_);
lock.unlock();
update_.store(false, std::memory_order_release); update_.store(false, std::memory_order_release);
cv_.notify_one(); cv_.notify_one();
lock.unlock();
WOLF_INFO("************** ProcessorBase " + std::to_string(id()) +
" executeImpl End **************");
} }
inline wolf::ProblemPtr ProcessorBase::getProblem() inline wolf::ProblemPtr ProcessorBase::getProblem()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment