Skip to content

Commit

Permalink
fix update consecutively would cause storage abnormal, fix bug in upd…
Browse files Browse the repository at this point in the history
…ate (vesoft-inc#1614)

Co-authored-by: dangleptr <[email protected]>
  • Loading branch information
critical27 and dangleptr committed Jan 10, 2020
1 parent 9222944 commit ddb13a6
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,8 @@ void RaftPart::processAppendLogResponses(
}
if (!iter.empty()) {
this->appendLogsInternal(std::move(iter), currTerm);
} else {
replicatingLogs_ = false;
}
} else {
// Not enough hosts accepted the log, re-try
Expand Down
62 changes: 62 additions & 0 deletions src/kvstore/raftex/test/LogCASTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,68 @@ TEST_F(LogCASTest, AllInvalidCAS) {
}
}

TEST_F(LogCASTest, OnlyOneCasSucceed) {
// Append logs
LOG(INFO) << "=====> Start appending logs";
std::vector<std::string> msgs;
for (int i = 1; i <= 10; ++i) {
std::string log;
if (i == 1) {
log = "TCAS Log " + std::to_string(i);
msgs.emplace_back(log.substr(1));
} else {
log = "FCAS Log " + std::to_string(i);
}
auto fut = leader_->atomicOpAsync([log = std::move(log)] () mutable {
return test::compareAndSet(log);
});
if (i == 10) {
fut.wait();
}
}
LOG(INFO) << "<===== Finish appending logs";

// Sleep a while to make sure the last log has been committed on followers
sleep(FLAGS_raft_heartbeat_interval_secs);

// Check every copy
for (auto& c : copies_) {
ASSERT_EQ(1, c->getNumLogs());
}
checkConsensus(copies_, 0, 0, msgs);
}

TEST_F(LogCASTest, ZipCasTest) {
// Append logs
LOG(INFO) << "=====> Start appending logs";
std::vector<std::string> msgs;
for (int i = 1; i <= 10; ++i) {
std::string log;
if (i % 2) {
log = "TCAS Log " + std::to_string(i);
msgs.emplace_back(log.substr(1));
} else {
log = "FCAS Log " + std::to_string(i);
}
auto fut = leader_->atomicOpAsync([log = std::move(log)] () mutable {
return test::compareAndSet(log);
});
if (i == 10) {
fut.wait();
}
}
LOG(INFO) << "<===== Finish appending logs";

// Sleep a while to make sure the last log has been committed on followers
sleep(FLAGS_raft_heartbeat_interval_secs);

// Check every copy
for (auto& c : copies_) {
ASSERT_EQ(5, c->getNumLogs());
}
checkConsensus(copies_, 0, 4, msgs);
}

} // namespace raftex
} // namespace nebula

Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/UpdateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) {
<< ", dst: " << edgeKey.get_dst() << ", ranking: " << edgeKey.get_ranking();
CHECK_NOTNULL(kvstore_);
this->kvstore_->asyncAtomicOp(this->spaceId_, partId,
[&, this] () -> std::string {
[partId, edgeKey, this] () -> std::string {
if (checkFilter(partId, edgeKey)) {
return updateAndWriteBack(partId, edgeKey);
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/UpdateVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) {
<< ", partId: " << partId << ", vId: " << vId;
CHECK_NOTNULL(kvstore_);
this->kvstore_->asyncAtomicOp(this->spaceId_, partId,
[&, this] () -> std::string {
[partId, vId, this] () -> std::string {
if (checkFilter(partId, vId)) {
return updateAndWriteBack(partId, vId);
}
Expand Down

0 comments on commit ddb13a6

Please sign in to comment.