
Merge pull request facebook#37 from anirbanr-fb/master_ban
Master Ban support
anirbanr-fb authored Mar 11, 2020
2 parents a7fca78 + d9b85e3 commit 258741e
Showing 9 changed files with 101 additions and 8 deletions.
10 changes: 9 additions & 1 deletion src/kudu/consensus/consensus_queue.cc
@@ -879,15 +879,18 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
}

void PeerMessageQueue::BeginWatchForSuccessor(
const boost::optional<string>& successor_uuid) {
const boost::optional<string>& successor_uuid,
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn) {
std::lock_guard<simple_spinlock> l(queue_lock_);
successor_watch_in_progress_ = true;
designated_successor_uuid_ = successor_uuid;
tl_filter_fn_ = filter_fn;
}

void PeerMessageQueue::EndWatchForSuccessor() {
std::lock_guard<simple_spinlock> l(queue_lock_);
successor_watch_in_progress_ = false;
tl_filter_fn_ = nullptr;
}

void PeerMessageQueue::UpdateFollowerWatermarks(int64_t committed_index,
@@ -1071,6 +1074,11 @@ void PeerMessageQueue::TransferLeadershipIfNeeded(const TrackedPeer& peer,
return;
}

// If no successor was explicitly designated and a filter_fn was provided, skip peers the filter bans.
if (!designated_successor_uuid_ && tl_filter_fn_ && tl_filter_fn_(*peer_pb)) {
return;
}

bool peer_caught_up =
!OpIdEquals(status.last_received_current_leader(), MinimumOpId()) &&
OpIdEquals(status.last_received_current_leader(), queue_state_.last_appended);
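
A hedged sketch of how the new hook might be used (the lambda below is hypothetical and not part of this commit): the queue consults filter_fn only when no explicit successor was designated, and a peer for which the predicate returns true is skipped as a transfer target. A caller could, for example, exclude peers that are not backed by a database:

queue_->BeginWatchForSuccessor(
    boost::none,
    [](const kudu::consensus::RaftPeerPB& peer) {
      // Returning true filters this peer out of successor consideration.
      return !peer.attrs().backing_db_present();
    });
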
4 changes: 3 additions & 1 deletion src/kudu/consensus/consensus_queue.h
@@ -372,7 +372,8 @@ class PeerMessageQueue {
// boost::none, the queue will notify its observers when 'successor_uuid' is
// caught up to the leader. Otherwise, it will notify its observers
// with the UUID of the first voter that is caught up.
void BeginWatchForSuccessor(const boost::optional<std::string>& successor_uuid);
void BeginWatchForSuccessor(const boost::optional<std::string>& successor_uuid,
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn);
void EndWatchForSuccessor();

private:
@@ -572,6 +573,7 @@ class PeerMessageQueue {
bool successor_watch_in_progress_;
boost::optional<std::string> designated_successor_uuid_;

std::function<bool(const kudu::consensus::RaftPeerPB&)> tl_filter_fn_;
// We assume that we never have multiple threads racing to append to the queue.
// This fake mutex adds some extra assurance that this implementation property
// doesn't change.
12 changes: 12 additions & 0 deletions src/kudu/consensus/quorum_util.cc
@@ -68,6 +68,18 @@ bool IsRaftConfigVoter(const std::string& uuid, const RaftConfigPB& config) {
return false;
}

bool GetRaftConfigMemberRegion(const std::string& uuid,
const RaftConfigPB& config, bool *is_voter, std::string *region) {
for (const RaftPeerPB& peer : config.peers()) {
if (peer.permanent_uuid() == uuid) {
*is_voter = (peer.member_type() == RaftPeerPB::VOTER);
*region = peer.attrs().region();
return true;
}
}
return false;
}

bool IsVoterRole(RaftPeerPB::Role role) {
return role == RaftPeerPB::LEADER || role == RaftPeerPB::FOLLOWER;
}
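
A minimal usage sketch for the new helper (the peer_uuid and config variables are illustrative, not from this commit): it reports whether a peer with the given UUID exists in the config and, if so, whether it is a voter and which region it is tagged with.

bool is_voter = false;
std::string region;
if (GetRaftConfigMemberRegion(peer_uuid, config, &is_voter, &region)) {
  // Peer found: 'is_voter' and 'region' describe it.
  LOG(INFO) << "peer " << peer_uuid << " region=" << region
            << " voter=" << (is_voter ? "true" : "false");
} else {
  // No peer with this UUID is part of the config.
  LOG(INFO) << "peer " << peer_uuid << " not found in config";
}
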
2 changes: 2 additions & 0 deletions src/kudu/consensus/quorum_util.h
@@ -47,6 +47,8 @@ enum class MajorityHealthPolicy {

bool IsRaftConfigMember(const std::string& uuid, const RaftConfigPB& config);
bool IsRaftConfigVoter(const std::string& uuid, const RaftConfigPB& config);
bool GetRaftConfigMemberRegion(const std::string& uuid, const RaftConfigPB& config,
bool *is_voter, std::string *region);

// Whether the specified Raft role is attributed to a peer which can participate
// in leader elections.
26 changes: 21 additions & 5 deletions src/kudu/consensus/raft_consensus.cc
@@ -92,6 +92,11 @@ DEFINE_double(leader_failure_max_missed_heartbeat_periods, 3.0,
"The value passed to this flag may be fractional.");
TAG_FLAG(leader_failure_max_missed_heartbeat_periods, advanced);

DEFINE_double(snooze_for_leader_ban_ratio, 1.0,
"Failure detector for this instance should be at a higher ratio than other instances"
". This will prevent this instance from initiating an election");
TAG_FLAG(snooze_for_leader_ban_ratio, advanced);

DEFINE_int32(leader_failure_exp_backoff_max_delta_ms, 20 * 1000,
"Maximum time to sleep in between leader election retries, in addition to the "
"regular timeout. When leader election fails the interval in between retries "
@@ -603,7 +608,8 @@ Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
}

Status RaftConsensus::TransferLeadership(const boost::optional<string>& new_leader_uuid,
LeaderStepDownResponsePB* resp) {
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn,
LeaderStepDownResponsePB* resp) {
TRACE_EVENT0("consensus", "RaftConsensus::TransferLeadership");
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
@@ -636,19 +642,20 @@ Status RaftConsensus::TransferLeadership(const boost::optional<string>& new_lead
return Status::InvalidArgument(msg);
}
}
return BeginLeaderTransferPeriodUnlocked(new_leader_uuid);
return BeginLeaderTransferPeriodUnlocked(new_leader_uuid, filter_fn);
}

Status RaftConsensus::BeginLeaderTransferPeriodUnlocked(
const boost::optional<string>& successor_uuid) {
const boost::optional<string>& successor_uuid,
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn) {
DCHECK(lock_.is_locked());
if (leader_transfer_in_progress_.CompareAndSwap(false, true)) {
return Status::ServiceUnavailable(
Substitute("leadership transfer for $0 already in progress",
options_.tablet_id));
}
leader_transfer_in_progress_.Store(true, kMemOrderAcquire);
queue_->BeginWatchForSuccessor(successor_uuid);
queue_->BeginWatchForSuccessor(successor_uuid, filter_fn);
transfer_period_timer_->Start();
return Status::OK();
}
@@ -1459,7 +1466,10 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
// Snooze the failure detector as soon as we decide to accept the message.
// We are guaranteed to be acting as a FOLLOWER at this point by the above
// sanity check.
SnoozeFailureDetector();
// If this instance has been banned by the cluster manager, snooze for longer
// so that other instances get an opportunity to win the election.
SnoozeFailureDetector(boost::none, MinimumElectionTimeoutWithBan());

last_leader_communication_time_micros_ = GetMonoTimeMicros();

@@ -2951,6 +2961,12 @@ MonoDelta RaftConsensus::MinimumElectionTimeout() const {
return MonoDelta::FromMilliseconds(failure_timeout);
}

MonoDelta RaftConsensus::MinimumElectionTimeoutWithBan() const {
int32_t failure_timeout = FLAGS_leader_failure_max_missed_heartbeat_periods *
FLAGS_raft_heartbeat_interval_ms * FLAGS_snooze_for_leader_ban_ratio;
return MonoDelta::FromMilliseconds(failure_timeout);
}

MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
DCHECK(lock_.is_locked());
// Compute a backoff factor based on how many leader elections have
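
To make the arithmetic concrete (assuming Apache Kudu's stock defaults of leader_failure_max_missed_heartbeat_periods = 3.0 and raft_heartbeat_interval_ms = 500): at the default snooze_for_leader_ban_ratio of 1.0, MinimumElectionTimeoutWithBan() equals MinimumElectionTimeout() at 3.0 * 500 = 1500 ms. Raising the ratio to, say, 2.0 on a banned instance stretches its snooze to 3000 ms, so the other replicas' failure detectors expire first and the banned instance is unlikely to start an election.
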
7 changes: 6 additions & 1 deletion src/kudu/consensus/raft_consensus.h
@@ -233,14 +233,16 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// success.
// Additional calls to this method during the transfer period prolong it.
Status TransferLeadership(const boost::optional<std::string>& new_leader_uuid,
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn,
LeaderStepDownResponsePB* resp);

// Begin or end a leadership transfer period. During a transfer period, a
// leader will not accept writes or config changes, but will continue updating
// followers. If a leader transfer period is already in progress,
// BeginLeaderTransferPeriodUnlocked returns ServiceUnavailable.
Status BeginLeaderTransferPeriodUnlocked(
const boost::optional<std::string>& successor_uuid);
const boost::optional<std::string>& successor_uuid,
const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn);
void EndLeaderTransferPeriod();

// Creates a new ConsensusRound, the entity that owns all the data
@@ -379,6 +381,9 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// jitter, election timeouts may be longer than this.
MonoDelta MinimumElectionTimeout() const;

// Returns the minimum election timeout, taking the leader-ban snooze ratio into account.
MonoDelta MinimumElectionTimeoutWithBan() const;

// Returns a copy of the state of the consensus system.
// If 'report_health' is set to 'INCLUDE_HEALTH_REPORT', and if the
// local replica believes it is the leader of the config, it will include a
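
As a hypothetical illustration of the extended signature (the caller, lambda, and region name below are not part of this commit), a leader could be asked to hand off to any caught-up voter outside a banned region by passing boost::none as the successor together with a region-based filter:

LeaderStepDownResponsePB resp;
auto ban_filter = [](const kudu::consensus::RaftPeerPB& peer) {
  // Returning true excludes this peer from successor consideration.
  return peer.attrs().region() == "banned-region";
};
// With no designated successor, the first caught-up voter that passes the
// filter is chosen.
Status s = consensus->TransferLeadership(boost::none, ban_filter, &resp);
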
8 changes: 8 additions & 0 deletions src/kudu/tserver/simple_tablet_manager.cc
@@ -203,14 +203,22 @@ Status TSTabletManager::CreateDistributedConfig(const TabletServerOptions& optio
new_config.set_obsolete_local(false);
new_config.set_opid_index(consensus::kInvalidOpIdIndex);

size_t ts_index = 0;
// Build the set of followers from our server options.
for (const HostPort& host_port : options.tserver_addresses) {
RaftPeerPB peer;
HostPortPB peer_host_port_pb;
RETURN_NOT_OK(HostPortToPB(host_port, &peer_host_port_pb));
peer.mutable_last_known_addr()->CopyFrom(peer_host_port_pb);
peer.set_member_type(RaftPeerPB::VOTER);
if (!options.tserver_bbd.empty()) {
peer.mutable_attrs()->set_backing_db_present(options.tserver_bbd[ts_index]);
}
if (!options.tserver_regions.empty()) {
peer.mutable_attrs()->set_region(options.tserver_regions[ts_index]);
}
new_config.add_peers()->CopyFrom(peer);
ts_index++;
}

// Now resolve UUIDs.
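
An illustrative flag combination for a three-node cluster (hostnames, ports, and region names are made up) showing how the parallel lists line up: the i-th entry of each list applies to the i-th address, and each list must either be empty or match the length of --tserver_addresses.

--tserver_addresses=ts1.example.com:7050,ts2.example.com:7050,ts3.example.com:7050
--tserver_regions=us-east,us-west,eu-central
--tserver_bbd=true,true,false

With these values, ts3 would be recorded as a peer in eu-central without a backing database (a witness), while ts1 and ts2 are database-backed peers in their respective regions.
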
38 changes: 38 additions & 0 deletions src/kudu/tserver/tablet_server_options.cc
@@ -32,13 +32,25 @@
#include "kudu/tserver/tablet_server.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/status.h"
#include <boost/algorithm/string.hpp>

// TODO(iRitwik): refine these mechanisms into a standard way of passing all
// the properties of an instance.
DEFINE_string(tserver_addresses, "",
"Comma-separated list of the RPC addresses belonging to all "
"instances in this cluster. "
"NOTE: if not specified, configures a non-replicated Master.");
TAG_FLAG(tserver_addresses, stable);

DEFINE_string(tserver_regions, "",
"Comma-separated list of regions which is parallel to tserver_addresses.");
TAG_FLAG(tserver_regions, stable);

DEFINE_string(tserver_bbd, "",
"Comma-separated list of bool strings to specify Backed by "
"Database(non-witness). Runs parallel to tserver_addresses.");
TAG_FLAG(tserver_bbd, stable);

namespace kudu {
namespace tserver {

@@ -69,6 +81,32 @@ TabletServerOptions::TabletServerOptions() {
" of any one tserver. It is recommended to use at least 3 tservers.";
}
}

if (!FLAGS_tserver_regions.empty()) {
boost::split(tserver_regions, FLAGS_tserver_regions, boost::is_any_of(","));
if (tserver_regions.size() != tserver_addresses.size()) {
LOG(FATAL) << "The number of tserver regions has to be same as tservers: "
<< FLAGS_tserver_regions << " " << FLAGS_tserver_addresses;
}
}
if (!FLAGS_tserver_bbd.empty()) {
std::vector<std::string> bbds;
boost::split(bbds, FLAGS_tserver_bbd, boost::is_any_of(","));
if (bbds.size() != tserver_addresses.size()) {
LOG(FATAL) << "The number of tserver bbd tags has to be same as tservers: "
<< FLAGS_tserver_bbd << " " << FLAGS_tserver_addresses;
}
for (const auto& tsbbd : bbds) {
if (tsbbd == "true") {
tserver_bbd.push_back(true);
} else if (tsbbd == "false") {
tserver_bbd.push_back(false);
} else {
LOG(FATAL) << "tserver bbd tags has to be bool true|false : "
<< FLAGS_tserver_bbd;
}
}
}
}

bool TabletServerOptions::IsDistributed() const {
2 changes: 2 additions & 0 deletions src/kudu/tserver/tablet_server_options.h
@@ -45,6 +45,8 @@ struct TabletServerOptions : public kudu::server::ServerBaseOptions {
TabletServerOptions();

std::vector<HostPort> tserver_addresses;
std::vector<std::string> tserver_regions;
std::vector<bool> tserver_bbd;

std::shared_ptr<kudu::log::LogFactory> log_factory;

