Skip to content

Commit

Permalink
support queries of reader/sender state in hog (morganstanley#217)
Browse files Browse the repository at this point in the history
This change resolves morganstanley#209, so that connection/stats details are logged
in the generated structured file (the pathname will be indicated in the
stdout)
  • Loading branch information
chenkai036 authored and kthielen committed Dec 20, 2018
1 parent cdf3f32 commit c75a57f
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 7 deletions.
22 changes: 21 additions & 1 deletion bin/hog/batchsend.C
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "network.H"
#include "session.H"
#include "stat.H"
#include "out.H"

using namespace hobbes;
Expand Down Expand Up @@ -200,16 +201,23 @@ void runSegmentSendingProcess(const std::string groupName, std::vector<Destinati
if (destinations.empty()) {
out() << "no batchsend host specified, compressed segment files will accumulate locally" << std::endl;
} else {
const auto id = hobbes::storage::thisProcThread();
StatFile::instance().log(SenderState{hobbes::now(), id, SenderStatus::Enum::Suspended});

while (!readyFn()) {
out() << "batchsend not ready, waiting on other sender(s)" << std::endl;
sleep(10);
}

StatFile::instance().log(SenderState{hobbes::now(), id, SenderStatus::Enum::Started});
out() << "running segment sending process publishing to " << destinations << std::endl;

while (true) {
try {
connect(destinations);
runConnectedSegmentSendingProcess(groupName, destinations, idleFn);
} catch (const ShutdownException& ex) {
StatFile::instance().log(SenderState{hobbes::now(), id, SenderStatus::Enum::Closed});
out() << ex.what() << std::endl;
return;
} catch (std::exception& ex) {
Expand Down Expand Up @@ -267,7 +275,16 @@ struct BatchSendSession {
}
};

this->sendingThread = std::thread(std::bind(&runSegmentSendingProcess, groupName, std::ref(destinations), readyFn, idleFn));
const auto readerId = hobbes::storage::thisProcThread();
this->sendingThread = std::thread([this, groupName, dir, readerId, readyFn, idleFn]() {
const auto senderId = hobbes::storage::thisProcThread();
std::vector<std::string> senderqueue;
for (auto s : this->detached) {
senderqueue.push_back(s->dir);
}
StatFile::instance().log(SenderRegistration{hobbes::now(), readerId, senderId, this->dir, senderqueue});
runSegmentSendingProcess(groupName, destinations, readyFn, idleFn);
});
}

void allocFile() {
Expand Down Expand Up @@ -412,6 +429,7 @@ void pushLocalData(const storage::QueueConnection& qc, const std::string& groupN
// if we are here, the client is disconnected and the queue is drained
// detach tcp send session synchronously and shut down the reader
SenderGroup::detach(sn);
StatFile::instance().log(ReaderState{hobbes::now(), pt, ReaderStatus::Enum::Closed});
throw ShutdownException("SHM reader shutting down, name: " + qc.shmname);
} else {
batchCheckF();
Expand All @@ -427,6 +445,8 @@ void pushLocalData(const storage::QueueConnection& qc, const std::string& groupN
};
};

StatFile::instance().log(ReaderState{hobbes::now(), pt, ReaderStatus::Enum::Started});

try {
storage::runReadProcessWithTimeout(qc, wp, initFn, batchsendtime, timeoutF);
} catch (const ShutdownException& ex) {
Expand Down
21 changes: 15 additions & 6 deletions bin/hog/main.C
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@

#include <hobbes/storage.H>
#include <hobbes/util/str.H>
#include <hobbes/util/codec.H>
#include <hobbes/util/time.H>

#include <thread>
#include <mutex>
#include <atomic>
Expand All @@ -14,6 +14,7 @@
#include "batchsend.H"
#include "batchrecv.H"
#include "session.H"
#include "stat.H"
#include "path.H"
#include "out.H"

Expand Down Expand Up @@ -227,20 +228,27 @@ void evalGroupHostConnection(SessionGroup* sg, const std::string& groupName, con

auto wp = static_cast<hobbes::storage::WaitPolicy>(0x1 & (cmd >> 1));

uint64_t pid=0,tid=0;
uint64_t pid=0, tid=0;
hobbes::fdread(c, reinterpret_cast<char*>(&pid), sizeof(pid));
hobbes::fdread(c, reinterpret_cast<char*>(&tid), sizeof(tid));
out() << "queue registered for group '" << groupName << "' from " << pid << ":" << tid << ", cmd " << static_cast<int>(cmd) << std::endl;

auto qc = hobbes::storage::consumeGroup(groupName, hobbes::storage::ProcThread(pid, tid));

const hobbes::storage::ProcThread writerId {pid, tid};
auto qc = hobbes::storage::consumeGroup(groupName, writerId);

std::string d = instantiateDir(groupName, m.dir);
switch (m.t) {
case RunMode::local:
reg.readers.push_back(std::thread(std::bind(&recordLocalData, sg, qc, d, wp, std::ref(reg.connected))));
reg.readers.emplace_back([=, &reg]() {
StatFile::instance().log(ReaderRegistration{hobbes::now(), writerId, hobbes::storage::thisProcThread(), qc.shmname});
recordLocalData(sg, qc, d, wp, reg.connected);
});
break;
case RunMode::batchsend:
reg.readers.push_back(std::thread(std::bind(&pushLocalData, qc, groupName, ensureDirExists(d), m.clevel, m.batchsendsize, m.batchsendtime, m.sendto, wp, std::ref(reg.connected))));
reg.readers.emplace_back([=, &reg]() {
StatFile::instance().log(ReaderRegistration{hobbes::now(), writerId, hobbes::storage::thisProcThread(), qc.shmname});
pushLocalData(qc, groupName, ensureDirExists(d), m.clevel, m.batchsendsize, m.batchsendtime, m.sendto, wp, reg.connected);
});
break;
default:
break;
Expand Down Expand Up @@ -290,6 +298,7 @@ void run(const RunMode& m) {
if (m.t == RunMode::batchrecv) {
pullRemoteDataT(m.dir, m.localport, m.consolidate, m.storageMode).join();
} else if (m.groups.size() > 0) {
out() << "hog stat file : " << StatFile::instance().filename() << std::endl;
std::map<std::string, std::map<int, RegInfo>> registry;

signal(SIGPIPE, SIG_IGN);
Expand Down
19 changes: 19 additions & 0 deletions bin/hog/stat.C
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include "stat.H"

#include <string>

namespace hog {

inline std::string statFilePrefix() {
return hobbes::storage::defaultStoreDir() + "/hogstat";
}

StatFile& StatFile::instance() {
static StatFile statFile;
return statFile;
}

StatFile::StatFile() : statFile(hobbes::fregion::uniqueFilename(statFilePrefix(), ".db")) {}

}

78 changes: 78 additions & 0 deletions bin/hog/stat.H
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#ifndef HOG_STAT_H_INCLUDED
#define HOG_STAT_H_INCLUDED

#include <mutex>

#include <hobbes/hobbes.H>
#include <hobbes/reflect.H>
#include <hobbes/storage.H>
#include <hobbes/fregion.H>

namespace hog {

DEFINE_ENUM(ReaderStatus,
(Started),
(Closed)
);

DEFINE_STRUCT(ReaderState,
(hobbes::datetimeT, datetime),
(hobbes::storage::ProcThread, readerId),
(ReaderStatus, status)
);

DEFINE_STRUCT(ReaderRegistration,
(hobbes::datetimeT, datetime),
(hobbes::storage::ProcThread, writerId),
(hobbes::storage::ProcThread, readerId),
(std::string, shmname)
);

DEFINE_ENUM(SenderStatus,
(Suspended),
(Started),
(Closed)
);

DEFINE_STRUCT(SenderState,
(hobbes::datetimeT, datetime),
(hobbes::storage::ProcThread, id),
(SenderStatus, status)
);

DEFINE_STRUCT(SenderRegistration,
(hobbes::datetimeT, datetime),
(hobbes::storage::ProcThread, readerId),
(hobbes::storage::ProcThread, senderId),
(std::string, directory),
(std::vector<std::string>, senderqueue)
);

class StatFile {
public:
static StatFile& instance();

template <typename T>
void log(T&& value) {
std::lock_guard<decltype(mutex)> _{mutex};
statFile.series<T>(T::_hmeta_struct_type_name())(std::forward<T>(value));
}

const std::string& filename() const {
return statFile.fileData()->path;
}

private:
StatFile();
~StatFile() = default;
StatFile(const StatFile&) = delete;
StatFile& operator=(const StatFile&) = delete;

hobbes::fregion::writer statFile;
std::mutex mutex;
};

}

#endif

2 changes: 2 additions & 0 deletions include/hobbes/fregion.H
Original file line number Diff line number Diff line change
Expand Up @@ -2026,6 +2026,7 @@ public:
}

imagefile* fileData() { return this->f; }
const imagefile* fileData() const { return this->f; }
private:
typedef std::map<std::string, seriesi*> wseriess;
imagefile* f;
Expand Down Expand Up @@ -2315,6 +2316,7 @@ public:
}

imagefile* fileData() { return this->f; }
const imagefile* fileData() const { return this->f; }
private:
typedef std::map<std::string, seriesi*> rseriess;
imagefile* f;
Expand Down

0 comments on commit c75a57f

Please sign in to comment.