Skip to content

Commit

Permalink
KUDU-2274. Shut down tombstoned replica when replacing it
Browse files Browse the repository at this point in the history
Failing to shut down a tombstoned replica after copying it can lead to
unfortunate interleavings resulting in the replica ending up in an
inconsistent state. This actually occurred in a test environment,
although it proved very hard to reproduce.

This patch includes several changes in addition to shutting down
tombstoned replicas before replacing them:

* Remove the thread safety properties of the ConsensusMetadata class

  ConsensusMetadata doesn't need to be thread-safe, even though it is
  ref-counted, because it is required to be externally synchronized.
  This patch replaces the mutex with a DFAKE_MUTEX from the thread
  collision warner utility class in order to easily detect concurrent
  access due to buggy external sychronization.

* Also improve destructor state checks in TabletReplica.

* Fix another case of unlocked cmeta access by TSTabletManager.

These fixes were verified by running tombstoned_voting-stress-test with
4 CPU stress threads on the dist-test cluster after applying only the
ConsensusMetadata thread-safety portion of this patch, and then again
with the unlocked access fix and shutdown portions of this patch.

After removing the cmeta mutex only (186/200 failed):
http://dist-test.cloudera.org/job?job_id=mpercy.1518077234.135005

This full patch (200/200 succeeded):
http://dist-test.cloudera.org/job?job_id=mpercy.1518078690.66599

Change-Id: Ia8d086c3fba52826ebe0d3a44842d53ecb6a9265
Reviewed-on: http://gerrit.cloudera.org:8080/9246
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
  • Loading branch information
mpercy committed Feb 12, 2018
1 parent d977d1c commit 17f9753
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 155 deletions.
186 changes: 65 additions & 121 deletions src/kudu/consensus/consensus_meta.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,216 +53,165 @@ using std::string;
using strings::Substitute;

int64_t ConsensusMetadata::current_term() const {
lock_guard<Mutex> l(lock_);
return current_term_unlocked();
}

int64_t ConsensusMetadata::current_term_unlocked() const {
lock_.AssertAcquired();
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
DCHECK(pb_.has_current_term());
return pb_.current_term();
}

void ConsensusMetadata::set_current_term(int64_t term) {
lock_guard<Mutex> l(lock_);
set_current_term_unlocked(term);
}

void ConsensusMetadata::set_current_term_unlocked(int64_t term) {
lock_.AssertAcquired();
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
DCHECK_GE(term, kMinimumTerm);
pb_.set_current_term(term);
}

bool ConsensusMetadata::has_voted_for() const {
lock_guard<Mutex> l(lock_);
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
return pb_.has_voted_for();
}

