Add the ability to return the cost model to the client as part of run metadata.

Change: 122696039
yuanbyu authored and tensorflower-gardener committed May 19, 2016
1 parent c4119be commit be8a3e2
Showing 15 changed files with 202 additions and 126 deletions.
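With this change a client can obtain the cost model through the public `Run()` path instead of reaching into `DirectSession` internals. A minimal C++ sketch of how a caller might exercise the new behaviour — the graph contents and the tensor name `"y:0"` are illustrative assumptions, not part of the commit:

```cpp
#include <memory>
#include <vector>

#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/protobuf/config.pb.h"
#include "tensorflow/core/public/session.h"

// Sketch: request a cost model and read it back from RunMetadata.
tensorflow::Status RunWithCostModel(const tensorflow::GraphDef& graph_def) {
  tensorflow::SessionOptions options;
  // Ask the session to build a cost model; per this commit the cost graph is
  // attached to RunMetadata on the run whose ordinal equals this value.
  options.config.mutable_graph_options()->set_build_cost_model(1);

  std::unique_ptr<tensorflow::Session> session(tensorflow::NewSession(options));
  TF_RETURN_IF_ERROR(session->Create(graph_def));

  tensorflow::RunOptions run_options;
  tensorflow::RunMetadata run_metadata;
  std::vector<tensorflow::Tensor> outputs;
  TF_RETURN_IF_ERROR(session->Run(run_options, /*inputs=*/{}, {"y:0"},
                                  /*target_node_names=*/{}, &outputs,
                                  &run_metadata));

  // Per-node memory sizes, aliasing and finality now arrive here.
  LOG(INFO) << "cost graph nodes: " << run_metadata.cost_graph().node_size();
  return tensorflow::Status::OK();
}
```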
40 changes: 8 additions & 32 deletions tensorflow/core/common_runtime/costmodel_manager.cc
@@ -18,6 +18,12 @@ limitations under the License.

namespace tensorflow {

namespace {

static const string kCostModelLogTag = "COST_MODEL";

} // namespace

CostModelManager::~CostModelManager() {
for (auto it : cost_models_) {
delete it.second;
@@ -36,7 +42,7 @@ CostModel* CostModelManager::FindOrCreateCostModel(const Graph* graph) {
return cost_model;
}

Status CostModelManager::BuildCostGraphDef(const Graph* graph,
Status CostModelManager::AddToCostGraphDef(const Graph* graph,
CostGraphDef* cost_graph) {
mutex_lock l(mu_);
// Get the cost model for the graph.
@@ -45,37 +51,7 @@ Status CostModelManager::BuildCostGraphDef(const Graph* graph,
return errors::InvalidArgument("The cost model graph doesn't exist.");
}
CostModel* cost_model = it->second;

// Construct the cost graph.
std::vector<const Edge*> inputs;
for (const Node* n : graph->nodes()) {
CostGraphDef::Node* cnode = cost_graph->add_node();
cnode->set_name(n->name());
cnode->set_id(cost_model->Id(n));

inputs.clear();
inputs.resize(n->num_inputs(), nullptr);
for (const Edge* e : n->in_edges()) {
inputs[e->dst_input()] = e;
}
for (const Edge* e : inputs) {
CostGraphDef::Node::InputInfo* input_info = cnode->add_input_info();
input_info->set_preceding_node(cost_model->Id(e->src()));
input_info->set_preceding_port(e->src_output());
}

for (int i = 0; i < n->num_outputs(); i++) {
CostGraphDef::Node::OutputInfo* output_info = cnode->add_output_info();
output_info->set_size(cost_model->MaxMemSize(n, i).value());
output_info->set_alias_input_port(cost_model->Aliases(n, i));
}

cnode->set_temporary_memory_size(cost_model->TempMemSize(n).value());

// For now we treat all send nodes as final.
// TODO(yuanbyu): Send nodes for fetches shouldn't be treated as final.
cnode->set_is_final(n->IsSend());
}
cost_model->AddToCostGraphDef(graph, cost_graph);
return Status::OK();
}
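The per-node serialization loop deleted above now lives behind `CostModel::AddToCostGraphDef`. The diff does not show the relocated helper itself, so the following is only an approximate reconstruction from the deleted lines; note that other hunks in this commit rename some accessors (e.g. `MaxMemSize` → `MaxMemorySize`, `Aliases` → `AllocationId` in the test further down), so the real method may use the new names:

```cpp
#include "tensorflow/core/framework/cost_graph.pb.h"
#include "tensorflow/core/graph/costmodel.h"
#include "tensorflow/core/graph/graph.h"

namespace tensorflow {

// Approximate shape of the relocated logic, reconstructed from the deleted
// loop above (not the literal contents of costmodel.cc in this commit).
void CostModel::AddToCostGraphDef(const Graph* graph,
                                  CostGraphDef* cost_graph) {
  std::vector<const Edge*> inputs;
  for (const Node* n : graph->nodes()) {
    CostGraphDef::Node* cnode = cost_graph->add_node();
    cnode->set_name(n->name());
    cnode->set_id(Id(n));

    // Collect the input edges, indexed by destination port.
    inputs.clear();
    inputs.resize(n->num_inputs(), nullptr);
    for (const Edge* e : n->in_edges()) {
      inputs[e->dst_input()] = e;
    }
    for (const Edge* e : inputs) {
      CostGraphDef::Node::InputInfo* input_info = cnode->add_input_info();
      input_info->set_preceding_node(Id(e->src()));
      input_info->set_preceding_port(e->src_output());
    }

    // Maximum observed size and aliasing info for each output.
    for (int i = 0; i < n->num_outputs(); i++) {
      CostGraphDef::Node::OutputInfo* output_info = cnode->add_output_info();
      output_info->set_size(MaxMemSize(n, i).value());
      output_info->set_alias_input_port(Aliases(n, i));
    }

    cnode->set_temporary_memory_size(TempMemSize(n).value());

    // For now all send nodes are treated as final.
    cnode->set_is_final(n->IsSend());
  }
}

}  // namespace tensorflow
```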

7 changes: 4 additions & 3 deletions tensorflow/core/common_runtime/costmodel_manager.h
@@ -34,13 +34,14 @@ class CostModelManager {
typedef std::unordered_map<const Graph*, CostModel*> CostModelMap;
typedef CostModelMap::iterator CostModelMapIter;

gtl::iterator_range<CostModelMapIter> CostModels() {
void ExportCostModels(CostModelMap* cost_models) {
mutex_lock l(mu_);
return gtl::make_range(cost_models_.begin(), cost_models_.end());
*cost_models = cost_models_;
}

CostModel* FindOrCreateCostModel(const Graph* graph);
Status BuildCostGraphDef(const Graph* graph, CostGraphDef* cost_graph);

Status AddToCostGraphDef(const Graph* graph, CostGraphDef* cost_graph);

private:
mutex mu_;
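Replacing the `CostModels()` accessor with `ExportCostModels()` is as much a thread-safety fix as a rename: the old version returned iterators into `cost_models_` after `mu_` had already been released, while the new one copies the pointer-valued map while the lock is held. A small usage sketch, mirroring what the updated test further down does (`cost_model_manager` here stands in for any `CostModelManager` instance):

```cpp
// Sketch: take a consistent snapshot of the Graph* -> CostModel* map.
// The CostModel objects themselves stay owned by the CostModelManager.
CostModelManager::CostModelMap cost_models;
cost_model_manager.ExportCostModels(&cost_models);
for (const auto& entry : cost_models) {
  const Graph* graph = entry.first;
  const CostModel* cost_model = entry.second;
  // Inspect per-node costs here, e.g. cost_model->MaxMemorySize(node, 0).
  (void)graph;
  (void)cost_model;
}
```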
45 changes: 30 additions & 15 deletions tensorflow/core/common_runtime/direct_session.cc
@@ -326,12 +326,15 @@ Status DirectSession::Run(const RunOptions& run_options,
}

std::unique_ptr<GPUTracer> tracer;
if (run_options.trace_level() == RunOptions::FULL_TRACE ||
options_.config.graph_options().build_cost_model()) {
const bool full_trace = (run_options.trace_level() == RunOptions::FULL_TRACE);
const int64 build_cost_model =
options_.config.graph_options().build_cost_model();
if (full_trace || build_cost_model > 0) {
args.stats_collector = new StepStatsCollector(
run_metadata->mutable_step_stats(), &cost_model_manager_);
run_metadata->mutable_step_stats(),
(build_cost_model > 0) ? &cost_model_manager_ : nullptr);
run_state.collector = args.stats_collector;
if (tracer && run_options.trace_level() == RunOptions::FULL_TRACE) {
if (tracer && full_trace) {
tracer.reset(CreateGPUTracer());
tracer->Start();
}
@@ -344,6 +347,7 @@ Status DirectSession::Run(const RunOptions& run_options,
WaitForNotification(&run_state, run_options.timeout_in_ms() > 0
? run_options.timeout_in_ms()
: operation_timeout_in_ms_);

if (tracer) {
tracer->Stop();
tracer->Collect(args.stats_collector);
@@ -362,6 +366,16 @@ Status DirectSession::Run(const RunOptions& run_options,
TF_RETURN_IF_ERROR(
run_state.tensor_store.SaveTensors(output_names, &session_state_));

// Build and return the cost model as instructed.
mutex_lock l(executor_lock_);
++executors_and_keys->step_count;
if (executors_and_keys->step_count == build_cost_model) {
CostGraphDef* cost_graph = run_metadata->mutable_cost_graph();
for (const auto& item : executors_and_keys->items) {
TF_RETURN_IF_ERROR(
cost_model_manager_.AddToCostGraphDef(item.graph, cost_graph));
}
}
return Status::OK();
}
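The block above is what actually hands the cost model back: `step_count` is tracked per `ExecutorsAndKeys` (i.e. per feed/fetch signature), and only the run whose count reaches `build_cost_model` gets the per-partition cost graphs merged into its `RunMetadata`. A client-side sketch of that behaviour, continuing the session configured in the sketch near the top (the step count and tensor name are illustrative assumptions):

```cpp
// Sketch: with GraphOptions.build_cost_model = kCostModelStep, only the
// kCostModelStep-th Run() of this particular fetch set carries cost data.
const int kCostModelStep = 5;  // assumed to match build_cost_model
for (int step = 1; step <= kCostModelStep; ++step) {
  tensorflow::RunMetadata run_metadata;
  std::vector<tensorflow::Tensor> outputs;
  TF_CHECK_OK(session->Run(tensorflow::RunOptions(), /*inputs=*/{}, {"y:0"},
                           /*target_node_names=*/{}, &outputs, &run_metadata));
  if (run_metadata.cost_graph().node_size() > 0) {
    LOG(INFO) << "cost graph returned at step " << step;  // expect kCostModelStep
  }
}
```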

@@ -400,17 +414,14 @@ Status DirectSession::PRunSetup(const std::vector<string>& input_names,
}

// Start parallel Executors.
Notification& executors_done = run_state->executors_done;
Status* run_status = &run_state->status;
const int num_executors = executors_and_keys->items.size();
ExecutorBarrier* barrier = new ExecutorBarrier(
num_executors, run_state->rendez,
[&executors_done, run_status, this](const Status& ret) {
num_executors, run_state->rendez, [run_state](const Status& ret) {
if (!ret.ok()) {
mutex_lock l(executor_lock_);
*run_status = ret;
mutex_lock l(run_state->mu_);
run_state->status.Update(ret);
}
executors_done.Notify();
run_state->executors_done.Notify();
});

Executor::Args args;
@@ -427,14 +438,13 @@ Status DirectSession::PRunSetup(const std::vector<string>& input_names,
}

if (options_.config.graph_options().build_cost_model()) {
run_state->collector =
args.stats_collector =
new StepStatsCollector(nullptr, &cost_model_manager_);
args.stats_collector = run_state->collector;
run_state->collector = args.stats_collector;
}

for (auto& item : executors_and_keys->items) {
Executor* exec = item.executor;
exec->RunAsync(args, barrier->Get());
item.executor->RunAsync(args, barrier->Get());
}

*handle = run_state_args.handle;
@@ -780,6 +790,7 @@ Status DirectSession::GetOrCreateExecutors(
}
// NewLocalExecutor takes ownership of *partition_graph.
iter->second = nullptr;
item->graph = partition_graph;
item->executor = nullptr;
s = NewLocalExecutor(params, partition_graph, &item->executor);
if (!s.ok()) {
@@ -1012,6 +1023,10 @@ class DirectSessionFactory : public SessionFactory {
}

Session* NewSession(const SessionOptions& options) override {
// Must do this before the CPU allocator is created.
if (options.config.graph_options().build_cost_model() > 0) {
EnableCPUAllocatorFullStats(true);
}
std::vector<Device*> devices;
DeviceFactory::AddDevices(options, "/job:localhost/replica:0/task:0",
&devices);
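The `NewSession` hunk above also explains its ordering comment: full CPU-allocator statistics have to be switched on before `DeviceFactory::AddDevices` constructs the CPU device, because (as the `allocator.cc` hunk further down shows) the `TrackingAllocator` that records the per-output `allocated_bytes` and allocation ids consumed by the cost model is attached only once, when the process-wide allocator is first created. With this commit the factory does that automatically for cost-model sessions; the sketch below shows the equivalent manual setup that the updated test still performs explicitly (`NewCostModelSession` is a hypothetical helper, not part of the commit):

```cpp
#include <memory>

#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/public/session.h"

// Sketch: enable full allocator stats before any session (and hence the CPU
// allocator) exists, then request a cost model. Mirrors the updated
// CostModelTest below.
std::unique_ptr<tensorflow::Session> NewCostModelSession() {
  tensorflow::EnableCPUAllocatorFullStats(true);
  tensorflow::SessionOptions options;
  options.config.mutable_graph_options()->set_build_cost_model(1);
  return std::unique_ptr<tensorflow::Session>(tensorflow::NewSession(options));
}
```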
22 changes: 12 additions & 10 deletions tensorflow/core/common_runtime/direct_session.h
@@ -83,9 +83,8 @@ class DirectSession : public Session {

::tensorflow::Status Close() override;

// This is mainly for testing/debugging.
gtl::iterator_range<CostModelManager::CostModelMapIter> CostModels() {
return cost_model_manager_.CostModels();
void ExportCostModels(CostModelManager::CostModelMap* cost_models) {
cost_model_manager_.ExportCostModels(cost_models);
}

private:
@@ -94,19 +93,22 @@
// We create one executor and its dependent library runtime for
// every partition.
struct PerPartitionExecutorsAndLib {
Graph* graph = nullptr;
Executor* executor = nullptr;
FunctionLibraryRuntime* flib = nullptr;
};

// An ExecutorsAndKeys is created for a given set of feeds/fetches.
// 'func_defs' are the function definition used by all the
// underlying executors. 'graph' is the entire graph being
// executed. 'name_to_node' maps node name to node. We keep 'graph'
// and 'name_to_node' only in the case of partial runs. Each item in
// 'items' is the executor for a partition of the graph bundled with
// its dependent library runtime. 'input_keys' are the rendezvous keys
// for the feeds and 'output_keys' are rendezvous keys for the fetches.
// 'step_count' is the number of times this graph is executed.
// 'func_defs' are the function definition used by all the underlying
// executors. 'graph' is the entire graph being executed. 'name_to_node'
// maps node name to node. We keep 'graph' and 'name_to_node' only in
// the case of partial runs. Each item in 'items' is the executor for
// a partition of the graph bundled with its dependent library runtime.
// 'input_keys' are the rendezvous keys for the feeds and 'output_keys'
// are rendezvous keys for the fetches.
struct ExecutorsAndKeys {
int64 step_count = 0;
FunctionLibraryDefinition* func_defs = nullptr;
Graph* graph = nullptr;
NameNodeMap* name_to_node = nullptr;
@@ -44,7 +44,7 @@ namespace tensorflow {
namespace {

TEST(DirectSessionWithTrackingAllocTest, CostModelTest) {
EnableCPUAllocatorDetailedStats(true);
EnableCPUAllocatorFullStats(true);

Graph graph(OpRegistry::Global());

@@ -84,16 +84,18 @@ TEST(DirectSessionWithTrackingAllocTest, CostModelTest) {

DirectSession* ds = static_cast<DirectSession*>(session.get());
int graph_cnt = 0;
for (auto& it : ds->CostModels()) {
CostModelManager::CostModelMap cost_models;
ds->ExportCostModels(&cost_models);
for (auto& it : cost_models) {
const Graph* g = (it).first;
const CostModel* cm = (it).second;
for (Node* node : g->nodes()) {
if (node->name() == y->name()) {
EXPECT_LE(8, cm->MaxMemSize(node, 0));
EXPECT_EQ(5, cm->Aliases(node, 0));
EXPECT_LE(8, cm->MaxMemorySize(node, 0));
EXPECT_EQ(5, cm->AllocationId(node, 0));
} else if (node->name() == y_neg->name()) {
EXPECT_LE(8, cm->MaxMemSize(node, 0));
EXPECT_EQ(6, cm->Aliases(node, 0));
EXPECT_LE(8, cm->MaxMemorySize(node, 0));
EXPECT_EQ(6, cm->AllocationId(node, 0));
}
// Check the execution time. Since it's highly variable, we'll
// use a large window: anything between 1 and 10000 microseconds is
3 changes: 2 additions & 1 deletion tensorflow/core/common_runtime/executor.cc
@@ -22,6 +22,7 @@ limitations under the License.
#include <unordered_map>
#include <vector>

#include "tensorflow/core/common_runtime/costmodel_manager.h"
#include "tensorflow/core/common_runtime/pending_counts.h"
#include "tensorflow/core/common_runtime/step_stats_collector.h"
#include "tensorflow/core/framework/allocation_description.pb.h"
@@ -1475,7 +1476,7 @@ bool ExecutorState::NodeDone(const Status& s, const Node* node,
std::deque<TaggedNode>* inline_ready) {
if (stats_collector_) {
nodestats::SetAllEnd(stats);
stats_collector_->UpdateCostModel(stats, impl_->graph_, node);
stats_collector_->UpdateCostModelNode(stats, impl_->graph_, node);
if (!SetTimelineLabel(node, stats)) {
// Only record non-transfer nodes.
stats_collector_->Save(impl_->params_.device->name(), stats);
32 changes: 16 additions & 16 deletions tensorflow/core/common_runtime/step_stats_collector.cc
@@ -24,24 +24,24 @@ StepStatsCollector::StepStatsCollector(StepStats* ss,
CostModelManager* cost_model_manager)
: step_stats_(ss), cost_model_manager_(cost_model_manager) {}

void StepStatsCollector::UpdateCostModel(const NodeExecStats* nt,
const Graph* graph, const Node* node) {
void StepStatsCollector::UpdateCostModelNode(const NodeExecStats* nt,
const Graph* graph,
const Node* node) {
mutex_lock l(mu_);
if (cost_model_manager_ == nullptr) {
return;
}
CostModel* cm = cost_model_manager_->FindOrCreateCostModel(graph);
cm->RecordMaxExecutionTime(node, Microseconds(nt->op_end_rel_micros()));
if (cost_model_manager_ != nullptr) {
CostModel* cm = cost_model_manager_->FindOrCreateCostModel(graph);
cm->RecordMaxExecutionTime(node, Microseconds(nt->op_end_rel_micros()));

for (int i = 0; i < nt->output_size(); ++i) {
cm->RecordMaxMemSize(node, i, Bytes(nt->output(i)
.tensor_description()
.allocation_description()
.allocated_bytes()));
cm->RecordAliases(node, i, nt->output(i)
.tensor_description()
.allocation_description()
.allocation_id());
for (int i = 0; i < nt->output_size(); ++i) {
cm->RecordMaxMemorySize(node, i, Bytes(nt->output(i)
.tensor_description()
.allocation_description()
.allocated_bytes()));
cm->RecordAllocationId(node, i, nt->output(i)
.tensor_description()
.allocation_description()
.allocation_id());
}
}
}

6 changes: 3 additions & 3 deletions tensorflow/core/common_runtime/step_stats_collector.h
@@ -34,14 +34,14 @@ class StepStatsCollector {
explicit StepStatsCollector(StepStats* ss,
CostModelManager* cost_model_manager = nullptr);

void UpdateCostModel(const NodeExecStats* nt, const Graph* graph,
const Node* node);
void UpdateCostModelNode(const NodeExecStats* nt, const Graph* graph,
const Node* node);

void Save(const string& device, NodeExecStats* nt);

void Swap(StepStats* ss);

private:
friend class StepStatsMgr;
mutex mu_;
StepStats* step_stats_ GUARDED_BY(mu_);
CostModelManager* cost_model_manager_ GUARDED_BY(mu_);
10 changes: 5 additions & 5 deletions tensorflow/core/framework/allocator.cc
@@ -49,14 +49,14 @@ Allocator::~Allocator() {}

// If true, cpu allocator collects more stats.
static bool cpu_allocator_collect_stats = false;
// If true, cpu allocator collects detailed stats.
static bool cpu_allocator_collect_detailed_stats = false;
// If true, cpu allocator collects full stats.
static bool cpu_allocator_collect_full_stats = false;

void EnableCPUAllocatorStats(bool enable) {
cpu_allocator_collect_stats = enable;
}
void EnableCPUAllocatorDetailedStats(bool enable) {
cpu_allocator_collect_detailed_stats = enable;
void EnableCPUAllocatorFullStats(bool enable) {
cpu_allocator_collect_full_stats = enable;
}

class CPUAllocator : public Allocator {
@@ -111,7 +111,7 @@ class CPUAllocator : public Allocator {
namespace {
Allocator* MakeCpuAllocator() {
Allocator* allocator = new CPUAllocator;
if (cpu_allocator_collect_detailed_stats || LogMemory::IsEnabled()) {
if (cpu_allocator_collect_full_stats || LogMemory::IsEnabled()) {
allocator = new TrackingAllocator(allocator, true);
}
return allocator;
7 changes: 3 additions & 4 deletions tensorflow/core/framework/allocator.h
@@ -295,10 +295,9 @@ Allocator* cpu_allocator();
// AllocatorStats. By default, it's disabled.
void EnableCPUAllocatorStats(bool enable);

// If 'enable' is true, the process-wide cpu allocator collects
// detailed statistics. This can be slow, so this is disabled by
// default.
void EnableCPUAllocatorDetailedStats(bool enable);
// If 'enable' is true, the process-wide cpu allocator collects full
// statistics. By default, it's disabled.
void EnableCPUAllocatorFullStats(bool enable);

// Abstract interface of an object that does the underlying suballoc/free of
// memory for a higher-level allocator.
(The remaining changed files are not shown here.)
