Skip to content

Commit

Permalink
Add a counter to check RPC mismatches
Browse files Browse the repository at this point in the history
Summary:

Mismatches will bump the counter even if we're not enforcing.

Proxies are a bit tricky, and currently they bump counters on both the
proxy and the origin. This is because at the consensus_peers layer we
don't differentiate between proxies and origins. I still think it's ok
because we are aiming to get the counter totally to 0. Any value above 1
is bad anyway.

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Signed-off-by: Yichen <[email protected]>
  • Loading branch information
yichenshen committed Aug 1, 2022
1 parent 449b069 commit 917e86a
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 29 deletions.
45 changes: 33 additions & 12 deletions src/kudu/consensus/consensus_peers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
Expand Down Expand Up @@ -103,6 +104,12 @@ DEFINE_int32(proxy_batch_duration_ms, 0,

DECLARE_bool(raft_enforce_rpc_token);

METRIC_DEFINE_counter(server, raft_rpc_token_num_response_mismatches,
"Reponse RPC token mismatches",
kudu::MetricUnit::kRequests,
"Number of RPC responses that did not have a token "
"that matches this instance's");

using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::Messenger;
using kudu::rpc::PeriodicTimer;
Expand Down Expand Up @@ -625,9 +632,10 @@ void PeerProxyPool::Clear() {
}

template <class RespType>
void CheckAndEnforceResponseToken(const std::string& method_name,
RespType* response,
boost::optional<std::string> rpc_token) {
void CheckAndEnforceResponseToken(
const std::string& method_name, RespType* response,
boost::optional<std::string> rpc_token,
const scoped_refptr<Counter>& mismatch_counter) {
if (!rpc_token && !response->has_raft_rpc_token()) {
// Empty on both, nothing to enforce
return;
Expand All @@ -639,6 +647,8 @@ void CheckAndEnforceResponseToken(const std::string& method_name,
return;
}

mismatch_counter->Increment();

auto error_message = Substitute(
"Raft RPC token mismatch on response. Request token: $0. "
"Response token: $1",
Expand All @@ -665,11 +675,14 @@ void CheckAndEnforceResponseToken(const std::string& method_name,
}

RpcPeerProxy::RpcPeerProxy(gscoped_ptr<HostPort> hostport,
shared_ptr<ConsensusServiceProxy> consensus_proxy)
shared_ptr<ConsensusServiceProxy> consensus_proxy,
scoped_refptr<Counter> num_rpc_token_mismatches)
: hostport_(std::move(hostport)),
consensus_proxy_(std::move(consensus_proxy)) {
consensus_proxy_(std::move(consensus_proxy)),
num_rpc_token_mismatches_(std::move(num_rpc_token_mismatches)) {
DCHECK(hostport_ != NULL);
DCHECK(consensus_proxy_ != NULL);
DCHECK(num_rpc_token_mismatches_ != nullptr);
}

void RpcPeerProxy::UpdateAsync(const ConsensusRequestPB* request,
Expand All @@ -683,10 +696,12 @@ void RpcPeerProxy::UpdateAsync(const ConsensusRequestPB* request,
: boost::optional<std::string>();
consensus_proxy_->UpdateConsensusAsync(
*request, response, controller,
[callback, response, request_token = std::move(rpc_token)]() {
[callback, response, request_token = std::move(rpc_token),
mismatch_counter = num_rpc_token_mismatches_]() {
// Should not need to lock here since only one request can happen at any
// time
CheckAndEnforceResponseToken("UpdateAsync", response, request_token);
CheckAndEnforceResponseToken("UpdateAsync", response, request_token,
mismatch_counter);
callback();
});
}
Expand All @@ -707,9 +722,10 @@ void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB* request,
: boost::optional<std::string>();
consensus_proxy_->RequestConsensusVoteAsync(
*request, response, controller,
[callback, response, request_token = std::move(rpc_token)]() {
[callback, response, request_token = std::move(rpc_token),
mismatch_counter = num_rpc_token_mismatches_]() {
CheckAndEnforceResponseToken("RequestConsensusVoteAsync", response,
request_token);
request_token, mismatch_counter);
callback();
});
}
Expand Down Expand Up @@ -746,16 +762,21 @@ Status CreateConsensusServiceProxyForHost(const shared_ptr<Messenger>& messenger

} // anonymous namespace

RpcPeerProxyFactory::RpcPeerProxyFactory(shared_ptr<Messenger> messenger)
: messenger_(std::move(messenger)) {}
RpcPeerProxyFactory::RpcPeerProxyFactory(
shared_ptr<Messenger> messenger,
const scoped_refptr<MetricEntity>& metric_entity)
: messenger_(std::move(messenger)),
num_rpc_token_mismatches_(metric_entity->FindOrCreateCounter(
&METRIC_raft_rpc_token_num_response_mismatches)) {}

Status RpcPeerProxyFactory::NewProxy(const RaftPeerPB& peer_pb,
shared_ptr<PeerProxy>* proxy) {
gscoped_ptr<HostPort> hostport(new HostPort);
RETURN_NOT_OK(HostPortFromPB(peer_pb.last_known_addr(), hostport.get()));
shared_ptr<ConsensusServiceProxy> new_proxy;
RETURN_NOT_OK(CreateConsensusServiceProxyForHost(messenger_, *hostport, &new_proxy));
proxy->reset(new RpcPeerProxy(std::move(hostport), std::move(new_proxy)));
proxy->reset(new RpcPeerProxy(std::move(hostport), std::move(new_proxy),
num_rpc_token_mismatches_));
return Status::OK();
}

Expand Down
22 changes: 15 additions & 7 deletions src/kudu/consensus/consensus_peers.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "kudu/rpc/response_callback.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/status.h"

Expand Down Expand Up @@ -295,7 +296,8 @@ class PeerProxyPool {
class RpcPeerProxy : public PeerProxy {
public:
RpcPeerProxy(gscoped_ptr<HostPort> hostport,
std::shared_ptr<ConsensusServiceProxy> consensus_proxy);
std::shared_ptr<ConsensusServiceProxy> consensus_proxy,
scoped_refptr<Counter> num_rpc_token_mismatches);

void UpdateAsync(const ConsensusRequestPB* request,
ConsensusResponsePB* response,
Expand Down Expand Up @@ -323,24 +325,30 @@ class RpcPeerProxy : public PeerProxy {
private:
gscoped_ptr<HostPort> hostport_;
std::shared_ptr<ConsensusServiceProxy> consensus_proxy_;

scoped_refptr<Counter> num_rpc_token_mismatches_;
};

// PeerProxyFactory implementation that generates RPCPeerProxies
class RpcPeerProxyFactory : public PeerProxyFactory {
public:
explicit RpcPeerProxyFactory(std::shared_ptr<rpc::Messenger> messenger);
explicit RpcPeerProxyFactory(
std::shared_ptr<rpc::Messenger> messenger,
const scoped_refptr<MetricEntity>& metric_entity);

Status NewProxy(const RaftPeerPB& peer_pb,
std::shared_ptr<PeerProxy>* proxy) override;
Status NewProxy(const RaftPeerPB &peer_pb,
std::shared_ptr<PeerProxy> *proxy) override;

~RpcPeerProxyFactory();
~RpcPeerProxyFactory();

const std::shared_ptr<rpc::Messenger>& messenger() const override {
return messenger_;
const std::shared_ptr<rpc::Messenger> &messenger() const override {
return messenger_;
}

private:
std::shared_ptr<rpc::Messenger> messenger_;

scoped_refptr<Counter> num_rpc_token_mismatches_;
};

// Query the consensus service at last known host/port that is
Expand Down
29 changes: 20 additions & 9 deletions src/kudu/tserver/consensus_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ using std::unordered_set;
using std::vector;
using strings::Substitute;

METRIC_DEFINE_counter(server, raft_rpc_token_num_request_mismatches,
"Request RPC token mismatches",
kudu::MetricUnit::kRequests,
"Number of RPC request that did not have a token "
"that matches this instance's");

namespace kudu {

namespace tserver {
Expand Down Expand Up @@ -215,10 +221,11 @@ bool GetConsensusOrRespond(TSTabletManager* tablet_manager,
}

template <class ReqType, class RespType>
bool CheckRaftRpcTokenOrRespond(const std::string &method_name,
const ReqType *req, RespType resp,
rpc::RpcContext *context,
const consensus::RaftConsensus &consensus) {
bool CheckRaftRpcTokenOrRespond(const std::string& method_name,
const ReqType* req, RespType resp,
rpc::RpcContext* context,
const consensus::RaftConsensus& consensus,
const scoped_refptr<Counter>& mismatch_counter) {
const auto &ownToken = consensus.GetRaftRpcToken();
if (!ownToken && !req->has_raft_rpc_token()) {
// Empty on both, nothing to enforce
Expand All @@ -231,6 +238,8 @@ bool CheckRaftRpcTokenOrRespond(const std::string &method_name,
return true;
}

mismatch_counter->Increment();

auto error_message = Substitute(
"Raft RPC token mismatch. Receiver token: $0. Request token: $1",
ownToken ? *ownToken : "<null>",
Expand Down Expand Up @@ -299,8 +308,10 @@ void HandleErrorResponse(const ReqType* req, RespType* resp, RpcContext* context
ConsensusServiceImpl::ConsensusServiceImpl(ServerBase* server,
TSTabletManager* tablet_manager)
: ConsensusServiceIf(server->metric_entity(), server->result_tracker()),
server_(server),
tablet_manager_(tablet_manager) {
server_(server), tablet_manager_(tablet_manager),
request_rpc_token_mismatches_(
server->metric_entity()->FindOrCreateCounter(
&METRIC_raft_rpc_token_num_request_mismatches)) {
}

ConsensusServiceImpl::~ConsensusServiceImpl() {
Expand Down Expand Up @@ -331,7 +342,7 @@ void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
}

if (!CheckRaftRpcTokenOrRespond("UpdateConsensus", req, resp, context,
*consensus)) {
*consensus, request_rpc_token_mismatches_)) {
return;
}

Expand Down Expand Up @@ -376,7 +387,7 @@ void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
}

if (!CheckRaftRpcTokenOrRespond("RequestConsensusVote", req, resp, context,
*consensus)) {
*consensus, request_rpc_token_mismatches_)) {
return;
}

Expand Down Expand Up @@ -491,7 +502,7 @@ void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* r
if (!GetConsensusOrRespond(tablet_manager_, resp, context, &consensus)) return;

if (!CheckRaftRpcTokenOrRespond("RunLeaderElection", req, resp, context,
*consensus)) {
*consensus, request_rpc_token_mismatches_)) {
return;
}

Expand Down
2 changes: 2 additions & 0 deletions src/kudu/tserver/consensus_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class ConsensusServiceImpl : public consensus::ConsensusServiceIf {
private:
server::ServerBase* server_;
TSTabletManager* tablet_manager_;

scoped_refptr<Counter> request_rpc_token_mismatches_;
};

} // namespace tserver
Expand Down
3 changes: 2 additions & 1 deletion src/kudu/tserver/simple_tablet_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ Status TSTabletManager::Start(bool is_first_run) {
gscoped_ptr<PeerProxyFactory> peer_proxy_factory;
scoped_refptr<ITimeManager> time_manager;

peer_proxy_factory.reset(new RpcPeerProxyFactory(server_->messenger()));
peer_proxy_factory.reset(
new RpcPeerProxyFactory(server_->messenger(), server_->metric_entity()));

if (server_->opts().enable_time_manager) {
// THIS IS OBVIOUSLY NOT CORRECT.
Expand Down

0 comments on commit 917e86a

Please sign in to comment.