Skip to content

Commit

Permalink
master: include TS address in log messages
Browse files Browse the repository at this point in the history
When looking at master logs, it's quite annoying to have to translate
back from UUIDs to actual hostnames, since the operator typically wants
to ssh into that node to look at logs, etc.

This patch adds TSDescriptor::ToString() and calls it from all the
points in CatalogManager where log messages refer to an individual
server.

This also adds validation that TS registrations must include at least
one HTTP and one RPC address. This has always been the case but wasn't
verified.

Change-Id: Ic55fa7e818a115de70f9fc6aca12581c3b4779c7
Reviewed-on: http://gerrit.cloudera.org:8080/4131
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
  • Loading branch information
toddlipcon authored and adembo committed Aug 26, 2016
1 parent cf0eb4d commit db6d22d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 50 deletions.
91 changes: 45 additions & 46 deletions src/kudu/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1907,14 +1907,14 @@ Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
if (report.has_schema_version() &&
table_lock.data().pb.version() != report.schema_version()) {
if (report.schema_version() > table_lock.data().pb.version()) {
LOG(ERROR) << "TS " << ts_desc->permanent_uuid()
LOG(ERROR) << "TS " << ts_desc->ToString()
<< " has reported a schema version greater than the current one "
<< " for tablet " << tablet->ToString()
<< ". Expected version " << table_lock.data().pb.version()
<< " got " << report.schema_version()
<< " (corruption)";
} else {
LOG(INFO) << "TS " << ts_desc->permanent_uuid()
LOG(INFO) << "TS " << ts_desc->ToString()
<< " does not have the latest schema for tablet " << tablet->ToString()
<< ". Expected version " << table_lock.data().pb.version()
<< " got " << report.schema_version();
Expand All @@ -1931,7 +1931,7 @@ Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
DCHECK(!s.ok());
DCHECK_EQ(report.state(), tablet::FAILED);
LOG(WARNING) << "Tablet " << tablet->ToString() << " has failed on TS "
<< ts_desc->permanent_uuid() << ": " << s.ToString();
<< ts_desc->ToString() << ": " << s.ToString();
return Status::OK();
}

Expand Down Expand Up @@ -2031,7 +2031,7 @@ Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,

VLOG(2) << "Updating consensus configuration for tablet "
<< final_report->tablet_id()
<< " from config reported by " << ts_desc->permanent_uuid()
<< " from config reported by " << ts_desc->ToString()
<< " to that committed in log index "
<< final_report->committed_consensus_state().config().opid_index()
<< " with leader state from term "
Expand Down Expand Up @@ -2298,7 +2298,7 @@ class RetryingTSRpcTask : public MonitoredTask {
// Runs on a reactor thread, so should not block or do any IO.
void RpcCallback() {
if (!rpc_.status().ok()) {
LOG(WARNING) << "TS " << target_ts_desc_->permanent_uuid() << ": "
LOG(WARNING) << "TS " << target_ts_desc_->ToString() << ": "
<< type_name() << " RPC failed for tablet "
<< tablet_id() << ": " << rpc_.status().ToString();
} else if (state() != kStateAborted) {
Expand Down Expand Up @@ -2483,18 +2483,19 @@ class AsyncCreateReplica : public RetrySpecificTSRpcTask {
Status s = StatusFromPB(resp_.error().status());
if (s.IsAlreadyPresent()) {
LOG(INFO) << "CreateTablet RPC for tablet " << tablet_id_
<< " on TS " << permanent_uuid_ << " returned already present: "
<< " on TS " << target_ts_desc_->ToString() << " returned already present: "
<< s.ToString();
MarkComplete();
} else {
LOG(WARNING) << "CreateTablet RPC for tablet " << tablet_id_
<< " on TS " << permanent_uuid_ << " failed: " << s.ToString();
<< " on TS " << target_ts_desc_->ToString() << " failed: " << s.ToString();
}
}
}

virtual bool SendRequest(int attempt) OVERRIDE {
VLOG(1) << "Send create tablet request to " << permanent_uuid_ << ":\n"
VLOG(1) << "Send create tablet request to "
<< target_ts_desc_->ToString() << ":\n"
<< " (attempt " << attempt << "):\n"
<< req_.DebugString();
ts_proxy_->CreateTabletAsync(req_, &resp_, &rpc_,
Expand Down Expand Up @@ -2541,33 +2542,39 @@ class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
TabletServerErrorPB::Code code = resp_.error().code();
switch (code) {
case TabletServerErrorPB::TABLET_NOT_FOUND:
LOG(WARNING) << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
LOG(WARNING) << "TS " << target_ts_desc_->ToString()
<< ": delete failed for tablet " << tablet_id_
<< " because the tablet was not found. No further retry: "
<< status.ToString();
MarkComplete();
break;
case TabletServerErrorPB::CAS_FAILED:
LOG(WARNING) << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
LOG(WARNING) << "TS " << target_ts_desc_->ToString()
<< ": delete failed for tablet " << tablet_id_
<< " due to a CAS failure. No further retry: " << status.ToString();
MarkComplete();
break;
default:
LOG(WARNING) << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
LOG(WARNING) << "TS " << target_ts_desc_->ToString()
<< ": delete failed for tablet " << tablet_id_
<< " with error code " << TabletServerErrorPB::Code_Name(code)
<< ": " << status.ToString();
break;
}
} else {
master_->catalog_manager()->NotifyTabletDeleteSuccess(permanent_uuid_, tablet_id_);
if (table_) {
LOG(INFO) << "TS " << permanent_uuid_ << ": tablet " << tablet_id_
LOG(INFO) << "TS " << target_ts_desc_->ToString()
<< ": tablet " << tablet_id_
<< " (table " << table_->ToString() << ") successfully deleted";
} else {
LOG(WARNING) << "TS " << permanent_uuid_ << ": tablet " << tablet_id_
LOG(WARNING) << "TS " << target_ts_desc_->ToString()
<< ": tablet " << tablet_id_
<< " did not belong to a known table, but was successfully deleted";
}
MarkComplete();
VLOG(1) << "TS " << permanent_uuid_ << ": delete complete on tablet " << tablet_id_;
VLOG(1) << "TS " << target_ts_desc_->ToString()
<< ": delete complete on tablet " << tablet_id_;
}
}

Expand All @@ -2581,9 +2588,12 @@ class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
req.set_cas_config_opid_index_less_or_equal(*cas_config_opid_index_less_or_equal_);
}

VLOG(1) << "Send delete tablet request to " << permanent_uuid_
<< " (attempt " << attempt << "):\n"
<< req.DebugString();
LOG(INFO) << Substitute("Sending DeleteTablet($0) for tablet $1 on $2 "
"($3)",
TabletDataState_Name(delete_type_),
tablet_id_,
target_ts_desc_->ToString(),
reason_);
ts_proxy_->DeleteTabletAsync(req, &resp_, &rpc_,
boost::bind(&AsyncDeleteReplica::RpcCallback, this));
return true;
Expand Down Expand Up @@ -2621,9 +2631,6 @@ class AsyncAlterTable : public RetryingTSRpcTask {

private:
virtual string tablet_id() const OVERRIDE { return tablet_->tablet_id(); }
string permanent_uuid() const {
return target_ts_desc_->permanent_uuid();
}

virtual void HandleResponse(int attempt) OVERRIDE {
if (resp_.has_error()) {
Expand All @@ -2634,18 +2641,19 @@ class AsyncAlterTable : public RetryingTSRpcTask {
case TabletServerErrorPB::TABLET_NOT_FOUND:
case TabletServerErrorPB::MISMATCHED_SCHEMA:
case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA:
LOG(WARNING) << "TS " << permanent_uuid() << ": alter failed for tablet "
LOG(WARNING) << "TS " << target_ts_desc_->ToString() << ": alter failed for tablet "
<< tablet_->ToString() << " no further retry: " << status.ToString();
MarkComplete();
break;
default:
LOG(WARNING) << "TS " << permanent_uuid() << ": alter failed for tablet "
LOG(WARNING) << "TS " << target_ts_desc_->ToString() << ": alter failed for tablet "
<< tablet_->ToString() << ": " << status.ToString();
break;
}
} else {
MarkComplete();
VLOG(1) << "TS " << permanent_uuid() << ": alter complete on tablet " << tablet_->ToString();
VLOG(1) << "TS " << target_ts_desc_->ToString()
<< ": alter complete on tablet " << tablet_->ToString();
}

if (state() != kStateComplete) {
Expand All @@ -2657,15 +2665,15 @@ class AsyncAlterTable : public RetryingTSRpcTask {
TableMetadataLock l(tablet_->table().get(), TableMetadataLock::READ);

tserver::AlterSchemaRequestPB req;
req.set_dest_uuid(permanent_uuid());
req.set_dest_uuid(target_ts_desc_->permanent_uuid());
req.set_tablet_id(tablet_->tablet_id());
req.set_new_table_name(l.data().pb.name());
req.set_schema_version(l.data().pb.version());
req.mutable_schema()->CopyFrom(l.data().pb.schema());

l.Unlock();

VLOG(1) << "Send alter table request to " << permanent_uuid()
VLOG(1) << "Send alter table request to " << target_ts_desc_->ToString()
<< " (attempt " << attempt << "):\n"
<< req.DebugString();
ts_proxy_->AlterSchemaAsync(req, &resp_, &rpc_,
Expand Down Expand Up @@ -2716,9 +2724,11 @@ class AsyncAddServerTask : public RetryingTSRpcTask {
virtual string type_name() const OVERRIDE { return "AddServer ChangeConfig"; }

virtual string description() const OVERRIDE {
return Substitute("AddServer ChangeConfig RPC for tablet $0 on peer $1 "
return Substitute("AddServer ChangeConfig RPC for tablet $0 on TS $1 "
"with cas_config_opid_index $2",
tablet_->tablet_id(), permanent_uuid(), cstate_.config().opid_index());
tablet_->tablet_id(),
target_ts_desc_->ToString(),
cstate_.config().opid_index());
}

protected:
Expand All @@ -2727,9 +2737,6 @@ class AsyncAddServerTask : public RetryingTSRpcTask {

private:
virtual string tablet_id() const OVERRIDE { return tablet_->tablet_id(); }
string permanent_uuid() const {
return target_ts_desc_->permanent_uuid();
}

const scoped_refptr<TabletInfo> tablet_;
const ConsensusStatePB cstate_;
Expand Down Expand Up @@ -2768,24 +2775,19 @@ bool AsyncAddServerTask::SendRequest(int attempt) {
return false;
}

req_.set_dest_uuid(permanent_uuid());
req_.set_dest_uuid(target_ts_desc_->permanent_uuid());
req_.set_tablet_id(tablet_->tablet_id());
req_.set_type(consensus::ADD_SERVER);
req_.set_cas_config_opid_index(cstate_.config().opid_index());
RaftPeerPB* peer = req_.mutable_server();
peer->set_permanent_uuid(replacement_replica->permanent_uuid());
ServerRegistrationPB peer_reg;
replacement_replica->GetRegistration(&peer_reg);
if (peer_reg.rpc_addresses_size() == 0) {
KLOG_EVERY_N(WARNING, 100) << LogPrefix() << "Candidate replacement "
<< replacement_replica->permanent_uuid()
<< " has no registered rpc address: "
<< peer_reg.ShortDebugString();
return false;
}
CHECK_GT(peer_reg.rpc_addresses_size(), 0);
*peer->mutable_last_known_addr() = peer_reg.rpc_addresses(0);
peer->set_member_type(RaftPeerPB::VOTER);
VLOG(1) << "Sending AddServer ChangeConfig request to " << permanent_uuid() << ":\n"
VLOG(1) << "Sending AddServer ChangeConfig request to "
<< target_ts_desc_->ToString() << ":\n"
<< req_.DebugString();
consensus_proxy_->ChangeConfigAsync(req_, &resp_, &rpc_,
boost::bind(&AsyncAddServerTask::RpcCallback, this));
Expand All @@ -2804,13 +2806,15 @@ void AsyncAddServerTask::HandleResponse(int attempt) {
// Do not retry on a CAS error, otherwise retry forever or until cancelled.
switch (resp_.error().code()) {
case TabletServerErrorPB::CAS_FAILED:
LOG_WITH_PREFIX(WARNING) << "ChangeConfig() failed with leader " << permanent_uuid()
LOG_WITH_PREFIX(WARNING) << "ChangeConfig() failed with leader "\
<< target_ts_desc_->ToString()
<< " due to CAS failure. No further retry: "
<< status.ToString();
MarkFailed();
break;
default:
LOG_WITH_PREFIX(INFO) << "ChangeConfig() failed with leader " << permanent_uuid()
LOG_WITH_PREFIX(INFO) << "ChangeConfig() failed with leader "
<< target_ts_desc_->ToString()
<< " due to error "
<< TabletServerErrorPB::Code_Name(resp_.error().code())
<< ". This operation will be retried. Error detail: "
Expand Down Expand Up @@ -2873,11 +2877,6 @@ void CatalogManager::SendDeleteReplicaRequest(
const scoped_refptr<TableInfo>& table,
const string& ts_uuid,
const string& reason) {
LOG_WITH_PREFIX(INFO) << Substitute("Deleting tablet $0 on peer $1 "
"with delete type $2 ($3)",
tablet_id, ts_uuid,
TabletDataState_Name(delete_type),
reason);
AsyncDeleteReplica* call =
new AsyncDeleteReplica(master_, ts_uuid, table,
tablet_id, delete_type, cas_config_opid_index_less_or_equal,
Expand Down
13 changes: 13 additions & 0 deletions src/kudu/master/ts_descriptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ Status TSDescriptor::Register(const NodeInstancePB& instance,
return Status::InvalidArgument(msg);
}

if (registration.rpc_addresses().empty() ||
registration.http_addresses().empty()) {
return Status::InvalidArgument(
"invalid registration: must have at least one RPC and one HTTP address",
registration.ShortDebugString());
}

if (instance.instance_seqno() < latest_seqno_) {
return Status::AlreadyPresent(
strings::Substitute("Cannot register with sequence number $0:"
Expand Down Expand Up @@ -250,5 +257,11 @@ Status TSDescriptor::GetConsensusProxy(const shared_ptr<rpc::Messenger>& messeng
return Status::OK();
}

string TSDescriptor::ToString() const {
std::lock_guard<simple_spinlock> l(lock_);
const auto& addr = registration_->rpc_addresses(0);
return strings::Substitute("$0 ($1:$2)", permanent_uuid_, addr.host(), addr.port());
}

} // namespace master
} // namespace kudu
4 changes: 4 additions & 0 deletions src/kudu/master/ts_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ class TSDescriptor {
return num_live_replicas_;
}

// Return a string form of this TS, suitable for printing.
// Includes the UUID as well as last known host/port.
std::string ToString() const;

private:
FRIEND_TEST(TestTSDescriptor, TestReplicaCreationsDecay);

Expand Down
8 changes: 4 additions & 4 deletions src/kudu/master/ts_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ Status TSManager::RegisterTS(const NodeInstancePB& instance,
shared_ptr<TSDescriptor> new_desc;
RETURN_NOT_OK(TSDescriptor::RegisterNew(instance, registration, &new_desc));
InsertOrDie(&servers_by_id_, uuid, new_desc);
LOG(INFO) << Substitute("Registered new tserver $0 with Master",
instance.ShortDebugString());
LOG(INFO) << Substitute("Registered new tserver with Master: $0",
new_desc->ToString());
desc->swap(new_desc);
} else {
shared_ptr<TSDescriptor> found(FindOrDie(servers_by_id_, uuid));
RETURN_NOT_OK(found->Register(instance, registration));
LOG(INFO) << Substitute("Re-registered known tserver $0 with Master",
instance.ShortDebugString());
LOG(INFO) << Substitute("Re-registered known tserver with Master: $0",
found->ToString());
desc->swap(found);
}

Expand Down

0 comments on commit db6d22d

Please sign in to comment.