Skip to content

Commit

Permalink
Memory tracking for result tracker
Browse files Browse the repository at this point in the history
This adds memory tracking to ResultTracker, making sure we account for
the memory as we cache responses for clients' requests.

Testing-wise, this adds memory consumption checks to rpc-stress-test.cc.

Original patch by David Alves.

Change-Id: I3b81dda41c8bc7f70380ce426142c34afe6f1625
Reviewed-on: http://gerrit.cloudera.org:8080/3627
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
  • Loading branch information
toddlipcon committed Aug 15, 2016
1 parent 74210b2 commit 0fb4409
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 17 deletions.
66 changes: 62 additions & 4 deletions src/kudu/rpc/result_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,67 @@
#include "kudu/rpc/inbound_call.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/trace.h"
#include "kudu/util/pb_util.h"

namespace kudu {
namespace rpc {

using google::protobuf::Message;
using kudu::MemTracker;
using rpc::InboundCall;
using std::move;
using std::lock_guard;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using strings::Substitute;
using strings::SubstituteAndAppend;

// Tracks the size changes of anything that has a memory_footprint() method.
//
// Instantiate it before mutating 'tracked'. The footprint is snapshotted on
// construction and, on scope exit, the difference is applied to the MemTracker
// by calling Release(before - after) (a negative value when the object grew).
//
// Call Cancel() when 'tracked' is about to be destroyed so that the destructor
// does not touch it; the caller must then release the memory itself.
//
// A null 'tracked_' is tolerated and turns the updater into a no-op, so it is
// safe to declare the updater before the caller has null-checked the result of
// its lookup.
template <class T>
struct ScopedMemTrackerUpdater {
  ScopedMemTrackerUpdater(MemTracker* tracker_, const T* tracked_)
      : tracker(tracker_),
        tracked(tracked_),
        memory_before(tracked_ != nullptr ? tracked_->memory_footprint() : 0),
        // Nothing to account for when there is no tracked object.
        cancelled(tracked_ == nullptr) {
  }

  // Not copyable: a copy would apply the same delta to the tracker twice.
  ScopedMemTrackerUpdater(const ScopedMemTrackerUpdater&) = delete;
  ScopedMemTrackerUpdater& operator=(const ScopedMemTrackerUpdater&) = delete;

  ~ScopedMemTrackerUpdater() {
    if (cancelled) return;
    tracker->Release(memory_before - tracked->memory_footprint());
  }

  // Disables the update on scope exit.
  void Cancel() {
    cancelled = true;
  }

