Skip to content

Commit

Permalink
Reverted filter watchdog model to requested/actual state
Browse files Browse the repository at this point in the history
  • Loading branch information
ztarem committed Dec 11, 2018
1 parent 0208599 commit 11b538c
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 115 deletions.
6 changes: 3 additions & 3 deletions src/filters/filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ int mc_FilterEngine::Destroy()
}
if (m_watchdog != nullptr)
{
m_watchdog->PostPoisonPill();
m_watchdog->Shutdown();
delete m_watchdog;
}

Expand Down Expand Up @@ -159,12 +159,12 @@ void mc_FilterEngine::SetRunningFilter(const mc_Filter *filter)
{
m_watchdog = new Watchdog(boost::bind(&mc_FilterEngine::TerminateFilter, this, _1));
}
m_watchdog->PostTaskStarted(filter->Timeout());
m_watchdog->FilterStarted(filter->Timeout());
}

void mc_FilterEngine::ResetRunningFilter()
{
assert(m_watchdog != nullptr);
m_runningFilter = nullptr;
m_watchdog->PostTaskEnded();
m_watchdog->FilterEnded();
}
203 changes: 122 additions & 81 deletions src/filters/watchdog.cpp
Original file line number Diff line number Diff line change
@@ -1,132 +1,173 @@
#include "filters/watchdog.h"
#include "watchdog.h"
#include "core/init.h"
//#include "protocol/filter.h"
#include "utils/define.h"
#include "utils/util.h"

Watchdog::Watchdog(std::function<void(const char *)> taskTerminator)
: m_thread(nullptr), m_state(Watchdog::State::IDLE), m_timeout(0), m_taskTerminator(taskTerminator)
void WatchdogState::Set(WatchdogState::State state)
{
m_thread = new boost::thread(&Watchdog::EventLoop, this);
if (m_state != state)
{
{
boost::lock_guard<boost::mutex> lock(m_mutex);
m_state = state;
}
m_condVar.notify_all();
}
}

Watchdog::~Watchdog()
std::string WatchdogState::Str() const
{
if (m_thread != nullptr)
std::string stateStr;
switch (m_state)
{
this->PostPoisonPill();
case State::INIT:
stateStr = "INIT";
break;
case State::IDLE:
stateStr = "IDLE";
break;
case State::RUNNING:
stateStr = "RUNNING";
break;
case State::POISON_PILL:
stateStr = "POISON_PILL";
break;
}
return tfm::format("%s %s", m_name, stateStr);
}

void Watchdog::PostTaskStarted(int timeout)
void WatchdogState::WaitState(WatchdogState::State state)
{
if (fDebug)
LogPrint("v8", "Watchdog::PostStarted(timeout=%d)\n", timeout);

m_timeout.store(timeout);
this->PostEvent(Event::TASK_STARTED);
if (m_state != state)
{
boost::unique_lock<boost::mutex> lock(m_mutex);
m_condVar.wait(lock, [this, state] { return m_state == state; });
}
}

void Watchdog::PostTaskEnded()
void WatchdogState::WaitNotState(WatchdogState::State state)
{
if (fDebug)
LogPrint("v8", "Watchdog::PostEnded()\n");
if (m_state == state)
{
boost::unique_lock<boost::mutex> lock(m_mutex);
m_condVar.wait(lock, [this, state] { return m_state != state; });
}
}

m_timeout.store(0);
this->PostEvent(Event::TASK_ENDED);
template <class Rep, class Period>
bool WatchdogState::WaitStateFor(WatchdogState::State state, const boost::chrono::duration<Rep, Period> &timeout)
{
boost::unique_lock<boost::mutex> lock(m_mutex);
m_cv.wait(lock, [this] { return m_state.load() != State::TASK_RUNNING; });
return m_condVar.wait_for(lock, timeout, [this, state] { return m_state == state; });
}

void Watchdog::PostPoisonPill()
template <class Rep, class Period>
bool WatchdogState::WaitNotStateFor(WatchdogState::State state, const boost::chrono::duration<Rep, Period> &timeout)
{
if (fDebug)
LogPrint("v8", "Watchdog::PostPoison()\n");
boost::unique_lock<boost::mutex> lock(m_mutex);
return m_condVar.wait_for(lock, timeout, [this, state] { return m_state != state; });
}

