[consensus][proxy][3/n] support for proxying consensus messages (facebook#42)

This patch adds support for proxying consensus messages.

Currently supported:

* New ConsensusService RPC endpoint to change the proxy topology.
* PROXY_OP being translated to a real op if available and forwarded.
* Multi-hop routing.
* Proxy timed-waiting if proxied op is not yet available.
* Inefficient server-side proxying implementation (blocking).
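
As a rough illustration of the multi-hop routing item above, the sketch below
shows one way a node could pick the next hop toward a destination. It assumes
the topology can be flattened into a map from each peer UUID to the UUID of the
peer that proxies for it. `ProxyTopology` and `NextHop` are hypothetical names,
not part of this patch, and the real routing code may differ.

```cpp
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Hypothetical stand-in for the proxy topology (an assumption, not the
// patch's ProxyTopologyPB): maps a peer UUID to the UUID of the peer that
// forwards traffic to it. Peers without an entry are reached directly.
using ProxyTopology = std::unordered_map<std::string, std::string>;

// Returns the peer that 'self' should send to so that a message eventually
// reaches 'dest', or std::nullopt if the topology contains a routing loop.
std::optional<std::string> NextHop(const ProxyTopology& topology,
                                   const std::string& self,
                                   const std::string& dest) {
  std::unordered_set<std::string> visited;
  std::string current = dest;
  // Walk upstream from the destination until we find a node that 'self'
  // can reach in a single hop.
  while (visited.insert(current).second) {
    auto it = topology.find(current);
    if (it == topology.end()) {
      // Nobody proxies for 'current', so it is contacted directly.
      return current;
    }
    if (it->second == self) {
      // 'self' is the designated proxy for 'current'.
      return current;
    }
    current = it->second;
  }
  // We revisited a node: the topology has a cycle.
  return std::nullopt;
}
```

With edges C <- B <- A, a leader sending to C would get A as the next hop,
A would get B, and B would deliver to C directly.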

Tests:

Tests have been stripped from this kuduraft backport. In the unstripped
version of this patch, I added the following tests:

1. A simple test that acts as a leader, sends RPCs to a proxy, and
   validates that a downstream node has received them.
2. Same as the above, but also tests multi-hop routing.

TODO:

* Leader knowledge of proxies and the topology.
* TTL support to avoid routing loops.
* Non-blocking request processing.
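
The TTL item above is not implemented in this patch. One possible shape is
sketched below, assuming each proxied request carries a hop budget that every
proxy decrements before forwarding. `ProxyHeader`, `hops_remaining`, and
`ConsumeHop` are hypothetical and do not exist in consensus.proto.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical per-request routing header (an assumption for illustration).
struct ProxyHeader {
  uint32_t hops_remaining;  // Number of further forwards still allowed.
};

// Called by a proxy before forwarding. Returns the header to send to the
// next hop, or std::nullopt if the budget is exhausted and the request
// should be rejected instead of forwarded.
std::optional<ProxyHeader> ConsumeHop(const ProxyHeader& incoming) {
  if (incoming.hops_remaining == 0) {
    return std::nullopt;
  }
  return ProxyHeader{incoming.hops_remaining - 1};
}
```

A bounded budget means a misconfigured cyclic topology results in a dropped
request after a fixed number of forwards rather than an endless loop.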

Change-Id: I1b42aa4777df1858409d6933ae8316e0e36db4ae
mpercy committed May 7, 2020
1 parent 8392129 commit 610761c
Showing 6 changed files with 440 additions and 10 deletions.
36 changes: 35 additions & 1 deletion src/kudu/consensus/consensus.proto
@@ -143,6 +143,7 @@ enum OperationType {
  // ALTER_SCHEMA_OP = 4;
  CHANGE_CONFIG_OP = 5;
  WRITE_OP_EXT = 6;
  PROXY_OP = 7;
}

/*
@@ -170,6 +171,11 @@ message ChangeConfigRecordPB {
  required RaftConfigPB new_config = 3;
}

message ProxyRecordPB {
  // The destination server intended to receive this message.
  optional bytes dest_server = 1;
}

enum ChangeConfigType {
  UNKNOWN_CHANGE = 0;
  ADD_PEER = 1;
@@ -264,6 +270,7 @@ message ReplicateMsg {
  //optional tserver.WriteRequestPB write_request = 5;
  //optional tserver.AlterSchemaRequestPB alter_schema_request = 6;
  optional ChangeConfigRecordPB change_config_record = 7;
  optional ProxyRecordPB proxy_record = 10;

  // The client's request id for this message, if it is set.
  optional rpc.RequestIdPB request_id = 8;
@@ -399,11 +406,19 @@ message ConsensusRequestPB {
  // UUID of server this request is addressed to.
  optional bytes dest_uuid = 7;

  // UUID of server that will proxy this request to the eventual 'dest_uuid'.
  // Must be set if this request is intended to be proxied.
  optional bytes proxy_dest_uuid = 12;

  required string tablet_id = 1;

  // The uuid of the peer making the call.
  // UUID of the leader peer making the call.
  required bytes caller_uuid = 2;

  // UUID of server proxying this request.
  // Must be set if this request was proxied on behalf of the leader.
  optional bytes proxy_caller_uuid = 13;

  // The caller's term. As only leaders can send messages,
  // replicas will accept all messages as long as the term
  // is equal to or higher than the last term they know about.
@@ -649,6 +664,22 @@ message UnsafeChangeConfigResponsePB {
  optional ServerErrorPB error = 1;
}

message ChangeProxyTopologyRequestPB {
  // UUID of server this request is addressed to.
  optional bytes dest_uuid = 1;
  optional bytes tablet_id = 2;

  // Sender identification, it could be a static string as well.
  optional bytes caller_id = 3;

  // The new proxy topology to use.
  optional ProxyTopologyPB new_config = 4;
}

message ChangeProxyTopologyResponsePB {
  optional ServerErrorPB error = 1;
}

// A Raft implementation.
service ConsensusService {
  option (kudu.rpc.default_authz_method) = "AuthorizeServiceUser";
@@ -674,6 +705,9 @@ service ConsensusService {
  // Implements unsafe config change operation for manual recovery use cases.
  rpc UnsafeChangeConfig(UnsafeChangeConfigRequestPB) returns (UnsafeChangeConfigResponsePB);

  // Change the routing graph that defines how requests are proxied.
  rpc ChangeProxyTopology(ChangeProxyTopologyRequestPB) returns (ChangeProxyTopologyResponsePB);

  rpc GetNodeInstance(GetNodeInstanceRequestPB) returns (GetNodeInstanceResponsePB);

  // Force this node to run a leader election.
9 changes: 8 additions & 1 deletion src/kudu/consensus/consensus_queue.h
@@ -40,6 +40,7 @@
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/threading/thread_collision_warner.h"
@@ -65,7 +66,6 @@ class ConsensusRequestPB;
class ConsensusResponsePB;
class ConsensusStatusPB;
class PeerMessageQueueObserver;
class TimeManager;
#ifdef FB_DO_NOT_REMOVE
class StartTabletCopyRequestPB;
#endif
@@ -376,6 +376,13 @@ class PeerMessageQueue {
      const std::function<bool(const kudu::consensus::RaftPeerPB&)>& filter_fn);
  void EndWatchForSuccessor();

  // TODO(mpercy): It's probably not safe in general to access a queue's log
  // cache via bare pointer, since (IIRC) a queue will be reconstructed
  // transitioning to/from leader. Check this.
  LogCache* log_cache() {
    return &log_cache_;
  }

 private:
  FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex);
  FRIEND_TEST(ConsensusQueueTest, TestQueueMovesWatermarksBackward);