Skip to content

Commit

Permalink
Adding ability to enable/disable voter distribution adjustment
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinav04sharma committed Jun 30, 2021
1 parent 6ab77c9 commit 7ce9b94
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 23 deletions.
24 changes: 18 additions & 6 deletions src/kudu/consensus/consensus_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_ent
local_peer_pb_(std::move(local_peer_pb)),
routing_table_container_(std::move(routing_table_container)),
tablet_id_(std::move(tablet_id)),
adjust_voter_distribution_(true),
successor_watch_in_progress_(false),
log_cache_(metric_entity, std::move(log), local_peer_pb_.permanent_uuid(), tablet_id_),
metrics_(metric_entity),
Expand Down Expand Up @@ -1257,6 +1258,13 @@ int64_t PeerMessageQueue::ComputeNewWatermarkDynamicMode(int64_t* watermark) {

int total_voters = std::max(
total_voters_from_voter_distribution, total_voters_from_active_config);

// adjust_voter_distribution_ is set to false in cases where we want to
// perform an election forcefully, i.e. an unsafe config change.
if (PREDICT_FALSE(!adjust_voter_distribution_)) {
total_voters = total_voters_from_voter_distribution;
}

int commit_req = MajoritySize(total_voters);

VLOG_WITH_PREFIX_UNLOCKED(1) << "Computing new commit index in single "
Expand Down Expand Up @@ -1332,12 +1340,16 @@ int64_t PeerMessageQueue::ComputeNewWatermarkStaticMode(int64_t* watermark) {
queue_state_.active_config->voter_distribution().begin(),
queue_state_.active_config->voter_distribution().end());

// Compute number of voters in each region in the active config.
// As voter distribution provided in topology config can lag,
// we need to take into account the active voters as well due to
// membership changes.
AdjustVoterDistributionWithCurrentVoters(
*(queue_state_.active_config), &voter_distribution);
// adjust_voter_distribution_ is set to false in cases where we want to
// perform an election forcefully, i.e. an unsafe config change.
if (PREDICT_TRUE(adjust_voter_distribution_)) {
// Compute number of voters in each region in the active config.
// As voter distribution provided in topology config can lag,
// we need to take into account the active voters as well due to
// membership changes.
AdjustVoterDistributionWithCurrentVoters(
*(queue_state_.active_config), &voter_distribution);
}

return DoComputeNewWatermarkStaticMode(
voter_distribution, watermarks_by_region, watermark);
Expand Down
8 changes: 8 additions & 0 deletions src/kudu/consensus/consensus_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,11 @@ class PeerMessageQueue {
bool HasProxyPeerFailedUnlocked(
const TrackedPeer* proxy_peer, const TrackedPeer* dest_peer);

// Enables or disables adjusting the voter distribution based on the current
// config when computing watermarks. The flag is written while holding
// queue_lock_.
void SetAdjustVoterDistribution(bool val) {
  std::lock_guard<simple_spinlock> guard(queue_lock_);
  adjust_voter_distribution_ = val;
}

private:
FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex);
FRIEND_TEST(ConsensusQueueTest, TestQueueMovesWatermarksBackward);
Expand Down Expand Up @@ -676,6 +681,9 @@ class PeerMessageQueue {

QueueState queue_state_;

// Should we adjust voter distribution based on current config?
bool adjust_voter_distribution_;

// The currently tracked peers.
PeersMap peers_map_;
mutable simple_spinlock queue_lock_; // TODO(todd): rename
Expand Down
32 changes: 19 additions & 13 deletions src/kudu/consensus/leader_election.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,27 +209,33 @@ void FlexibleVoteCounter::FetchTopologyInfo() {
}
}

// Step 1: Compute number of voters in each region in the active config.
// As voter distribution provided in topology config can lag,
// we need to take into account the active voters as well due to
// membership changes.
AdjustVoterDistributionWithCurrentVoters(config_, &voter_distribution_);

// We assume that there are no voters in config_.peers() who
// are in present in regions not covered by voter_distribution_
// That is enforced via bootstrap and add-member
// The reverse is not true and has been handled in
// AdjustVoterDistributionWithCurrentVoters
// adjust_voter_distribution_ is set to false in cases where we want to
// perform an election forcefully, i.e. an unsafe config change.
if (PREDICT_TRUE(adjust_voter_distribution_)) {
// Step 1: Compute number of voters in each region in the active config.
// As voter distribution provided in topology config can lag,
// we need to take into account the active voters as well due to
// membership changes.
AdjustVoterDistributionWithCurrentVoters(config_, &voter_distribution_);

// We assume that there are no voters in config_.peers() who
// are present in regions not covered by voter_distribution_.
// That is enforced via bootstrap and add-member
// The reverse is not true and has been handled in
// AdjustVoterDistributionWithCurrentVoters
}
}