string ConsensusMetadata::voted_for() const {
lock_guard<Mutex> l(lock_);
const string& ConsensusMetadata::voted_for() const {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
DCHECK(pb_.has_voted_for());
return pb_.voted_for();
}

void ConsensusMetadata::clear_voted_for() {
lock_guard<Mutex> l(lock_);
clear_voted_for_unlocked();
}

void ConsensusMetadata::clear_voted_for_unlocked() {
lock_.AssertAcquired();
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
pb_.clear_voted_for();
}

void ConsensusMetadata::set_voted_for(const string& uuid) {
lock_guard<Mutex> l(lock_);
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
DCHECK(!uuid.empty());
pb_.set_voted_for(uuid);
}

bool ConsensusMetadata::IsVoterInConfig(const string& uuid,
RaftConfigState type) {
lock_guard<Mutex> l(lock_);
return IsRaftConfigVoter(uuid, config_unlocked(type));
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
return IsRaftConfigVoter(uuid, GetConfig(type));
}

bool ConsensusMetadata::IsMemberInConfig(const string& uuid,
RaftConfigState type) {
lock_guard<Mutex> l(lock_);
return IsRaftConfigMember(uuid, config_unlocked(type));
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
return IsRaftConfigMember(uuid, GetConfig(type));
}

int ConsensusMetadata::CountVotersInConfig(RaftConfigState type) {
lock_guard<Mutex> l(lock_);
return CountVoters(config_unlocked(type));
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
return CountVoters(GetConfig(type));
}

int64_t ConsensusMetadata::GetConfigOpIdIndex(RaftConfigState type) {
lock_guard<Mutex> l(lock_);
return config_unlocked(type).opid_index();
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
return GetConfig(type).opid_index();
}

RaftConfigPB ConsensusMetadata::CommittedConfig() const {
lock_guard<Mutex> l(lock_);
return committed_config_unlocked();
const RaftConfigPB& ConsensusMetadata::CommittedConfig() const {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
return GetConfig(COMMITTED_CONFIG);
}

const RaftConfigPB& ConsensusMetadata::config_unlocked(RaftConfigState type) const {
const RaftConfigPB& ConsensusMetadata::GetConfig(RaftConfigState type) const {
switch (type) {
case ACTIVE_CONFIG: return active_config_unlocked();
case COMMITTED_CONFIG: return committed_config_unlocked();
case PENDING_CONFIG: return pending_config_unlocked();
case ACTIVE_CONFIG:
if (has_pending_config_) {
return pending_config_;
}
DCHECK(pb_.has_committed_config());
return pb_.committed_config();
case COMMITTED_CONFIG:
DCHECK(pb_.has_committed_config());
return pb_.committed_config();
case PENDING_CONFIG:
CHECK(has_pending_config_) << LogPrefix() << "There is no pending config";
return pending_config_;
default: LOG(FATAL) << "Unknown RaftConfigState type: " << type;
}
}

const RaftConfigPB& ConsensusMetadata::committed_config_unlocked() const {
lock_.AssertAcquired();
DCHECK(pb_.has_committed_config());
return pb_.committed_config();
}

void ConsensusMetadata::set_committed_config(const RaftConfigPB& config) {
lock_guard<Mutex> l(lock_);
set_committed_config_unlocked(config);
}

void ConsensusMetadata::set_committed_config_unlocked(const RaftConfigPB& config) {
lock_.AssertAcquired();
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
*pb_.mutable_committed_config() = config;
if (!has_pending_config_) {
UpdateActiveRoleUnlocked();
UpdateActiveRole();
}
}

bool ConsensusMetadata::has_pending_config() const {
lock_guard<Mutex> l(lock_);
return has_pending_config_unlocked();
}

bool ConsensusMetadata::has_pending_config_unlocked() const {
lock_.AssertAcquired();
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
return has_pending_config_;
}

RaftConfigPB ConsensusMetadata::PendingConfig() const {
lock_guard<Mutex> l(lock_);
return pending_config_unlocked();
}

const RaftConfigPB& ConsensusMetadata::pending_config_unlocked() const {
lock_.AssertAcquired();
CHECK(has_pending_config_) << LogPrefix() << "There is no pending config";
return pending_config_;
const RaftConfigPB& ConsensusMetadata::PendingConfig() const {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
return GetConfig(PENDING_CONFIG);;
}

void ConsensusMetadata::clear_pending_config() {
lock_guard<Mutex> l(lock_);
clear_pending_config_unlocked();
}

void ConsensusMetadata::clear_pending_config_unlocked() {
lock_.AssertAcquired();
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
has_pending_config_ = false;
pending_config_.Clear();
UpdateActiveRoleUnlocked();
UpdateActiveRole();
}

void ConsensusMetadata::set_pending_config(const RaftConfigPB& config) {
lock_guard<Mutex> l(lock_);
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
has_pending_config_ = true;
pending_config_ = config;
UpdateActiveRoleUnlocked();
UpdateActiveRole();
}

RaftConfigPB ConsensusMetadata::ActiveConfig() const {
lock_guard<Mutex> l(lock_);
return active_config_unlocked();
const RaftConfigPB& ConsensusMetadata::ActiveConfig() const {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
return GetConfig(ACTIVE_CONFIG);
}

const RaftConfigPB& ConsensusMetadata::active_config_unlocked() const {
lock_.AssertAcquired();
if (has_pending_config_) {
return pending_config_unlocked();
}
return committed_config_unlocked();
}

string ConsensusMetadata::leader_uuid() const {
lock_guard<Mutex> l(lock_);
const string& ConsensusMetadata::leader_uuid() const {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
return leader_uuid_;
}

void ConsensusMetadata::set_leader_uuid(const string& uuid) {
lock_guard<Mutex> l(lock_);
set_leader_uuid_unlocked(uuid);
}

void ConsensusMetadata::set_leader_uuid_unlocked(const string& uuid) {
lock_.AssertAcquired();
leader_uuid_ = uuid;
UpdateActiveRoleUnlocked();
void ConsensusMetadata::set_leader_uuid(string uuid) {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
leader_uuid_ = std::move(uuid);
UpdateActiveRole();
}

RaftPeerPB::Role ConsensusMetadata::active_role() const {
lock_guard<Mutex> l(lock_);
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
return active_role_;
}

ConsensusStatePB ConsensusMetadata::ToConsensusStatePB() const {
lock_guard<Mutex> l(lock_);
return ToConsensusStatePBUnlocked();
}

ConsensusStatePB ConsensusMetadata::ToConsensusStatePBUnlocked() const {
lock_.AssertAcquired();
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
ConsensusStatePB cstate;
cstate.set_current_term(pb_.current_term());
if (!leader_uuid_.empty()) {
cstate.set_leader_uuid(leader_uuid_);
}
*cstate.mutable_committed_config() = committed_config_unlocked();
if (has_pending_config_unlocked()) {
*cstate.mutable_pending_config() = pending_config_unlocked();
*cstate.mutable_committed_config() = CommittedConfig();
if (has_pending_config_) {
*cstate.mutable_pending_config() = pending_config_;
}
return cstate;
}

void ConsensusMetadata::MergeCommittedConsensusStatePB(const ConsensusStatePB& cstate) {
lock_guard<Mutex> l(lock_);
if (cstate.current_term() > current_term_unlocked()) {
set_current_term_unlocked(cstate.current_term());
clear_voted_for_unlocked();
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
if (cstate.current_term() > current_term()) {
set_current_term(cstate.current_term());
clear_voted_for();
}

set_leader_uuid_unlocked("");
set_committed_config_unlocked(cstate.committed_config());
clear_pending_config_unlocked();
set_leader_uuid("");
set_committed_config(cstate.committed_config());
clear_pending_config();
}

Status ConsensusMetadata::Flush(FlushMode flush_mode) {
lock_guard<Mutex> l(lock_);
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
MAYBE_FAULT(FLAGS_fault_crash_before_cmeta_flush);
SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500, LogPrefix(), "flushing consensus metadata");

Expand Down Expand Up @@ -364,13 +313,8 @@ std::string ConsensusMetadata::LogPrefix() const {
}

void ConsensusMetadata::UpdateActiveRole() {
lock_guard<Mutex> l(lock_);
UpdateActiveRoleUnlocked();
}

void ConsensusMetadata::UpdateActiveRoleUnlocked() {
lock_.AssertAcquired();
ConsensusStatePB cstate = ToConsensusStatePBUnlocked();
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
ConsensusStatePB cstate = ToConsensusStatePB();
active_role_ = GetConsensusRole(peer_uuid_, cstate);
VLOG_WITH_PREFIX(1) << "Updating active role to " << RaftPeerPB::Role_Name(active_role_)
<< ". Consensus state: " << pb_util::SecureShortDebugString(cstate);
Expand Down
Loading

0 comments on commit 17f9753

Please sign in to comment.