this->PostEvent(Event::POISON_PILL);
m_thread->join();
delete m_thread;
void Watchdog::Watchdog::Zero()
{
m_thread = nullptr;
m_timeout = 0;
}

std::string Watchdog::EventStr(Watchdog::Event event)
int Watchdog::Destroy()
{
std::string eventStr;
switch (event)
if (m_thread != nullptr)
{
case Event::TASK_STARTED:
eventStr = "task started";
break;
case Event::TASK_ENDED:
eventStr = "task ended";
break;
case Event::POISON_PILL:
eventStr = "take poison";
break;
m_thread->interrupt();
m_thread->join();
delete m_thread;
}
return eventStr;
this->Zero();
return MC_ERR_NOERROR;
}

void Watchdog::PostEvent(Watchdog::Event event)
void Watchdog::FilterStarted(int timeout)
{
if (fDebug)
LogPrint("v8", "Watchdog::PostEvent(event=%s)\n", EventStr(event));

LogPrint("", ": Watchdog::FilterStarted(timeout=%d) %s\n", timeout, m_actualState.Str());
if (m_thread == nullptr)
{
boost::lock_guard<boost::mutex> lock(m_mutex);
m_queue.push(event);
if (fDebug)
LogPrint("", ": Watchdog::FilterStarted create thread with watchdogTask\n");
m_thread = new boost::thread(std::bind(&Watchdog::WatchdogTask, this));
}
m_cv.notify_one();
m_actualState.WaitState(WatchdogState::State::IDLE);
m_timeout = timeout;
m_requestedState.Set(WatchdogState::State::RUNNING);
m_actualState.WaitState(WatchdogState::State::RUNNING);
}

void Watchdog::FilterEnded()
{
if (fDebug)
LogPrint("", ": Watchdog::FilterEnded %s\n", m_actualState.Str());
m_actualState.WaitState(WatchdogState::State::RUNNING);
m_requestedState.Set(WatchdogState::State::IDLE);
m_actualState.WaitState(WatchdogState::State::IDLE);
}

void Watchdog::EventLoop()
void Watchdog::Shutdown()
{
Event event;
if (fDebug)
LogPrint("", ": Watchdog::Shutdown\n");
m_requestedState.Set(WatchdogState::State::POISON_PILL);
m_actualState.WaitState(WatchdogState::State::POISON_PILL);
this->Destroy();
}

void Watchdog::WatchdogTask()
{
if (fDebug)
LogPrint("", ": Watchdog::watchdogTask\n");
while (true)
{
if (fDebug)
LogPrint("v8", "Watchdog::EventLoop(): wait for queue\n");
std::string msg = tfm::format(": Watchdog::watchdogTask %s - %%s", m_requestedState.Str());
switch (m_requestedState.Get())
{
boost::unique_lock<boost::mutex> lock(m_mutex);
m_cv.wait(lock, [this] { return !m_queue.empty(); });
}
{
boost::lock_guard<boost::mutex> lock(m_mutex);
event = m_queue.pull();
}

switch (event)
{
case Event::TASK_STARTED:
case WatchdogState::State::POISON_PILL:
if (fDebug)
LogPrint("v8", "Watchdog::EventLoop(): task started timeout=%d\n", m_timeout.load());
m_state.store(State::TASK_RUNNING);
LogPrint("", msg.c_str(), "committing suicide\n");
m_actualState.Set(WatchdogState::State::POISON_PILL);
return;

case WatchdogState::State::RUNNING:
m_actualState.Set(WatchdogState::State::RUNNING);
if (m_timeout > 0)
{
boost::unique_lock<boost::mutex> lock(m_mutex);
if (m_timeout.load() > 0)
if (fDebug)
LogPrint("", msg.c_str(), "entering timed wait\n");
bool finished = m_requestedState.WaitNotStateFor(WatchdogState::State::RUNNING,
boost::chrono::milliseconds(m_timeout));
if (!finished)
{
if (fDebug)
LogPrint("v8", "Watchdog::EventLoop(): timed wait\n");
if (m_cv.wait_for(lock, boost::chrono::milliseconds(m_timeout.load())) == boost::cv_status::timeout)
{
if (fDebug)
LogPrint("v8", "Watchdog::EvenTLoop(): timeout\n");
m_state.store(State::TASK_TIMED_OUT);
m_taskTerminator(
tinyformat::format("Filter aborted due to timeout after %d ms", m_timeout).c_str());
}
LogPrint("", msg.c_str(), "timeout -> terminating filter\n");
m_taskTerminator(tfm::format("Filter aborted due to timeout after %d ms", m_timeout).c_str());
m_requestedState.WaitNotState(WatchdogState::State::RUNNING);
}
}
else
{
if (fDebug)
LogPrint("", msg.c_str(), "entering inifinte wait\n");
m_requestedState.WaitNotState(WatchdogState::State::RUNNING);
}
break;

case Event::TASK_ENDED:
case WatchdogState::State::INIT:
// fall through
case WatchdogState::State::IDLE:
if (fDebug)
LogPrint("v8", "Watchdog::EventLoop(): task ended\n");
m_state.store(State::IDLE);
m_cv.notify_one();
LogPrint("", msg.c_str(), "entering idle state\n");
m_actualState.Set(WatchdogState::State::IDLE);
m_requestedState.WaitNotState(WatchdogState::State::IDLE);
break;

case Event::POISON_PILL:
if (fDebug)
LogPrint("v8", "Watchdog::EvenTLoop(): swallowed poison pill\n");
return;
}
}
}
97 changes: 66 additions & 31 deletions src/filters/watchdog.h
Original file line number Diff line number Diff line change
@@ -1,49 +1,84 @@
#ifndef WATCHDOG_H
#define WATCHDOG_H

