Skip to content

Commit

Permalink
add wait policy on shmem queue writer and reader
Browse files Browse the repository at this point in the history
  • Loading branch information
chenkai036 committed Apr 18, 2018
1 parent 6c5131f commit 05d3d80
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 38 deletions.
3 changes: 2 additions & 1 deletion bin/hog/batchsend.C
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ static void initNetSession(BatchSendSession* s, const std::string& groupName, co
s->stepFile();
}

void pushLocalData(const storage::QueueConnection& qc, const std::string& groupName, const std::string& dir, size_t clevel, size_t batchsendsize, long batchsendtime, const std::vector<std::string>& sendto) {
void pushLocalData(const storage::QueueConnection& qc, const std::string& groupName, const std::string& dir, size_t clevel, size_t batchsendsize, long batchsendtime, const std::vector<std::string>& sendto, const hobbes::storage::WaitPolicy wp) {
auto pt = hobbes::storage::thisProcThread();
batchsendsize = std::max<size_t>(10*1024*1024, batchsendsize);
BatchSendSession sn(groupName, dir + "/tmp_" + str::from(pt.first) + "-" + str::from(pt.second) + "/", clevel, sendto);
Expand All @@ -352,6 +352,7 @@ void pushLocalData(const storage::QueueConnection& qc, const std::string& groupN

storage::runReadProcessWithTimeout(
qc,
wp,
[&](storage::PipeQOS qos, storage::CommitMethod cm, const storage::statements& ss) {
initNetSession(&sn, groupName, dir, qos, cm, ss);
return
Expand Down
2 changes: 1 addition & 1 deletion bin/hog/batchsend.H
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <hobbes/storage.H>

namespace hog {
void pushLocalData(const hobbes::storage::QueueConnection&, const std::string& groupName, const std::string& dir, size_t clevel, size_t batchsendsize, long batchsendtimeInMicros, const std::vector<std::string>& sendto);
void pushLocalData(const hobbes::storage::QueueConnection&, const std::string& groupName, const std::string& dir, size_t clevel, size_t batchsendsize, long batchsendtimeInMicros, const std::vector<std::string>& sendto, const hobbes::storage::WaitPolicy);
}

#endif
Expand Down
3 changes: 2 additions & 1 deletion bin/hog/local.C
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@

namespace hog {

void recordLocalData(SessionGroup* sg, const hobbes::storage::QueueConnection& qc, const std::string& dir) {
void recordLocalData(SessionGroup* sg, const hobbes::storage::QueueConnection& qc, const std::string& dir, const hobbes::storage::WaitPolicy wp) {
using namespace hobbes;

storage::runReadProcess(
qc,
wp,
[&](storage::PipeQOS qos, storage::CommitMethod cm, const storage::statements& ss) {
return appendStorageSession(sg, dir, qos, cm, ss);
}
Expand Down
2 changes: 1 addition & 1 deletion bin/hog/local.H
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include "session.H"

namespace hog {
void recordLocalData(SessionGroup*, const hobbes::storage::QueueConnection&, const std::string& dir);
void recordLocalData(SessionGroup*, const hobbes::storage::QueueConnection&, const std::string& dir, const hobbes::storage::WaitPolicy);
}

#endif
Expand Down
11 changes: 9 additions & 2 deletions bin/hog/main.C
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ struct RunMode {
std::set<std::string> groups;
bool consolidate;

hobbes::storage::WaitPolicy wp;

// batchsend
size_t clevel;
size_t batchsendsize;
Expand Down Expand Up @@ -107,6 +109,7 @@ void showUsage() {
" -s port : decides to receive data on the given port\n"
" -c : decides to store equally-typed data across processes in a single file\n"
" -m <dir> : decides where to place the domain socket for producer registration (default: " << hobbes::storage::defaultStoreDir() << ")\n"
" -spin : instruct producer and consumer to spin when either side cannot make progress\n"
<< std::endl;
}

Expand All @@ -116,6 +119,7 @@ RunMode config(int argc, const char** argv) {
r.dir = "./$GROUP/$DATE/data";
r.groupServerDir = hobbes::storage::defaultStoreDir();
r.consolidate = false;
r.wp = hobbes::storage::WaitPolicy::Platform;

if (argc == 1) {
showUsage();
Expand Down Expand Up @@ -184,6 +188,8 @@ RunMode config(int argc, const char** argv) {
} else {
throw std::runtime_error("need domain socket directory for producer registration");
}
} else if (arg == "-spin") {
r.wp = hobbes::storage::WaitPolicy::Spin;
} else {
throw std::runtime_error("invalid argument: " + arg);
}
Expand Down Expand Up @@ -215,10 +221,10 @@ void evalGroupHostConnection(SessionGroup* sg, const std::string& groupName, con
std::string d = instantiateDir(groupName, m.dir);
switch (m.t) {
case RunMode::local:
ts->push_back(std::thread(std::bind(&recordLocalData, sg, qc, d)));
ts->push_back(std::thread(std::bind(&recordLocalData, sg, qc, d, m.wp)));
break;
case RunMode::batchsend:
ts->push_back(std::thread(std::bind(&pushLocalData, qc, groupName, ensureDirExists(d), m.clevel, m.batchsendsize, m.batchsendtime, m.sendto)));
ts->push_back(std::thread(std::bind(&pushLocalData, qc, groupName, ensureDirExists(d), m.clevel, m.batchsendsize, m.batchsendtime, m.sendto, m.wp)));
break;
default:
break;
Expand Down Expand Up @@ -247,6 +253,7 @@ void runGroupHost(const std::string& groupName, const RunMode& m, std::vector<st
out << "disconnected client for '" << groupName << "' due to version mismatch (expected " << HSTORE_VERSION << " but got " << version << ")" << std::endl;
close(c);
} else {
hobbes::fdwrite(c, (uint8_t*)&m.wp, sizeof(m.wp));
hobbes::registerEventHandler(
c,
[sg,groupName,ts,&m](int c) {
Expand Down
Loading

0 comments on commit 05d3d80

Please sign in to comment.