  MemTracker* tracker;
  const T* tracked;
  int64_t memory_before;
  bool cancelled;
};

// Takes ownership of a reference to 'mem_tracker' and builds 'clients_' with
// an allocator constructed from it. 'mem_tracker_' is initialized first, so
// passing the member (rather than the moved-from argument) to the allocator
// is intentional.
ResultTracker::ResultTracker(shared_ptr<MemTracker> mem_tracker)
: mem_tracker_(std::move(mem_tracker)),
clients_(ClientStateMap::key_compare(),
ClientStateMapAllocator(mem_tracker_)) {}

// Hands back to the MemTracker everything that is still accounted for: the
// footprint of each remaining CompletionRecord and of each ClientState that
// owns them. The objects themselves are destroyed with 'clients_'.
ResultTracker::~ResultTracker() {
  lock_guard<simple_spinlock> guard(lock_);
  for (const auto& client_entry : clients_) {
    const auto& state = client_entry.second;
    // Records first, then the client that holds them.
    for (const auto& record_entry : state->completion_records) {
      mem_tracker_->Release(record_entry.second->memory_footprint());
    }
    mem_tracker_->Release(state->memory_footprint());
  }
}

ResultTracker::RpcState ResultTracker::TrackRpc(const RequestIdPB& request_id,
Message* response,
RpcContext* context) {
Expand All @@ -47,20 +93,28 @@ ResultTracker::RpcState ResultTracker::TrackRpc(const RequestIdPB& request_id,
ResultTracker::RpcState ResultTracker::TrackRpcUnlocked(const RequestIdPB& request_id,
Message* response,
RpcContext* context) {

ClientState* client_state = ComputeIfAbsent(
&clients_,
request_id.client_id(),
[]{ return unique_ptr<ClientState>(new ClientState()); })->get();
[&]{
unique_ptr<ClientState> client_state(new ClientState(mem_tracker_));
mem_tracker_->Consume(client_state->memory_footprint());
return client_state;
})->get();

client_state->last_heard_from = MonoTime::Now();

auto result = ComputeIfAbsentReturnAbsense(
&client_state->completion_records,
request_id.seq_no(),
[]{ return unique_ptr<CompletionRecord>(new CompletionRecord()); });
[&]{
unique_ptr<CompletionRecord> completion_record(new CompletionRecord());
mem_tracker_->Consume(completion_record->memory_footprint());
return completion_record;
});

CompletionRecord* completion_record = result.first->get();
ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record);

if (PREDICT_TRUE(result.second)) {
completion_record->state = RpcState::IN_PROGRESS;
Expand Down Expand Up @@ -111,6 +165,7 @@ ResultTracker::RpcState ResultTracker::TrackRpcOrChangeDriver(const RequestIdPB&
if (state != RpcState::IN_PROGRESS) return state;

CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), completion_record);

// ... if we did find a CompletionRecord change the driver and return true.
completion_record->driver_attempt_no = request_id.attempt_no();
Expand Down Expand Up @@ -194,6 +249,7 @@ void ResultTracker::RecordCompletionAndRespond(const RequestIdPB& request_id,
lock_guard<simple_spinlock> l(lock_);

CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), completion_record);

CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no())
<< "Called RecordCompletionAndRespond() from an executor identified with an attempt number that"
Expand Down Expand Up @@ -233,13 +289,13 @@ void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id,
HandleOngoingRpcFunc func) {
lock_guard<simple_spinlock> l(lock_);
auto state_and_record = FindClientStateAndCompletionRecordOrNullUnlocked(request_id);

if (PREDICT_FALSE(state_and_record.first == nullptr)) {
LOG(FATAL) << "Couldn't find ClientState for request: " << request_id.ShortDebugString()
<< ". \nTracker state:\n" << ToStringUnlocked();
}

CompletionRecord* completion_record = state_and_record.second;
ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record);

if (completion_record == nullptr) {
return;
Expand Down Expand Up @@ -272,8 +328,10 @@ void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id,
// delete the completion record.
if (completion_record->ongoing_rpcs.size() == 0
&& completion_record->state != RpcState::COMPLETED) {
cr_updater.Cancel();
unique_ptr<CompletionRecord> completion_record =
EraseKeyReturnValuePtr(&state_and_record.first->completion_records, seq_no);
mem_tracker_->Release(completion_record->memory_footprint());
}
}

Expand Down
49 changes: 44 additions & 5 deletions src/kudu/rpc/result_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.
#pragma once

#include <functional>
#include <map>
#include <string>
#include <utility>
Expand All @@ -26,6 +27,8 @@
#include "kudu/rpc/request_tracker.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/util/locks.h"
#include "kudu/util/malloc.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/monotime.h"

namespace google {
Expand Down Expand Up @@ -139,7 +142,6 @@ class RpcContext;
//
// This class is thread safe.
//
// TODO Memory bookkeeping.
// TODO Garbage collection.
class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
public:
Expand All @@ -160,8 +162,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
STALE
};

ResultTracker() {}
~ResultTracker() {}
explicit ResultTracker(std::shared_ptr<kudu::MemTracker> mem_tracker);
~ResultTracker();

// Tracks the RPC and returns its current state.
//
Expand Down Expand Up @@ -244,12 +246,39 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
std::vector<OnGoingRpcInfo> ongoing_rpcs;

std::string ToString() const;