#include <boost/thread/condition_variable.hpp>
#include <boost/thread/sync_queue.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <boost/thread/thread.hpp>
#include <functional>
#include <atomic>
#include <boost/thread.hpp>

class Watchdog
class WatchdogState
{
public:
Watchdog(std::function<void(const char *)> taskTerminator);
~Watchdog();

void PostTaskStarted(int timeout = 0);
void PostTaskEnded();
void PostPoisonPill();

private:
enum Event
enum State
{
TASK_STARTED,
TASK_ENDED,
INIT,
IDLE,
RUNNING,
POISON_PILL
};

enum State
WatchdogState(std::string name, State state = State::INIT) : m_name(name), m_state(state)
{
IDLE,
TASK_RUNNING,
TASK_TIMED_OUT
};
}

State Get() const
{
return m_state;
}
void Set(State state);
std::string Str() const;

void WaitState(State state);
void WaitNotState(State state);
template <class Rep, class Period>
bool WaitStateFor(State state, const boost::chrono::duration<Rep, Period> &timeout);
template <class Rep, class Period>
bool WaitNotStateFor(State state, const boost::chrono::duration<Rep, Period> &timeout);

boost::sync_queue<Event> m_queue;
protected:
std::string m_name;
boost::condition_variable m_condVar;
boost::mutex m_mutex;
boost::condition_variable m_cv;
State m_state;
};

class Watchdog
{
public:
Watchdog(std::function<void(const char *)> taskTerminator) : m_taskTerminator(taskTerminator)
{
this->Zero();
}

~Watchdog()
{
this->Destroy();
}

void Zero();
int Destroy();

/**
* @brief Notfies the watchdog that a filter started runnug, with a given timeout.
* @param timeout The number of millisecond to allow the filtr to run.
*/
void FilterStarted(int timeout);

/**
* @brief Notfies the watchdog that a filter stopped running.
*/
void FilterEnded();

/**
* @brief Terminate the watchdog.
*/
void Shutdown();

private:
boost::thread *m_thread;
std::atomic<State> m_state;
std::atomic_int m_timeout;
int m_timeout;
WatchdogState m_requestedState{"requested"};
WatchdogState m_actualState{"actual"};
std::function<void(const char *)> m_taskTerminator;

static std::string EventStr(Event event);
void PostEvent(Event event);
void EventLoop();
void WatchdogTask();
};

#endif // WATCHDOG_H
#endif // V8FILTERWATCHDOG_H

0 comments on commit 11b538c

Please sign in to comment.