diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 54c8d2bbf53..12436ebddee 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -155,6 +155,11 @@ bool Part::commitLogs(std::unique_ptr iter) { while (iter->valid()) { lastId = iter->logId(); auto log = iter->logMsg(); + if (log.empty()) { + VLOG(3) << idStr_ << "Skip the heartbeat!"; + ++(*iter); + continue; + } DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t)); // Skip the timestamp (type of int64_t) switch (log[sizeof(int64_t)]) { diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 07c258026e9..e43038ec7d3 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -1025,12 +1025,10 @@ void RaftPart::processAskForVoteRequest( void RaftPart::processAppendLogRequest( const cpp2::AppendLogRequest& req, cpp2::AppendLogResponse& resp) { - bool isHeartbeat = req.get_log_str_list().empty(); bool hasSnapshot = req.get_snapshot_uri() != nullptr; VLOG(2) << idStr_ - << "Received a " - << (isHeartbeat ? "Heartbeat" : "LogAppend") + << "Received logAppend " << ": GraphSpaceId = " << req.get_space() << ", partition = " << req.get_part() << ", current_term = " << req.get_current_term() @@ -1039,12 +1037,10 @@ void RaftPart::processAppendLogRequest( << ", leaderPort = " << req.get_leader_port() << ", lastLogIdSent = " << req.get_last_log_id_sent() << ", lastLogTermSent = " << req.get_last_log_term_sent() - << (isHeartbeat - ? "" - : folly::stringPrintf( + << folly::stringPrintf( ", num_logs = %ld, logTerm = %ld", req.get_log_str_list().size(), - req.get_log_term())) + req.get_log_term()) << (hasSnapshot ? ", SnapshotURI = " + *(req.get_snapshot_uri()) : ""); @@ -1133,29 +1129,24 @@ void RaftPart::processAppendLogRequest( resp.set_last_log_term(lastLogTerm_); } - if (!isHeartbeat) { - // Append new logs - size_t numLogs = req.get_log_str_list().size(); - LogID firstId = req.get_last_log_id_sent() + 1; - VLOG(2) << idStr_ << "Writing log [" << firstId - << ", " << firstId + numLogs - 1 << "] to WAL"; - LogStrListIterator iter(firstId, - req.get_log_term(), - req.get_log_str_list()); - if (wal_->appendLogs(iter)) { - CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()); - lastLogId_ = wal_->lastLogId(); - lastLogTerm_ = wal_->lastLogTerm(); - resp.set_last_log_id(lastLogId_); - resp.set_last_log_term(lastLogTerm_); - } else { - LOG(ERROR) << idStr_ << "Failed to append logs to WAL"; - resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); - return; - } + // Append new logs + size_t numLogs = req.get_log_str_list().size(); + LogID firstId = req.get_last_log_id_sent() + 1; + VLOG(2) << idStr_ << "Writing log [" << firstId + << ", " << firstId + numLogs - 1 << "] to WAL"; + LogStrListIterator iter(firstId, + req.get_log_term(), + req.get_log_str_list()); + if (wal_->appendLogs(iter)) { + CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()); + lastLogId_ = wal_->lastLogId(); + lastLogTerm_ = wal_->lastLogTerm(); + resp.set_last_log_id(lastLogId_); + resp.set_last_log_term(lastLogTerm_); } else { - VLOG(2) << idStr_ - << "Request is a heartbeat, nothing to put into WAL"; + LOG(ERROR) << idStr_ << "Failed to append logs to WAL"; + resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); + return; } if (req.get_committed_log_id() > committedLogId_) { @@ -1240,129 +1231,8 @@ cpp2::ErrorCode RaftPart::verifyLeader( folly::Future RaftPart::sendHeartbeat() { - using namespace folly; // NOLINT since the fancy overload of | operator - - VLOG(2) << idStr_ << "Sending heartbeat to all other hosts"; - - TermID term = 0; - LogID lastLogId = 0; - TermID lastLogTerm = 0; - LogID committed = 0; - - decltype(peerHosts_) hosts; - { - std::lock_guard g(raftLock_); - - auto res = canAppendLogs(g); - if (res != AppendLogResult::SUCCEEDED) { - LOG(ERROR) << idStr_ - << "Cannot send heartbeat, clean up the buffer"; - return res; - } - - if (!logs_.empty()) { - LOG(WARNING) << idStr_ - << "There is logs in the buffer," - " stop sending the heartbeat"; - return AppendLogResult::SUCCEEDED; - } - - if (replicatingLogs_) { - VLOG(2) << idStr_ - << "Logs are being sent out." - " Stop sending the heartbeat"; - return AppendLogResult::SUCCEEDED; - } else { - // We need to send logs to all followers - replicatingLogs_ = true; - } - - // Prepare to send heartbeat to all followers - term = term_; - lastLogId = lastLogId_; - lastLogTerm = lastLogTerm_; - committed = committedLogId_; - - hosts = peerHosts_; - } - - if (!hosts || hosts->empty()) { - VLOG(2) << idStr_ << "No peer to send the heartbeat"; - doneHeartbeat(); - return AppendLogResult::SUCCEEDED; - } - - auto eb = ioThreadPool_->getEventBase(); - - using PeerHostEntry = typename decltype(peerHosts_)::element_type::value_type; - return collectNSucceeded( - gen::from(*hosts) - | gen::map([=, self = shared_from_this()] (PeerHostEntry& host) { - VLOG(2) << self->idStr_ - << "Send a heartbeat to " - << NetworkUtils::intToIPv4(host.first.first) - << ":" << host.first.second; - return via( - eb, - [=, &host] () -> Future { - return host.second->appendLogs(eb, - term, - lastLogId, - committed, - lastLogTerm, - lastLogId); - }); - }) - | gen::as(), - // Number of succeeded required - quorum_, - // Result evaluator - [](cpp2::AppendLogResponse& resp) { - return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED; - }) - .then([=, self = shared_from_this()] ( - folly::Try&& result) - -> folly::Future { - VLOG(2) << self->idStr_ << "Done with heartbeats"; - CHECK(!result.hasException()); - - self->doneHeartbeat(); - return AppendLogResult::SUCCEEDED; - }); -} - - -void RaftPart::doneHeartbeat() { - decltype(logs_) swappedOutLogs; - LogID firstId = 0; - { - std::lock_guard g(raftLock_); - CHECK(replicatingLogs_); - if (logs_.size() > 0) { - // continue to replicate the logs - sendingPromise_ = std::move(cachingPromise_); - cachingPromise_.reset(); - std::swap(swappedOutLogs, logs_); - firstId = lastLogId_ + 1; - } else { - replicatingLogs_ = false; - } - } - if (!swappedOutLogs.empty()) { - AppendLogsIterator it( - firstId, - std::move(swappedOutLogs), - [this] (const std::string& msg) -> std::string { - auto res = compareAndSet(msg); - if (res.empty()) { - // Failed - sendingPromise_.setOneSingleValue( - AppendLogResult::E_CAS_FAILURE); - } - return res; - }); - appendLogsInternal(std::move(it)); - } + std::string log = ""; + return appendLogAsync(clusterId_, LogType::NORMAL, std::move(log)); } } // namespace raftex diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 2c333566252..312c472c57f 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -248,11 +248,8 @@ class RaftPart : public std::enable_shared_from_this { /***************************************************************** * Asynchronously send a heartbeat (An empty log entry) * - * The code path is similar to appendLog() and the heartbeat will - * be put into the log batch, but will not be added to WAL ****************************************************************/ folly::Future sendHeartbeat(); - void doneHeartbeat(); /**************************************************** * diff --git a/src/kvstore/raftex/test/LogAppendTest.cpp b/src/kvstore/raftex/test/LogAppendTest.cpp index f841b0fd077..f8a99789f90 100644 --- a/src/kvstore/raftex/test/LogAppendTest.cpp +++ b/src/kvstore/raftex/test/LogAppendTest.cpp @@ -56,7 +56,7 @@ TEST(LogAppend, SimpleAppendWithOneCopy) { ASSERT_EQ(100, c->getNumLogs()); } - LogID id = 1; + LogID id = leader->firstCommittedLogId_; for (int i = 0; i < 100; ++i, ++id) { for (auto& c : copies) { folly::StringPiece msg; @@ -103,7 +103,7 @@ TEST(LogAppend, SimpleAppendWithThreeCopies) { ASSERT_EQ(100, c->getNumLogs()); } - LogID id = 1; + LogID id = leader->firstCommittedLogId_; for (int i = 0; i < 100; ++i, ++id) { for (auto& c : copies) { folly::StringPiece msg; @@ -172,7 +172,7 @@ TEST(LogAppend, MultiThreadAppend) { ASSERT_EQ(numThreads * numLogs, c->getNumLogs()); } - LogID id = 1; + LogID id = leader->firstCommittedLogId_; for (int i = 0; i < numThreads * numLogs; ++i, ++id) { folly::StringPiece msg; ASSERT_TRUE(leader->getLogMsg(id, msg)); diff --git a/src/kvstore/raftex/test/LogCASTest.cpp b/src/kvstore/raftex/test/LogCASTest.cpp index 01ac17a1743..58dad056b3d 100644 --- a/src/kvstore/raftex/test/LogCASTest.cpp +++ b/src/kvstore/raftex/test/LogCASTest.cpp @@ -53,7 +53,7 @@ TEST_F(LogCASTest, StartWithValidCAS) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -88,7 +88,7 @@ TEST_F(LogCASTest, StartWithInvalidCAS) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -131,7 +131,7 @@ TEST_F(LogCASTest, ValidCASInMiddle) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -173,7 +173,7 @@ TEST_F(LogCASTest, InvalidCASInMiddle) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -209,7 +209,7 @@ TEST_F(LogCASTest, EndWithValidCAS) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -243,7 +243,7 @@ TEST_F(LogCASTest, EndWithInvalidCAS) { ASSERT_EQ(8, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 8; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -276,7 +276,7 @@ TEST_F(LogCASTest, AllValidCAS) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; diff --git a/src/kvstore/raftex/test/LogCommandTest.cpp b/src/kvstore/raftex/test/LogCommandTest.cpp index f3c5cbb67b0..b1651f0b36b 100644 --- a/src/kvstore/raftex/test/LogCommandTest.cpp +++ b/src/kvstore/raftex/test/LogCommandTest.cpp @@ -54,7 +54,7 @@ TEST_F(LogCommandTest, StartWithCommandLog) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -97,7 +97,7 @@ TEST_F(LogCommandTest, CommandInMiddle) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -131,7 +131,7 @@ TEST_F(LogCommandTest, EndWithCommand) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -164,7 +164,7 @@ TEST_F(LogCommandTest, AllCommandLogs) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; @@ -228,7 +228,7 @@ TEST_F(LogCommandTest, MixedLogs) { ASSERT_EQ(10, c->getNumLogs()); } - LogID id = 1; + LogID id = leader_->firstCommittedLogId_; for (int i = 0; i < 10; ++i, ++id) { for (auto& c : copies_) { folly::StringPiece msg; diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp index 21f6d023376..14f3fdc8d01 100644 --- a/src/kvstore/raftex/test/TestShard.cpp +++ b/src/kvstore/raftex/test/TestShard.cpp @@ -69,16 +69,25 @@ bool TestShard::commitLogs(std::unique_ptr iter) { VLOG(2) << "TestShard: Committing logs"; LogID firstId = -1; LogID lastId = -1; + int32_t commitLogsNum = 0; while (iter->valid()) { if (firstId < 0) { firstId = iter->logId(); } lastId = iter->logId(); - data_.emplace(iter->logId(), iter->logMsg().toString()); + if (!iter->logMsg().empty()) { + if (firstCommittedLogId_ < 0) { + firstCommittedLogId_ = iter->logId(); + } + data_.emplace(iter->logId(), iter->logMsg().toString()); + commitLogsNum++; + } ++(*iter); } VLOG(2) << "TestShard: Committed log " << firstId << " to " << lastId; - commitTimes_++; + if (commitLogsNum > 0) { + commitTimes_++; + } return true; } diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h index 3b5a8097b38..c4d0c46b4a8 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -56,6 +56,7 @@ class TestShard : public RaftPart { public: int32_t commitTimes_ = 0; + int32_t firstCommittedLogId_ = -1; private: const size_t idx_;