Skip to content

Commit

Permalink
Add ServerClock RPC
Browse files Browse the repository at this point in the history
The ServerClock RPC requests the current timestamp of the master or
tablet server.

Change-Id: I4fc66f92408868fdb6dc9e16b1567065026216c2
Reviewed-on: http://gerrit.sjc.cloudera.com:8080/6848
Tested-by: jenkins
Reviewed-by: David Alves <[email protected]>
  • Loading branch information
danburkert committed Jun 4, 2015
1 parent 8095f98 commit 15bc887
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 14 deletions.
6 changes: 4 additions & 2 deletions src/kudu/integration-tests/cluster_itest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
#include <glog/stl_logging.h>

#include "kudu/client/client.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/opid_util.h"
Expand All @@ -18,6 +18,7 @@
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/master/master.proxy.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/server/server_base.proxy.h"
#include "kudu/tserver/tablet_server_test_util.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.proxy.h"
Expand Down Expand Up @@ -225,7 +226,8 @@ Status CreateTabletServerMap(MasterServiceProxy* master_proxy,
messenger,
&peer->tserver_proxy,
&peer->tserver_admin_proxy,
&peer->consensus_proxy);
&peer->consensus_proxy,
&peer->generic_proxy);

InsertOrDie(ts_map, peer->instance_id.permanent_uuid(), peer.get());
ignore_result(peer.release());
Expand Down
6 changes: 6 additions & 0 deletions src/kudu/integration-tests/cluster_itest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/master/master.pb.h"
#include "kudu/server/server_base.pb.h"

namespace kudu {
class MonoDelta;
Expand Down Expand Up @@ -53,6 +54,10 @@ class TabletServerAdminServiceProxy;
class TabletServerServiceProxy;
}

namespace server {
class GenericServiceProxy;
}

