From db35434082bf4fe8c123bb5cf6c9d11f5362a569 Mon Sep 17 00:00:00 2001
From: Jeremie Deray <jeremie.deray@pal-robotics.com>
Date: Mon, 10 Oct 2016 18:44:03 +0200
Subject: [PATCH] wip PocessorBase as a ThreadedBaseClass

---
 src/CMakeLists.txt     |  3 ++
 src/processor_base.cpp |  4 +-
 src/processor_base.h   | 84 +++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 3ced6ee42..b6e550f92 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -183,6 +183,8 @@ SET(HDRS
     state_quaternion.h
     time_stamp.h
     trajectory_base.h
+    threaded_base_class.h
+    multi_threading_utils.h
     )
 
 
@@ -247,6 +249,7 @@ SET(SRCS
     sensor_odom_3D.cpp
     time_stamp.cpp
     trajectory_base.cpp
+    threaded_base_class.cpp
     data_association/association_solver.cpp
     data_association/association_node.cpp
     data_association/association_tree.cpp
diff --git a/src/processor_base.cpp b/src/processor_base.cpp
index beb4018b1..8eb098bad 100644
--- a/src/processor_base.cpp
+++ b/src/processor_base.cpp
@@ -6,8 +6,10 @@ namespace wolf {
 
 unsigned int ProcessorBase::processor_id_count_ = 0;
 
-ProcessorBase::ProcessorBase(ProcessorType _tp, const std::string& _type, const Scalar& _time_tolerance) :
+ProcessorBase::ProcessorBase(ProcessorType _tp, const std::string& _type,
+                             const Scalar& _time_tolerance, const Scalar _processing_rate) :
         NodeBase("PROCESSOR"),
+        ThreadedBaseClass(_processing_rate),
         sensor_ptr_(),
         processor_id_(++processor_id_count_),
         type_id_(_tp),
diff --git a/src/processor_base.h b/src/processor_base.h
index 5900ed583..9931d333d 100644
--- a/src/processor_base.h
+++ b/src/processor_base.h
@@ -9,6 +9,7 @@ class SensorBase;
 //Wolf includes
 #include "wolf.h"
 #include "node_base.h"
+#include "threaded_base_class.h"
 
 // std
 #include <memory>
@@ -28,12 +29,15 @@ struct ProcessorParamsBase
 };
 
 //class ProcessorBase
-class ProcessorBase : public NodeBase, public std::enable_shared_from_this<ProcessorBase>
+class ProcessorBase : public NodeBase, public core::ThreadedBaseClass, public std::enable_shared_from_this<ProcessorBase>
 {
     private:
         SensorBaseWPtr sensor_ptr_;
     public:
-        ProcessorBase(ProcessorType _tp, const std::string& _type, const Scalar& _time_tolerance = 0);
+
+        ProcessorBase(ProcessorType _tp, const std::string& _type,
+                      const Scalar& _time_tolerance = 0, const Scalar _processing_rate = 2000);
+
         virtual ~ProcessorBase();
         void remove();
 
@@ -41,6 +45,8 @@ class ProcessorBase : public NodeBase, public std::enable_shared_from_this<Proce
 
         virtual void process(CaptureBasePtr _capture_ptr) = 0;
 
+        virtual void addCapture(CaptureBasePtr _capture_ptr);
+
         /** \brief Vote for KeyFrame generation
          *
          * If a KeyFrame criterion is validated, this function returns true,
@@ -74,6 +80,13 @@ class ProcessorBase : public NodeBase, public std::enable_shared_from_this<Proce
         unsigned int processor_id_;
         ProcessorType type_id_;
         Scalar time_tolerance_;         ///< self time tolerance for adding a capture into a frame
+
+        std::mutex capture_mut_;
+        std::atomic<bool> update_;
+
+        CaptureBasePtr current_capture_;
+
+        virtual void executeImpl();
 };
 
 }
@@ -81,8 +94,75 @@ class ProcessorBase : public NodeBase, public std::enable_shared_from_this<Proce
 #include "sensor_base.h"
 #include "constraint_base.h"
 
+#include "multi_threading_utils.h"
+
 namespace wolf {
 
+// WOLF_INFO below are their for debugging purpose only
+
+inline void ProcessorBase::addCapture(CaptureBasePtr _capture_ptr)
+{
+  // wait to lock the mutex
+  std::unique_lock<std::mutex> lock(capture_mut_);
+
+  if (!run_.load(std::memory_order_acquire))
+  {
+    WOLF_INFO("************** ProcessorBase " + std::to_string(id()) +
+              " asked to stop **************\n");
+    return;
+  }
+
+  // If a copy id needed, it should
+  //  be performed in process(CaptureBasePtr).
+  current_capture_ = _capture_ptr;
+
+  lock.unlock();
+
+  update_.store(true, std::memory_order_release);
+
+  cv_.notify_one();
+}
+
+inline void ProcessorBase::executeImpl()
+{
+  WOLF_INFO("************** ProcessorBase " + std::to_string(id()) + " executeImpl **************");
+
+  // This basically put the thread asleep until someone
+  // sets 'update_' or '!run_' from outside
+  std::unique_lock<std::mutex> lock(capture_mut_);
+  cv_.wait(lock, [this]{return update_.load(std::memory_order_acquire) ||
+                               !run_.load(std::memory_order_acquire);});
+
+  if (!run_.load(std::memory_order_acquire))
+  {
+    WOLF_INFO("************** ProcessorBase " + std::to_string(id()) +
+                             " executeImpl asked to stop **************");
+    return;
+  }
+  else if (!update_.load(std::memory_order_acquire))
+  {
+    WOLF_INFO("************** ProcessorBase " + std::to_string(id()) +
+                             " executeImpl NO update **************");
+    return;
+  }
+
+  WOLF_INFO("************** ProcessorBase " + std::to_string(id()) +
+                           " executeImpl actual processing **************\n",
+                           "\t\t\t in thread : ", std::to_string(core::get_thread_id()));
+
+  // Process the capture
+  process(current_capture_);
+
+  lock.unlock();
+
+  update_.store(false, std::memory_order_release);
+
+  cv_.notify_one();
+
+  WOLF_INFO("************** ProcessorBase " + std::to_string(id()) +
+            " executeImpl End **************");
+}
+
 inline wolf::ProblemPtr ProcessorBase::getProblem()
 {
     ProblemPtr prb = problem_ptr_.lock();
-- 
GitLab