// Computes the memory footprint of this record: the struct's own allocation,
// plus the backing array of 'ongoing_rpcs' when it has capacity, plus the
// space used by the response when one is set.
int64_t memory_footprint() const {
  int64_t total = kudu_malloc_usable_size(this);
  if (ongoing_rpcs.capacity() > 0) {
    total += kudu_malloc_usable_size(ongoing_rpcs.data());
  }
  if (response.get() != nullptr) {
    total += response->SpaceUsed();
  }
  return total;
}
};
// The state corresponding to a single client.
struct ClientState {
  // Allocator and map types for 'completion_records'. The map is built with a
  // MemTrackerAllocator constructed from the MemTracker given to the
  // constructor instead of the default allocator.
  typedef MemTrackerAllocator<
      std::pair<const SequenceNumber,
                std::unique_ptr<CompletionRecord>>> CompletionRecordMapAllocator;
  typedef std::map<SequenceNumber,
                   std::unique_ptr<CompletionRecord>,
                   std::less<SequenceNumber>,
                   CompletionRecordMapAllocator> CompletionRecordMap;

  explicit ClientState(std::shared_ptr<MemTracker> mem_tracker)
      : completion_records(CompletionRecordMap::key_compare(),
                           CompletionRecordMapAllocator(std::move(mem_tracker))) {}

  // Timestamp refreshed whenever an RPC from this client is tracked.
  MonoTime last_heard_from;

  // Records of this client's requests, keyed by sequence number. This is the
  // only declaration of the member: it must use CompletionRecordMap so that
  // it matches the allocator passed in the constructor.
  CompletionRecordMap completion_records;

  std::string ToString() const;

  // Calculates the memory footprint of this struct.
  // This calculation is shallow and doesn't account for the memory the nested data
  // structures occupy.
  int64_t memory_footprint() const {
    return kudu_malloc_usable_size(this);
  }
};