namespace itest {

struct TServerDetails {
Expand All @@ -61,6 +66,7 @@ struct TServerDetails {
gscoped_ptr<tserver::TabletServerServiceProxy> tserver_proxy;
gscoped_ptr<tserver::TabletServerAdminServiceProxy> tserver_admin_proxy;
gscoped_ptr<consensus::ConsensusServiceProxy> consensus_proxy;
gscoped_ptr<server::GenericServiceProxy> generic_proxy;

// Convenience function to get the UUID from the instance_id struct.
const std::string& uuid() const;
Expand Down
5 changes: 3 additions & 2 deletions src/kudu/integration-tests/ts_tablet_manager-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#include <tr1/memory>

#include "kudu/client/client.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
Expand All @@ -18,12 +18,13 @@
#include "kudu/master/master.proxy.h"
#include "kudu/master/mini_master.h"
#include "kudu/rpc/messenger.h"
#include "kudu/server/server_base.proxy.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/util/test_util.h"

DECLARE_bool(enable_leader_failure_detection);
Expand Down
8 changes: 8 additions & 0 deletions src/kudu/server/generic_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "kudu/gutil/map-util.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/server/clock.h"
#include "kudu/server/server_base.h"
#include "kudu/util/flag_tags.h"

Expand Down Expand Up @@ -94,5 +95,12 @@ void GenericServiceImpl::FlushCoverage(const FlushCoverageRequestPB* req,
rpc->RespondSuccess();
}

void GenericServiceImpl::ServerClock(const ServerClockRequestPB* req,
ServerClockResponsePB* resp,
rpc::RpcContext* rpc) {
resp->set_timestamp(server_->clock()->Now().ToUint64());
rpc->RespondSuccess();
}

} // namespace server
} // namespace kudu
4 changes: 4 additions & 0 deletions src/kudu/server/generic_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class GenericServiceImpl : public GenericServiceIf {
virtual void FlushCoverage(const FlushCoverageRequestPB* req,
FlushCoverageResponsePB* resp,
rpc::RpcContext* rpc) OVERRIDE;

virtual void ServerClock(const ServerClockRequestPB* req,
ServerClockResponsePB* resp,
rpc::RpcContext* rpc) OVERRIDE;
private:
ServerBase* server_;

Expand Down
11 changes: 11 additions & 0 deletions src/kudu/server/server_base.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,21 @@ message FlushCoverageResponsePB {
optional bool success = 1;
}

// Requests the server's current timestamp.
message ServerClockRequestPB {
}
message ServerClockResponsePB {
// The current timestamp of the server.
optional fixed64 timestamp = 1;
}

service GenericService {
rpc SetFlag(SetFlagRequestPB)
returns (SetFlagResponsePB);

rpc FlushCoverage(FlushCoverageRequestPB)
returns (FlushCoverageResponsePB);

rpc ServerClock(ServerClockRequestPB)
returns (ServerClockResponsePB);
}
14 changes: 8 additions & 6 deletions src/kudu/tserver/tablet_server-test-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,28 @@
#include <utility>

#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/server/server_base.proxy.h"
#include "kudu/tablet/local_tablet_writer.h"
#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/remote_bootstrap.proxy.h"
#include "kudu/tserver/scanners.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tablet_server_test_util.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/tserver/scanners.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/util/metrics.h"
#include "kudu/util/test_graph.h"
#include "kudu/util/test_util.h"
#include "kudu/util/metrics.h"
#include "kudu/rpc/messenger.h"

DEFINE_int32(rpc_timeout, 1000, "Timeout for RPC calls, in seconds");
DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
Expand Down Expand Up @@ -153,7 +154,7 @@ class TabletServerTestBase : public KuduTest {
void ResetClientProxies() {
CreateTsClientProxies(mini_server_->bound_rpc_addr(),
client_messenger_,
&proxy_, &admin_proxy_, &consensus_proxy_);
&proxy_, &admin_proxy_, &consensus_proxy_, &generic_proxy_);
}

// Inserts 'num_rows' test rows directly into the tablet (i.e not via RPC)
Expand Down Expand Up @@ -450,6 +451,7 @@ class TabletServerTestBase : public KuduTest {
gscoped_ptr<TabletServerServiceProxy> proxy_;
gscoped_ptr<TabletServerAdminServiceProxy> admin_proxy_;
gscoped_ptr<consensus::ConsensusServiceProxy> consensus_proxy_;
gscoped_ptr<server::GenericServiceProxy> generic_proxy_;

MetricRegistry ts_test_metric_registry_;
scoped_refptr<MetricEntity> ts_test_metric_entity_;
Expand Down
12 changes: 11 additions & 1 deletion src/kudu/tserver/tablet_server-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.pb.h"
#include "kudu/server/hybrid_clock.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/server/server_base.proxy.h"
#include "kudu/util/crc.h"
#include "kudu/util/curl_util.h"
Expand Down Expand Up @@ -61,6 +62,15 @@ TEST_F(TabletServerTest, TestPingServer) {
ASSERT_OK(proxy_->Ping(req, &resp, &controller));
}

TEST_F(TabletServerTest, TestServerClock) {
server::ServerClockRequestPB req;
server::ServerClockResponsePB resp;
RpcController controller;

ASSERT_OK(generic_proxy_->ServerClock(req, &resp, &controller));
ASSERT_GT(mini_server_->server()->clock()->Now().ToUint64(), resp.timestamp());
}

TEST_F(TabletServerTest, TestSetFlags) {
server::GenericServiceProxy proxy(
client_messenger_, mini_server_->bound_rpc_addr());
Expand Down Expand Up @@ -892,7 +902,7 @@ TEST_F(TabletServerTest, TestClientGetsErrorBackWhenRecoveryFailed) {
// Connect to it.
CreateTsClientProxies(mini_server_->bound_rpc_addr(),
client_messenger_,
&proxy_, &admin_proxy_, &consensus_proxy_);
&proxy_, &admin_proxy_, &consensus_proxy_, &generic_proxy_);

WriteRequestPB req;
req.set_tablet_id(kTabletId);
Expand Down
7 changes: 5 additions & 2 deletions src/kudu/tserver/tablet_server_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "kudu/consensus/consensus.proxy.h"
#include "kudu/rpc/messenger.h"
#include "kudu/server/server_base.proxy.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.proxy.h"

Expand All @@ -19,10 +20,12 @@ void CreateTsClientProxies(const Sockaddr& addr,
const shared_ptr<Messenger>& messenger,
gscoped_ptr<TabletServerServiceProxy>* proxy,
gscoped_ptr<TabletServerAdminServiceProxy>* admin_proxy,
gscoped_ptr<ConsensusServiceProxy>* consensus_proxy) {
gscoped_ptr<ConsensusServiceProxy>* consensus_proxy,
gscoped_ptr<server::GenericServiceProxy>* generic_proxy) {
proxy->reset(new TabletServerServiceProxy(messenger, addr));
admin_proxy->reset(new TabletServerAdminServiceProxy(messenger, addr));
consensus_proxy->reset(new consensus::ConsensusServiceProxy(messenger, addr));
consensus_proxy->reset(new ConsensusServiceProxy(messenger, addr));
generic_proxy->reset(new server::GenericServiceProxy(messenger, addr));
}

} // namespace tserver
Expand Down
7 changes: 6 additions & 1 deletion src/kudu/tserver/tablet_server_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ namespace rpc {
class Messenger;
}

namespace server {
class GenericServiceProxy;
}

namespace tserver {
class TabletServerAdminServiceProxy;
class TabletServerServiceProxy;
Expand All @@ -26,7 +30,8 @@ void CreateTsClientProxies(const Sockaddr& addr,
const std::tr1::shared_ptr<rpc::Messenger>& messenger,
gscoped_ptr<TabletServerServiceProxy>* proxy,
gscoped_ptr<TabletServerAdminServiceProxy>* admin_proxy,
gscoped_ptr<consensus::ConsensusServiceProxy>* consensus_proxy);
gscoped_ptr<consensus::ConsensusServiceProxy>* consensus_proxy,
gscoped_ptr<server::GenericServiceProxy>* generic_proxy);

} // namespace tserver
} // namespace kudu
Expand Down

0 comments on commit 15bc887

Please sign in to comment.