Skip to content

Commit

Permalink
Merge pull request facebook#100 from anirbanr-fb/better_introspection…
Browse files Browse the repository at this point in the history
…_and_efficiency

Better introspection for rpc service queue
  • Loading branch information
anirbanr-fb authored Mar 30, 2021
2 parents 5ba689a + 3be3858 commit 1e3d558
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 13 deletions.
9 changes: 9 additions & 0 deletions src/kudu/consensus/consensus_peers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
request_.committed_index() : kMinimumOpIdIndex;

if (PREDICT_FALSE(!s.ok())) {
// Incrementing failed_attempts_ prevents a RequestForPeer error from
// continually triggering an error on every actual write. The next attempt
// to RequestForPeer will now be restricted to heartbeats; because this is
// a hard failure it will keep failing, but only fail less often.
// TODO -
// Make empty heartbeats go through after a failure to send actual messages,
// without changing the cursor at all on peer, but still maintaining authority on
// it. Otherwise node keeps asking for votes, destabilizing cluster.
failed_attempts_++;
VLOG_WITH_PREFIX_UNLOCKED(1) << s.ToString();
return;
}
Expand Down
13 changes: 13 additions & 0 deletions src/kudu/consensus/consensus_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ DEFINE_bool(synchronous_transfer_leadership, false,
"When a transfer leadership call is made, it checks if peer is already "
"caught up and initiates transfer leadership, short circuiting async "
"wait for next response");

// FB - warning - this is disabled in upstream Mysql raft, because automatic
// health management of peers is risky. Disabling it also reduces contention
// on the consensus queue lock, as the lock does not have to be reacquired.
DEFINE_bool(update_peer_health_status, true,
            "After every request for peer, maintain the health status of the "
            "peer. This can be used to evict an irrecoverable peer");

TAG_FLAG(synchronous_transfer_leadership, advanced);
DECLARE_bool(enable_flexi_raft);