RpcState TrackRpcUnlocked(const RequestIdPB& request_id,
Expand Down Expand Up @@ -290,12 +319,22 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {

std::string ToStringUnlocked() const;

// The memory tracker that tracks this ResultTracker's memory consumption.
// Declared before 'clients_' because the constructor builds the allocator for
// 'clients_' from this member.
std::shared_ptr<kudu::MemTracker> mem_tracker_;

// Lock that protects access to 'clients_' and to the state contained in each
// ClientState.
// TODO consider a per-ClientState lock if we find this too coarse grained.
simple_spinlock lock_;

// Allocator and map types for 'clients_'. The map is built with a
// MemTrackerAllocator constructed from 'mem_tracker_' instead of the default
// allocator.
typedef MemTrackerAllocator<std::pair<const std::string,
                                      std::unique_ptr<ClientState>>> ClientStateMapAllocator;
typedef std::map<std::string,
                 std::unique_ptr<ClientState>,
                 std::less<std::string>,
                 ClientStateMapAllocator> ClientStateMap;

// Per-client state, keyed by client id. This is the only declaration of the
// member: it must use ClientStateMap so that it matches the allocator passed
// in the constructor.
ClientStateMap clients_;

DISALLOW_COPY_AND_ASSIGN(ResultTracker);
};
Expand Down
73 changes: 67 additions & 6 deletions src/kudu/rpc/rpc-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ namespace {
const char* kClientId = "test-client";

void AddRequestId(RpcController* controller,
const std::string& client_id,
ResultTracker::SequenceNumber sequence_number,
int64_t attempt_no) {
unique_ptr<RequestIdPB> request_id(new RequestIdPB());
request_id->set_client_id(kClientId);
request_id->set_client_id(client_id);
request_id->set_seq_no(sequence_number);
request_id->set_attempt_no(attempt_no);
request_id->set_first_incomplete_seq_no(sequence_number);
Expand Down Expand Up @@ -191,7 +192,7 @@ class RpcStressTest : public RpcTestBase {
client_sleep_for_ms(client_sleep) {
req.set_value_to_add(value);
req.set_sleep_for_ms(server_sleep);
AddRequestId(&controller, sequence_number, attempt_no);
AddRequestId(&controller, kClientId, sequence_number, attempt_no);
}

void Start() {
Expand Down Expand Up @@ -224,7 +225,7 @@ class RpcStressTest : public RpcTestBase {
ExactlyOnceResponsePB resp;
RequestTracker::SequenceNumber seq_no;
CHECK_OK(request_tracker_->NewSeqNo(&seq_no));
AddRequestId(&controller, seq_no, 0);
AddRequestId(&controller, kClientId, seq_no, 0);
ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
ASSERT_EQ(resp.current_val(), expected_value);
request_tracker_->RpcCompleted(seq_no);
Expand All @@ -244,16 +245,30 @@ class RpcStressTest : public RpcTestBase {
// same sequence number as previous request.
TEST_F(RpcStressTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
ExactlyOnceResponsePB original_resp;
int mem_consumption = mem_tracker_->consumption();
{
RpcController controller;
ExactlyOnceRequestPB req;
req.set_value_to_add(1);

// Assign id 0.
AddRequestId(&controller, 0, 0);
AddRequestId(&controller, kClientId, 0, 0);

// Send the request the first time.
ASSERT_OK(proxy_->AddExactlyOnce(req, &original_resp, &controller));

// The incremental usage of a new client is the size of the response itself
// plus some fixed overhead for the client-tracking structure.
int expected_incremental_usage = original_resp.SpaceUsed() + 200;

// The consumption isn't immediately updated, since the MemTracker update
// happens after we call 'Respond' on the RPC.
int mem_consumption_after;
AssertEventually([&]() {
mem_consumption_after = mem_tracker_->consumption();
ASSERT_GT(mem_consumption_after - mem_consumption, expected_incremental_usage);
});
mem_consumption = mem_consumption_after;
}

// Now repeat the rpc 10 times, using the same sequence number, none of these should be executed
Expand All @@ -264,10 +279,32 @@ TEST_F(RpcStressTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
ExactlyOnceRequestPB req;
req.set_value_to_add(1);
ExactlyOnceResponsePB resp;
AddRequestId(&controller, 0, i + 1);
AddRequestId(&controller, kClientId, 0, i + 1);
ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
ASSERT_EQ(resp.current_val(), 1);
ASSERT_EQ(resp.current_time_micros(), original_resp.current_time_micros());
// Sleep to give the MemTracker time to update -- we don't expect any update,
// but if we had a bug here, we'd only see it with this sleep.
SleepFor(MonoDelta::FromMilliseconds(100));
// We shouldn't have consumed any more memory since the responses were cached.
ASSERT_EQ(mem_consumption, mem_tracker_->consumption());
}

// Making a new request, from a new client, should double the memory consumption.
{
RpcController controller;
ExactlyOnceRequestPB req;
ExactlyOnceResponsePB resp;
req.set_value_to_add(1);

// Assign id 0.
AddRequestId(&controller, "test-client2", 0, 0);

// Send the first request for this new client.
ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
AssertEventually([&]() {
ASSERT_EQ(mem_tracker_->consumption(), mem_consumption * 2);
});
}
}

Expand Down Expand Up @@ -313,7 +350,22 @@ TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
kNumThreads = 100;
}

for (int i = 0; i < kNumIterations; i ++) {
ResultTracker::SequenceNumber sequence_number = 0;
int memory_consumption_initial = mem_tracker_->consumption();
int single_response_size = 0;

// Measure memory consumption for a single response from the same client.
{
RpcController controller;
ExactlyOnceRequestPB req;
ExactlyOnceResponsePB resp;
req.set_value_to_add(1);
AddRequestId(&controller, kClientId, sequence_number, 0);
ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
single_response_size = resp.SpaceUsed();
}

for (int i = 1; i <= kNumIterations; i ++) {
vector<unique_ptr<SimultaneousExactlyOnceAdder>> adders;
for (int j = 0; j < kNumThreads; j++) {
unique_ptr<SimultaneousExactlyOnceAdder> adder(
Expand All @@ -334,6 +386,15 @@ TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
ASSERT_EQ(adders[j]->resp.current_time_micros(), time_micros);
}
}

// Wait for the MemTracker to be updated.
// After all adders have finished we should have consumed at least the size of one more response.
// The actual size depends on multiple factors, for instance, how many calls were "attached"
// (which is timing dependent), so we can't be more precise than this.
AssertEventually([&]() {
ASSERT_GT(mem_tracker_->consumption(),
memory_consumption_initial + single_response_size * i);
});
}
}

Expand Down
Loading

0 comments on commit 0fb4409

Please sign in to comment.