Link up Election Context from TransferLeader to EDCB
Summary:

Previously we added more metadata for EDCB, but it was limited to a
single election. This change pipes the ElectionContext received from
TransferLeader through Kudu RPCs so that it eventually arrives at the
new leader's EDCB.

In MySQL Raft, when the EDCB is called and determines that another
election is needed, the ElectionContext can be passed back to preserve
context.
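
For reference, a minimal sketch of the new plumbing, assembled from the diff
below rather than quoted verbatim (the helper name BuildElectionRequest is
hypothetical): the ElectionContext from TransferLeader becomes a
PeerMessageQueue::TransferContext, travels through BeginWatchForSuccessor()
and NotifyPeerToStartElection(), and is finally serialized into the new
LeaderElectionContextPB field of the RunLeaderElection RPC.

#include <chrono>

#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus_queue.h"

namespace kudu {
namespace consensus {

// Hypothetical helper mirroring RaftConsensus::TryStartElectionOnPeerTask():
// it packs the transfer context into the RunLeaderElection request so the
// new leader's EDCB can see where the chain of promotions started.
RunLeaderElectionRequestPB BuildElectionRequest(
    const PeerMessageQueue::TransferContext& transfer_context) {
  RunLeaderElectionRequestPB req;
  LeaderElectionContextPB* ctx = req.mutable_election_context();
  // The proto field carries nanoseconds since epoch.
  ctx->set_original_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          transfer_context.original_start_time.time_since_epoch()).count());
  ctx->set_original_uuid(transfer_context.original_uuid);
  ctx->set_is_origin_dead_promotion(transfer_context.is_origin_dead_promotion);
  return req;
}

}  // namespace consensus
}  // namespace kudu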

Test Plan:

Tested with fbcode.

Reviewers: anirbanr-fb, bhatvinay, yashtc

Subscribers:

Tasks:

Tags:
Signed-off-by: Yichen <[email protected]>
yichenshen committed Feb 4, 2021
1 parent 5285a84 commit 69e6b3e
Showing 11 changed files with 155 additions and 59 deletions.
24 changes: 18 additions & 6 deletions src/kudu/consensus/consensus.proto
@@ -534,6 +534,21 @@ message GetNodeInstanceResponsePB {
required NodeInstancePB node_instance = 1;
}

message LeaderElectionContextPB {
// Time when the original server was promoted away from. We can use this to
// measure the total time of a chain of promotions
// Should be specified as nanoseconds since epoch
required fixed64 original_start_time = 1;

// UUID of original server that was promoted away from, used when current
// server cannot be leader
required bytes original_uuid = 2;

// True if the original promotion was due to the original leader specified in
// original_uuid being dead/unreachable
required bool is_origin_dead_promotion = 3 [default = false];
}

// Message that makes the local peer run leader election to be elected leader.
// Assumes that a tablet with 'tablet_id' exists.
message RunLeaderElectionRequestPB {
@@ -543,13 +558,10 @@ message RunLeaderElectionRequestPB {
// the id of the tablet
required bytes tablet_id = 1;

// Time when the original server was promoted away from. We can use this to
// measure the total time of a chain of promotions
optional fixed64 original_start_time = 3;
// optional fixed64 original_start_time = 3;
// optional bytes original_uuid = 4;

// UUID of original server that was promoted away from, used when current
// server cannot be leader
optional bytes original_uuid = 4;
optional LeaderElectionContextPB election_context = 5;
}

message RunLeaderElectionResponsePB {
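
The original_start_time field above is specified as nanoseconds since epoch.
Purely as an illustration (not part of this commit; the function name is made
up), a receiver that wants to measure the total time of a chain of promotions
could convert it back to a wall-clock time like so:

#include <chrono>
#include <cstdint>

// Illustrative only: rebuild a time_point from
// LeaderElectionContextPB::original_start_time (nanoseconds since epoch) so
// the elapsed promotion-chain duration can be computed against now().
std::chrono::system_clock::time_point OriginalStartTime(uint64_t ns_since_epoch) {
  return std::chrono::system_clock::time_point(
      std::chrono::duration_cast<std::chrono::system_clock::duration>(
          std::chrono::nanoseconds(ns_since_epoch)));
}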
3 changes: 1 addition & 2 deletions src/kudu/consensus/consensus_peers.cc
@@ -321,8 +321,7 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
});
}

Status Peer::StartElection() {
RunLeaderElectionRequestPB req;
Status Peer::StartElection(RunLeaderElectionRequestPB req) {
RunLeaderElectionResponsePB resp;
RpcController controller;
req.set_dest_uuid(peer_pb().permanent_uuid());
2 changes: 1 addition & 1 deletion src/kudu/consensus/consensus_peers.h
@@ -89,7 +89,7 @@ class Peer : public std::enable_shared_from_this<Peer> {
// StartElection request.
// The StartElection RPC does not count as the single outstanding request
// that this class tracks.
Status StartElection();
Status StartElection(RunLeaderElectionRequestPB req = {});

const RaftPeerPB& peer_pb() const { return peer_pb_; }

15 changes: 11 additions & 4 deletions src/kudu/consensus/consensus_queue.cc
@@ -1206,7 +1206,7 @@ int64_t PeerMessageQueue::ComputeNewWatermarkDynamicMode(int64_t* watermark) {
int64_t PeerMessageQueue::ComputeNewWatermarkStaticMode(int64_t* watermark) {
CHECK(watermark);
CHECK(queue_state_.active_config->has_commit_rule());

if (!IsStaticQuorumMode(queue_state_.active_config->commit_rule().mode())) {
return *watermark;
}
@@ -1295,9 +1295,12 @@ void PeerMessageQueue::AdvanceMajorityReplicatedWatermarkFlexiRaft(

void PeerMessageQueue::BeginWatchForSuccessor(
const boost::optional<string>& successor_uuid,
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn) {
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn,
boost::optional<PeerMessageQueue::TransferContext> transfer_context) {
std::lock_guard<simple_spinlock> l(queue_lock_);

transfer_context_ = std::move(transfer_context);

if (successor_uuid && FLAGS_synchronous_transfer_leadership &&
PeerTransferLeadershipImmediatelyUnlocked(successor_uuid.get())) {
return;
@@ -1311,6 +1314,7 @@ void PeerMessageQueue::BeginWatchForSuccessor(
void PeerMessageQueue::EndWatchForSuccessor() {
std::lock_guard<simple_spinlock> l(queue_lock_);
successor_watch_in_progress_ = false;
transfer_context_ = boost::none;
tl_filter_fn_ = nullptr;
}

@@ -1968,10 +1972,13 @@ void PeerMessageQueue::NotifyObserversOfSuccessor(const string& peer_uuid) {
DCHECK(queue_lock_.is_locked());
WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
[=](PeerMessageQueueObserver* observer) {
observer->NotifyPeerToStartElection(peer_uuid);
[=, transfer_context = std::move(transfer_context_)] (
PeerMessageQueueObserver* observer
) {
observer->NotifyPeerToStartElection(peer_uuid, std::move(transfer_context));
})),
LogPrefixUnlocked() + "Unable to notify RaftConsensus of available successor.");
transfer_context_ = boost::none;
}

void PeerMessageQueue::NotifyObserversOfPeerHealthChange() {
18 changes: 15 additions & 3 deletions src/kudu/consensus/consensus_queue.h
@@ -191,6 +191,12 @@ class PeerMessageQueue {
int64_t last_seen_term_;
};

struct TransferContext {
std::chrono::system_clock::time_point original_start_time;
std::string original_uuid;
bool is_origin_dead_promotion;
};

PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
scoped_refptr<log::Log> log,
scoped_refptr<TimeManager> time_manager,
@@ -379,8 +385,11 @@
// boost::none, the queue will notify its observers when 'successor_uuid' is
// caught up to the leader. Otherwise, it will notify its observers
// with the UUID of the first voter that is caught up.
void BeginWatchForSuccessor(const boost::optional<std::string>& successor_uuid,
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn);
void BeginWatchForSuccessor(
const boost::optional<std::string>& successor_uuid,
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn,
boost::optional<TransferContext> transfer_context = boost::none
);
void EndWatchForSuccessor();

// Get the UUID of the next routing hop from the local node.
@@ -647,6 +656,7 @@ class PeerMessageQueue {

bool successor_watch_in_progress_;
boost::optional<std::string> designated_successor_uuid_;
boost::optional<TransferContext> transfer_context_;

std::function<bool(const kudu::consensus::RaftPeerPB&)> tl_filter_fn_;
// We assume that we never have multiple threads racing to append to the queue.
@@ -683,7 +693,9 @@ class PeerMessageQueueObserver {

// Notify the observer that the specified peer is ready to become leader, and
// and it should be told to run an election.
virtual void NotifyPeerToStartElection(const std::string& peer_uuid) = 0;
virtual void NotifyPeerToStartElection(
const std::string& peer_uuid,
boost::optional<PeerMessageQueue::TransferContext> transfer_context) = 0;

// Notify the observer that the health of one of the peers has changed.
virtual void NotifyPeerHealthChange() = 0;
5 changes: 3 additions & 2 deletions src/kudu/consensus/peer_manager.cc
@@ -108,7 +108,8 @@ void PeerManager::SignalRequest(bool force_if_queue_empty) {
}
}

Status PeerManager::StartElection(const std::string& uuid) {
Status PeerManager::StartElection(
const std::string& uuid, RunLeaderElectionRequestPB req) {
std::shared_ptr<Peer> peer;
{
std::lock_guard<simple_spinlock> lock(lock_);
@@ -117,7 +118,7 @@ Status PeerManager::StartElection(const std::string& uuid) {
if (!peer) {
return Status::NotFound("unknown peer");
}
return peer->StartElection();
return peer->StartElection(std::move(req));
}

void PeerManager::Close() {
3 changes: 2 additions & 1 deletion src/kudu/consensus/peer_manager.h
@@ -62,7 +62,8 @@ class PeerManager {
void SignalRequest(bool force_if_queue_empty = false);

// Start an election on the peer with UUID 'uuid'.
Status StartElection(const std::string& uuid);
Status StartElection(const std::string& uuid,
RunLeaderElectionRequestPB req = {});

// Closes all peers.
void Close();
58 changes: 47 additions & 11 deletions src/kudu/consensus/raft_consensus.cc
@@ -252,6 +252,13 @@ using strings::Substitute;
namespace kudu {
namespace consensus {

PeerMessageQueue::TransferContext ElectionContext::TransferContext() const {
if (is_chained_election_) {
return { chained_start_time_, source_uuid_, is_origin_dead_promotion_ };
}
return { start_time_, current_leader_uuid_, is_origin_dead_promotion_ };
}

RaftConsensus::RaftConsensus(
ConsensusOptions options,
RaftPeerPB local_peer_pb,
@@ -543,9 +550,9 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionContext context)
LockGuard l(lock_);
RETURN_NOT_OK(CheckRunningUnlocked());

context.current_leader_uuid = GetLeaderUuidUnlocked();
if (context.source_uuid.empty()) {
context.source_uuid = context.current_leader_uuid;
context.current_leader_uuid_ = GetLeaderUuidUnlocked();
if (context.source_uuid_.empty()) {
context.source_uuid_ = context.current_leader_uuid_;
}

RaftPeerPB::Role active_role = cmeta_->active_role();
@@ -575,7 +582,7 @@

LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Starting " << mode_str
<< " (" << ReasonString(context.reason, GetLeaderUuidUnlocked()) << ")";
<< " (" << ReasonString(context.reason_, GetLeaderUuidUnlocked()) << ")";

// Snooze to avoid the election timer firing again as much as possible.
// We do not disable the election timer while running an election, so that
@@ -713,6 +720,7 @@ Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {

Status RaftConsensus::TransferLeadership(const boost::optional<string>& new_leader_uuid,
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn,
const boost::optional<ElectionContext>& prev_election_ctx,
LeaderStepDownResponsePB* resp) {
TRACE_EVENT0("consensus", "RaftConsensus::TransferLeadership");
ThreadRestrictions::AssertWaitAllowed();
@@ -746,20 +754,32 @@ Status RaftConsensus::TransferLeadership(const boost::optional<string>& new_lead
return Status::InvalidArgument(msg);
}
}
return BeginLeaderTransferPeriodUnlocked(new_leader_uuid, filter_fn);
return BeginLeaderTransferPeriodUnlocked(new_leader_uuid, filter_fn, prev_election_ctx);
}

Status RaftConsensus::BeginLeaderTransferPeriodUnlocked(
const boost::optional<string>& successor_uuid,
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn) {
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn,
const boost::optional<ElectionContext>& prev_election_ctx
) {
DCHECK(lock_.is_locked());
if (leader_transfer_in_progress_.CompareAndSwap(false, true)) {
return Status::ServiceUnavailable(
Substitute("leadership transfer for $0 already in progress",
options_.tablet_id));
}
leader_transfer_in_progress_.Store(true, kMemOrderAcquire);
queue_->BeginWatchForSuccessor(successor_uuid, filter_fn);

boost::optional<PeerMessageQueue::TransferContext> transfer_context =
boost::none;

queue_->BeginWatchForSuccessor(
successor_uuid,
filter_fn,
prev_election_ctx ?
boost::make_optional(prev_election_ctx->TransferContext()) :
boost::none
);
transfer_period_timer_->Start();
return Status::OK();
}
@@ -1074,11 +1094,13 @@ void RaftConsensus::NotifyPeerToPromote(const std::string& peer_uuid) {
LogPrefixThreadSafe() + "Unable to start TryPromoteNonVoterTask");
}

void RaftConsensus::NotifyPeerToStartElection(const string& peer_uuid) {
void RaftConsensus::NotifyPeerToStartElection(const string& peer_uuid,
boost::optional<PeerMessageQueue::TransferContext> transfer_context) {
LOG(INFO) << "Instructing follower " << peer_uuid << " to start an election";
WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryStartElectionOnPeerTask,
shared_from_this(),
peer_uuid)),
peer_uuid,
std::move(transfer_context))),
LogPrefixThreadSafe() + "Unable to start TryStartElectionOnPeerTask");
}

@@ -1153,7 +1175,8 @@ void RaftConsensus::TryPromoteNonVoterTask(const std::string& peer_uuid) {
LogPrefixThreadSafe() + Substitute("Unable to promote non-voter $0", peer_uuid));
}

void RaftConsensus::TryStartElectionOnPeerTask(const string& peer_uuid) {
void RaftConsensus::TryStartElectionOnPeerTask(const string& peer_uuid,
const boost::optional<PeerMessageQueue::TransferContext>& transfer_context) {
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
// Double-check that the peer is a voter in the active config.
@@ -1165,7 +1188,20 @@ void RaftConsensus::TryStartElectionOnPeerTask(const string& peer_uuid) {
}
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Signalling peer " << peer_uuid
<< "to start an election";
WARN_NOT_OK(peer_manager_->StartElection(peer_uuid),

RunLeaderElectionRequestPB req;
if (transfer_context) {
LeaderElectionContextPB* ctx = req.mutable_election_context();
ctx->set_original_start_time(
std::chrono::duration_cast<std::chrono::nanoseconds>(
transfer_context->original_start_time.time_since_epoch())
.count());
ctx->set_original_uuid(transfer_context->original_uuid);
ctx->set_is_origin_dead_promotion(
transfer_context->is_origin_dead_promotion);
}

WARN_NOT_OK(peer_manager_->StartElection(peer_uuid, std::move(req)),
Substitute("unable to start election on peer $0", peer_uuid));
}