Expand Down Expand Up @@ -689,7 +697,12 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
// Always trigger a health status update check at the end of this function.
bool wal_catchup_progress = false;
bool wal_catchup_failure = false;
// Skip this cleanup when peer health updates are disabled, to avoid the
// overhead of re-acquiring the consensus queue lock.
SCOPED_CLEANUP({
if (!FLAGS_update_peer_health_status) {
return;
}
std::lock_guard<simple_spinlock> lock(queue_lock_);
TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
Expand Down
14 changes: 10 additions & 4 deletions src/kudu/rpc/inbound_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,17 +237,23 @@ Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {

// Render a human-readable description of this call, including the remote
// method, the peer address, the request/call id, and call timing info.
string InboundCall::ToString() const {
  // Compute the timing fields once; handled/completed timestamps may not
  // be set yet for calls still in flight.
  const string recv_time = timing_.time_received.ToString();
  const string handled_time = timing_.time_handled.Initialized()
      ? timing_.time_handled.ToString()
      : "NOT_HANDLED";
  const string completed_time = timing_.time_completed.Initialized()
      ? timing_.time_completed.ToString()
      : "NOT_COMPLETED";

  if (header_.has_request_id()) {
    const auto& req_id = header_.request_id();
    return Substitute("Call $0 from $1 (ReqId={client: $2, seq_no=$3, attempt_no=$4}) recv: $5 handled: $6 comp: $7",
                      remote_method_.ToString(),
                      conn_->remote().ToString(),
                      req_id.client_id(),
                      req_id.seq_no(),
                      req_id.attempt_no(),
                      recv_time,
                      handled_time,
                      completed_time);
  }

  return Substitute("Call $0 from $1 (request call id $2) recv: $3 handled: $4 comp: $5",
                    remote_method_.ToString(),
                    conn_->remote().ToString(),
                    header_.call_id(),
                    recv_time,
                    handled_time,
                    completed_time);
}

void InboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
Expand Down
20 changes: 17 additions & 3 deletions src/kudu/rpc/service_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ ServicePool::ServicePool(gscoped_ptr<ServiceIf> service,
incoming_queue_time_(METRIC_rpc_incoming_queue_time.Instantiate(entity)),
rpcs_timed_out_in_queue_(METRIC_rpcs_timed_out_in_queue.Instantiate(entity)),
rpcs_queue_overflow_(METRIC_rpcs_queue_overflow.Instantiate(entity)),
closing_(false) {
closing_(false),
logged_busy_(false) {
}

ServicePool::~ServicePool() {
Expand Down Expand Up @@ -129,14 +130,24 @@ void ServicePool::RejectTooBusy(InboundCall* c) {
KLOG_EVERY_N_SECS(WARNING, 300) << err_msg;
c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
Status::ServiceUnavailable(err_msg));
DLOG(INFO) << err_msg << " Contents of service queue:\n"
<< service_queue_.ToString();

if (!logged_busy_.load(std::memory_order_acquire)) {
// Throttle, since we don't want to flood the log with these lines if the
// pool is persistently at the edge of queue overflow.
KLOG_EVERY_N_SECS(WARNING, 600) << err_msg << " Contents of service queue:\n"
<< service_queue_.ToString();
logged_busy_ = true;
}

if (too_busy_hook_) {
too_busy_hook_();
}
}

// Returns a printable snapshot of the current contents of the service queue,
// for diagnostics/introspection. Delegates to the queue's own ToString().
std::string ServicePool::RpcServiceQueueToString() const {
  return service_queue_.ToString();
}

// Resolves 'method' to its RpcMethodInfo by delegating to the wrapped
// service implementation.
RpcMethodInfo* ServicePool::LookupMethod(const RemoteMethod& method) {
  return service_->LookupMethod(method);
}
Expand Down Expand Up @@ -173,6 +184,9 @@ Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call) {
RejectTooBusy(*evicted);
}

// Enqueue succeeded. Clear the printed state for the busy log line so the
// next overflow is logged again.
logged_busy_.store(false, std::memory_order_release);

if (PREDICT_TRUE(queue_status == QUEUE_SUCCESS)) {
// NB: do not do anything with 'c' after it is successfully queued --
// a service thread may have already dequeued it, processed it, and
Expand Down
7 changes: 7 additions & 0 deletions src/kudu/rpc/service_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef KUDU_SERVICE_POOL_H
#define KUDU_SERVICE_POOL_H

#include <atomic>
#include <cstddef>
#include <functional>
#include <string>
Expand Down Expand Up @@ -92,6 +93,11 @@ class ServicePool : public RpcService {

const std::string service_name() const;

/**
 * Return the current contents of the service queue as a string.
 */
std::string RpcServiceQueueToString() const;

private:
void RunThread();
void RejectTooBusy(InboundCall* c);
Expand All @@ -107,6 +113,7 @@ class ServicePool : public RpcService {
bool closing_;

std::function<void(void)> too_busy_hook_;
std::atomic<bool> logged_busy_;

DISALLOW_COPY_AND_ASSIGN(ServicePool);
};
Expand Down
17 changes: 11 additions & 6 deletions src/kudu/tserver/tablet_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
#ifdef FB_DO_NOT_REMOVE
#include "kudu/cfile/block_cache.h"
#endif
#include "kudu/consensus/consensus.service.h"
#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/service_if.h"
#include "kudu/rpc/service_pool.h"
#ifdef FB_DO_NOT_REMOVE
#include "kudu/tserver/heartbeater.h"
#include "kudu/tserver/scanners.h"
Expand Down Expand Up @@ -155,21 +157,15 @@ Status TabletServer::Start() {

gscoped_ptr<ServiceIf> ts_service(new TabletServiceImpl(this));
gscoped_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this));
#endif

#ifdef FB_DO_NOT_REMOVE
gscoped_ptr<ServiceIf> tablet_copy_service(new TabletCopyServiceImpl(
this, tablet_manager_.get()));

RETURN_NOT_OK(RegisterService(std::move(ts_service)));
RETURN_NOT_OK(RegisterService(std::move(admin_service)));
#endif

#ifdef FB_DO_NOT_REMOVE
RETURN_NOT_OK(RegisterService(std::move(tablet_copy_service)));
#endif

#ifdef FB_DO_NOT_REMOVE
RETURN_NOT_OK(heartbeater_->Start());
RETURN_NOT_OK(maintenance_manager_->Start());
#endif
Expand Down Expand Up @@ -208,5 +204,14 @@ void TabletServer::Shutdown() {
}
}

std::string TabletServer::ConsensusServiceRpcQueueToString() const {
const kudu::rpc::ServicePool* pool = rpc_server_->service_pool(
kudu::consensus::ConsensusServiceIf::static_service_name());
if (pool) {
return pool->RpcServiceQueueToString();
}
return "";
}

} // namespace tserver
} // namespace kudu
5 changes: 5 additions & 0 deletions src/kudu/tserver/tablet_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class TabletServer : public kserver::KuduServer {

#endif

/*
 * Return a snapshot of the consensus service RPC queue as a string,
 * suitable for logging by the caller.
 */
std::string ConsensusServiceRpcQueueToString() const;

private:
friend class TabletServerTestBase;
friend class TSTabletManager;
Expand Down

0 comments on commit 1e3d558

Please sign in to comment.