Skip to content

Commit

Permalink
Fix edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
antonio2368 committed Oct 16, 2023
1 parent 9d218a8 commit 17eaa5a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
47 changes: 28 additions & 19 deletions src/Coordination/KeeperServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <Common/getMultipleKeysFromConfig.h>
#include <Disks/DiskLocal.h>
#include <fmt/chrono.h>
#include <libnuraft/req_msg.hxx>

namespace DB
{
Expand Down Expand Up @@ -196,10 +197,13 @@ struct KeeperServer::KeeperRaftServer : public nuraft::raft_server
nuraft::raft_server::commit_in_bg();
}

void commitLogs(uint64_t index_to_commit)
void commitLogs(uint64_t index_to_commit, bool initial_commit_exec)
{
leader_commit_index_.store(index_to_commit);
commit(index_to_commit);
quick_commit_index_ = index_to_commit;
lagging_sm_target_index_ = index_to_commit;

commit_in_bg_exec(0, initial_commit_exec);
}

using nuraft::raft_server::raft_server;
Expand Down Expand Up @@ -407,6 +411,8 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo
auto log_store = state_manager->load_log_store();
last_log_idx_on_disk = log_store->next_slot() - 1;
LOG_TRACE(log, "Last local log idx {}", last_log_idx_on_disk);
if (state_machine->last_commit_index() >= last_log_idx_on_disk)
keeper_context->local_logs_preprocessed = true;

loadLatestConfig();

Expand Down Expand Up @@ -647,31 +653,34 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
preprocess_logs();
break;
}
case nuraft::cb_func::ProcessReq:
case nuraft::cb_func::GotAppendEntryReqFromLeader:
{
auto & req = *static_cast<nuraft::req_msg *>(param->ctx);
if (req.get_type() != nuraft::msg_type::append_entries_request)
break;

if (req.get_commit_idx() == 0)
break;

if (req.log_entries().empty())
break;

auto last_committed_index = state_machine->last_commit_index();
if (!req.log_entries().empty())
// Actual log number.
auto index_to_commit = std::min({last_log_idx_on_disk, req.get_last_log_idx(), req.get_commit_idx()});
LOG_TRACE(log, "Index to commit {}, last committed index {}", index_to_commit, last_committed_index);

if (index_to_commit > last_committed_index)
{
LOG_TRACE(log, "Trying to commit local log entries, committing upto {}", index_to_commit);
raft_instance->commitLogs(index_to_commit, true);
keeper_context->local_logs_preprocessed.wait(false);
}
else if (index_to_commit == last_committed_index && last_log_idx_on_disk > index_to_commit)
{
preprocess_logs();
}
else if (index_to_commit == 0) /// we need to rollback all the logs so we preprocess all of them
{
// Actual log number.
auto index_to_commit = std::min({last_log_idx_on_disk, req.get_last_log_idx(), req.get_commit_idx()});

if (index_to_commit > last_committed_index)
{
LOG_TRACE(log, "Trying to commit local log entries, committing upto {}", index_to_commit);
raft_instance->commitLogs(index_to_commit);
keeper_context->local_logs_preprocessed.wait(false);
}
else if (index_to_commit == 0) /// we need to rollback all the logs so we preprocess all of them
{
preprocess_logs();
}
preprocess_logs();
}
break;
}
Expand Down
2 changes: 2 additions & 0 deletions src/Coordination/tests/gtest_coordination.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class CoordinationTest : public ::testing::TestWithParam<CompressionParam>
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");

keeper_context->local_logs_preprocessed = true;
}

void setLogDirectory(const std::string & path) { keeper_context->setLogDisk(std::make_shared<DB::DiskLocal>("LogDisk", path)); }
Expand Down

0 comments on commit 17eaa5a

Please sign in to comment.