Skip to content

Commit

Permalink
Make heartbeat a special empty log (vesoft-inc#606)
Browse files Browse the repository at this point in the history
  • Loading branch information
dangleptr authored Jul 11, 2019
1 parent 78dfd25 commit 2746dc2
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 172 deletions.
5 changes: 5 additions & 0 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
while (iter->valid()) {
lastId = iter->logId();
auto log = iter->logMsg();
if (log.empty()) {
VLOG(3) << idStr_ << "Skip the heartbeat!";
++(*iter);
continue;
}
DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
// Skip the timestamp (type of int64_t)
switch (log[sizeof(int64_t)]) {
Expand Down
174 changes: 22 additions & 152 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1025,12 +1025,10 @@ void RaftPart::processAskForVoteRequest(
void RaftPart::processAppendLogRequest(
const cpp2::AppendLogRequest& req,
cpp2::AppendLogResponse& resp) {
bool isHeartbeat = req.get_log_str_list().empty();
bool hasSnapshot = req.get_snapshot_uri() != nullptr;

VLOG(2) << idStr_
<< "Received a "
<< (isHeartbeat ? "Heartbeat" : "LogAppend")
<< "Received logAppend "
<< ": GraphSpaceId = " << req.get_space()
<< ", partition = " << req.get_part()
<< ", current_term = " << req.get_current_term()
Expand All @@ -1039,12 +1037,10 @@ void RaftPart::processAppendLogRequest(
<< ", leaderPort = " << req.get_leader_port()
<< ", lastLogIdSent = " << req.get_last_log_id_sent()
<< ", lastLogTermSent = " << req.get_last_log_term_sent()
<< (isHeartbeat
? ""
: folly::stringPrintf(
<< folly::stringPrintf(
", num_logs = %ld, logTerm = %ld",
req.get_log_str_list().size(),
req.get_log_term()))
req.get_log_term())
<< (hasSnapshot
? ", SnapshotURI = " + *(req.get_snapshot_uri())
: "");
Expand Down Expand Up @@ -1133,29 +1129,24 @@ void RaftPart::processAppendLogRequest(
resp.set_last_log_term(lastLogTerm_);
}

if (!isHeartbeat) {
// Append new logs
size_t numLogs = req.get_log_str_list().size();
LogID firstId = req.get_last_log_id_sent() + 1;
VLOG(2) << idStr_ << "Writing log [" << firstId
<< ", " << firstId + numLogs - 1 << "] to WAL";
LogStrListIterator iter(firstId,
req.get_log_term(),
req.get_log_str_list());
if (wal_->appendLogs(iter)) {
CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId());
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
resp.set_last_log_id(lastLogId_);
resp.set_last_log_term(lastLogTerm_);
} else {
LOG(ERROR) << idStr_ << "Failed to append logs to WAL";
resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL);
return;
}
// Append new logs
size_t numLogs = req.get_log_str_list().size();
LogID firstId = req.get_last_log_id_sent() + 1;
VLOG(2) << idStr_ << "Writing log [" << firstId
<< ", " << firstId + numLogs - 1 << "] to WAL";
LogStrListIterator iter(firstId,
req.get_log_term(),
req.get_log_str_list());
if (wal_->appendLogs(iter)) {
CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId());
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
resp.set_last_log_id(lastLogId_);
resp.set_last_log_term(lastLogTerm_);
} else {
VLOG(2) << idStr_
<< "Request is a heartbeat, nothing to put into WAL";
LOG(ERROR) << idStr_ << "Failed to append logs to WAL";
resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL);
return;
}

if (req.get_committed_log_id() > committedLogId_) {
Expand Down Expand Up @@ -1240,129 +1231,8 @@ cpp2::ErrorCode RaftPart::verifyLeader(


folly::Future<AppendLogResult> RaftPart::sendHeartbeat() {
using namespace folly; // NOLINT since the fancy overload of | operator

VLOG(2) << idStr_ << "Sending heartbeat to all other hosts";

TermID term = 0;
LogID lastLogId = 0;
TermID lastLogTerm = 0;
LogID committed = 0;

decltype(peerHosts_) hosts;
{
std::lock_guard<std::mutex> g(raftLock_);

auto res = canAppendLogs(g);
if (res != AppendLogResult::SUCCEEDED) {
LOG(ERROR) << idStr_
<< "Cannot send heartbeat, clean up the buffer";
return res;
}

if (!logs_.empty()) {
LOG(WARNING) << idStr_
<< "There is logs in the buffer,"
" stop sending the heartbeat";
return AppendLogResult::SUCCEEDED;
}

if (replicatingLogs_) {
VLOG(2) << idStr_
<< "Logs are being sent out."
" Stop sending the heartbeat";
return AppendLogResult::SUCCEEDED;
} else {
// We need to send logs to all followers
replicatingLogs_ = true;
}

// Prepare to send heartbeat to all followers
term = term_;
lastLogId = lastLogId_;
lastLogTerm = lastLogTerm_;
committed = committedLogId_;

hosts = peerHosts_;
}

if (!hosts || hosts->empty()) {
VLOG(2) << idStr_ << "No peer to send the heartbeat";
doneHeartbeat();
return AppendLogResult::SUCCEEDED;
}

auto eb = ioThreadPool_->getEventBase();

using PeerHostEntry = typename decltype(peerHosts_)::element_type::value_type;
return collectNSucceeded(
gen::from(*hosts)
| gen::map([=, self = shared_from_this()] (PeerHostEntry& host) {
VLOG(2) << self->idStr_
<< "Send a heartbeat to "
<< NetworkUtils::intToIPv4(host.first.first)
<< ":" << host.first.second;
return via(
eb,
[=, &host] () -> Future<cpp2::AppendLogResponse> {
return host.second->appendLogs(eb,
term,
lastLogId,
committed,
lastLogTerm,
lastLogId);
});
})
| gen::as<std::vector>(),
// Number of succeeded required
quorum_,
// Result evaluator
[](cpp2::AppendLogResponse& resp) {
return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED;
})
.then([=, self = shared_from_this()] (
folly::Try<AppendLogResponses>&& result)
-> folly::Future<AppendLogResult> {
VLOG(2) << self->idStr_ << "Done with heartbeats";
CHECK(!result.hasException());

self->doneHeartbeat();
return AppendLogResult::SUCCEEDED;
});
}


void RaftPart::doneHeartbeat() {
decltype(logs_) swappedOutLogs;
LogID firstId = 0;
{
std::lock_guard<std::mutex> g(raftLock_);
CHECK(replicatingLogs_);
if (logs_.size() > 0) {
// continue to replicate the logs
sendingPromise_ = std::move(cachingPromise_);
cachingPromise_.reset();
std::swap(swappedOutLogs, logs_);
firstId = lastLogId_ + 1;
} else {
replicatingLogs_ = false;
}
}
if (!swappedOutLogs.empty()) {
AppendLogsIterator it(
firstId,
std::move(swappedOutLogs),
[this] (const std::string& msg) -> std::string {
auto res = compareAndSet(msg);
if (res.empty()) {
// Failed
sendingPromise_.setOneSingleValue(
AppendLogResult::E_CAS_FAILURE);
}
return res;
});
appendLogsInternal(std::move(it));
}
std::string log = "";
return appendLogAsync(clusterId_, LogType::NORMAL, std::move(log));
}

} // namespace raftex
Expand Down
3 changes: 0 additions & 3 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,8 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
/*****************************************************************
* Asynchronously send a heartbeat (An empty log entry)
*
* The code path is similar to appendLog() and the heartbeat will
* be put into the log batch, but will not be added to WAL
****************************************************************/
folly::Future<AppendLogResult> sendHeartbeat();
void doneHeartbeat();

/****************************************************
*
Expand Down
6 changes: 3 additions & 3 deletions src/kvstore/raftex/test/LogAppendTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ TEST(LogAppend, SimpleAppendWithOneCopy) {
ASSERT_EQ(100, c->getNumLogs());
}

LogID id = 1;
LogID id = leader->firstCommittedLogId_;
for (int i = 0; i < 100; ++i, ++id) {
for (auto& c : copies) {
folly::StringPiece msg;
Expand Down Expand Up @@ -103,7 +103,7 @@ TEST(LogAppend, SimpleAppendWithThreeCopies) {
ASSERT_EQ(100, c->getNumLogs());
}

LogID id = 1;
LogID id = leader->firstCommittedLogId_;
for (int i = 0; i < 100; ++i, ++id) {
for (auto& c : copies) {
folly::StringPiece msg;
Expand Down Expand Up @@ -172,7 +172,7 @@ TEST(LogAppend, MultiThreadAppend) {
ASSERT_EQ(numThreads * numLogs, c->getNumLogs());
}

LogID id = 1;
LogID id = leader->firstCommittedLogId_;
for (int i = 0; i < numThreads * numLogs; ++i, ++id) {
folly::StringPiece msg;
ASSERT_TRUE(leader->getLogMsg(id, msg));
Expand Down
14 changes: 7 additions & 7 deletions src/kvstore/raftex/test/LogCASTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TEST_F(LogCASTest, StartWithValidCAS) {
ASSERT_EQ(10, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 10; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down Expand Up @@ -88,7 +88,7 @@ TEST_F(LogCASTest, StartWithInvalidCAS) {
ASSERT_EQ(10, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 10; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down Expand Up @@ -131,7 +131,7 @@ TEST_F(LogCASTest, ValidCASInMiddle) {
ASSERT_EQ(10, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 10; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down Expand Up @@ -173,7 +173,7 @@ TEST_F(LogCASTest, InvalidCASInMiddle) {
ASSERT_EQ(10, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 10; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down Expand Up @@ -209,7 +209,7 @@ TEST_F(LogCASTest, EndWithValidCAS) {
ASSERT_EQ(10, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 10; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down Expand Up @@ -243,7 +243,7 @@ TEST_F(LogCASTest, EndWithInvalidCAS) {
ASSERT_EQ(8, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 8; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down Expand Up @@ -276,7 +276,7 @@ TEST_F(LogCASTest, AllValidCAS) {
ASSERT_EQ(10, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 10; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down
10 changes: 5 additions & 5 deletions src/kvstore/raftex/test/LogCommandTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ TEST_F(LogCommandTest, StartWithCommandLog) {
ASSERT_EQ(10, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 10; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down Expand Up @@ -97,7 +97,7 @@ TEST_F(LogCommandTest, CommandInMiddle) {
ASSERT_EQ(10, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 10; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down Expand Up @@ -131,7 +131,7 @@ TEST_F(LogCommandTest, EndWithCommand) {
ASSERT_EQ(10, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 10; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down Expand Up @@ -164,7 +164,7 @@ TEST_F(LogCommandTest, AllCommandLogs) {
ASSERT_EQ(10, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 10; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down Expand Up @@ -228,7 +228,7 @@ TEST_F(LogCommandTest, MixedLogs) {
ASSERT_EQ(10, c->getNumLogs());
}

LogID id = 1;
LogID id = leader_->firstCommittedLogId_;
for (int i = 0; i < 10; ++i, ++id) {
for (auto& c : copies_) {
folly::StringPiece msg;
Expand Down
13 changes: 11 additions & 2 deletions src/kvstore/raftex/test/TestShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,25 @@ bool TestShard::commitLogs(std::unique_ptr<LogIterator> iter) {
VLOG(2) << "TestShard: Committing logs";
LogID firstId = -1;
LogID lastId = -1;
int32_t commitLogsNum = 0;
while (iter->valid()) {
if (firstId < 0) {
firstId = iter->logId();
}
lastId = iter->logId();
data_.emplace(iter->logId(), iter->logMsg().toString());
if (!iter->logMsg().empty()) {
if (firstCommittedLogId_ < 0) {
firstCommittedLogId_ = iter->logId();
}
data_.emplace(iter->logId(), iter->logMsg().toString());
commitLogsNum++;
}
++(*iter);
}
VLOG(2) << "TestShard: Committed log " << firstId << " to " << lastId;
commitTimes_++;
if (commitLogsNum > 0) {
commitTimes_++;
}
return true;
}

Expand Down
Loading

0 comments on commit 2746dc2

Please sign in to comment.