FlexibleVoteCounter::FlexibleVoteCounter(
const std::string& candidate_uuid,
int64_t election_term,
const LastKnownLeaderPB& last_known_leader,
RaftConfigPB config)
RaftConfigPB config,
bool adjust_voter_distribution)
: VoteCounter(1, 1),
candidate_uuid_(candidate_uuid),
election_term_(election_term),
adjust_voter_distribution_(adjust_voter_distribution),
last_known_leader_(last_known_leader),
config_(std::move(config)) {
num_voters_ = 0;
Expand All @@ -242,7 +248,7 @@ FlexibleVoteCounter::FlexibleVoteCounter(
// When instances are being removed from ring, the voter distribution
// can have extra regions, but we have taken them out in
// FetchTopology. So this should never happen
if (regional_voter_count.second <= 0) {
if (adjust_voter_distribution_ && regional_voter_count.second <= 0) {
continue;
}
// num_voters_ += regional_voter_count.second;
Expand Down
9 changes: 7 additions & 2 deletions src/kudu/consensus/leader_election.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ class FlexibleVoteCounter : public VoteCounter {
public:
// Creates a vote counter for the election run by 'candidate_uuid' in term
// 'election_term'.
//
// 'last_known_leader' and 'config' are stored by the counter ('config' is
// taken by value and moved into place).
//
// 'adjust_voter_distribution' controls whether the voter distribution is
// adjusted with the current voters of 'config'. Pass false to skip that
// adjustment.
FlexibleVoteCounter(
const std::string& candidate_uuid,
int64_t election_term,
const LastKnownLeaderPB& last_known_leader,
RaftConfigPB config,
bool adjust_voter_distribution);

// Synchronization is done by the LeaderElection class. Therefore, VoteCounter
// class doesn't need to take care of thread safety of its book-keeping
Expand Down Expand Up @@ -335,6 +337,9 @@ class FlexibleVoteCounter : public VoteCounter {
// Mapping from each region to number of active voters.
std::map<std::string, int> voter_distribution_;

// Should we adjust voter distribution based on current config?
const bool adjust_voter_distribution_;

// Vote count per region.
std::map<std::string, int> yes_vote_count_, no_vote_count_;

Expand Down
14 changes: 12 additions & 2 deletions src/kudu/consensus/raft_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ RaftConsensus::RaftConsensus(
leader_transfer_in_progress_(false),
withhold_votes_until_(MonoTime::Min()),
reject_append_entries_(false),
adjust_voter_distribution_(true),
withhold_votes_(false),
last_received_cur_leader_(MinimumOpId()),
failed_elections_since_stable_leader_(0),
Expand Down Expand Up @@ -668,8 +669,11 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionContext context)
counter.reset(new VoteCounter(num_voters, majority_size));
} else {
counter.reset(new FlexibleVoteCounter(
peer_uuid(), candidate_term,
cmeta_->last_known_leader(), active_config));
peer_uuid(),
candidate_term,
cmeta_->last_known_leader(),
active_config,
adjust_voter_distribution_));

// Populate vote history for self. Although not really needed, this makes
// the code simpler.
Expand Down Expand Up @@ -3488,6 +3492,12 @@ void RaftConsensus::SetRejectAppendEntriesForTests(bool reject_append_entries) {
reject_append_entries_ = reject_append_entries;
}

// Turns voter-distribution adjustment on or off. The value is forwarded to
// the peer message queue and also kept locally, where it is handed to the
// FlexibleVoteCounter created when an election is started. Both updates are
// made while holding lock_.
void RaftConsensus::SetAdjustVoterDistribution(bool val) {
  LockGuard guard(lock_);
  // Update the queue first, then the local copy.
  queue_->SetAdjustVoterDistribution(val);
  adjust_voter_distribution_ = val;
}

void RaftConsensus::UpdateFailureDetectorState(boost::optional<MonoDelta> delta) {
DCHECK(lock_.is_locked());
const auto& uuid = peer_uuid();
Expand Down
6 changes: 6 additions & 0 deletions src/kudu/consensus/raft_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// Rejects AppendEntries RPCs, if set to true.
void SetRejectAppendEntriesForTests(bool reject_append_entries);

// If set to false we won't adjust voter distribution based on current config
void SetAdjustVoterDistribution(bool val);

// Update the proxy policy used to route entries
Status SetProxyPolicy(const ProxyPolicy& proxy_policy);

Expand Down Expand Up @@ -1167,6 +1170,9 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// This is used in tests to reject AppendEntries RPC requests.
bool reject_append_entries_;

// Should we adjust voter distribution based on current config?
bool adjust_voter_distribution_;

// This is used in tests to reject RequestVote RPC requests.
bool withhold_votes_;

Expand Down

0 comments on commit 7ce9b94

Please sign in to comment.