Skip to content

Commit

Permalink
Pin ops with small integer inputs (already on the cpu) to the cpu in …
Browse files Browse the repository at this point in the history
…eager.

An environment variable (TF_EAGER_ENABLE_SMALL_TENSOR_CPU_PINNING) is provided to turn this off if necessary (it's on by default).

PiperOrigin-RevId: 215821915
  • Loading branch information
Akshay Modi authored and tensorflower-gardener committed Oct 4, 2018
1 parent d6a2e7b commit cf8e7cf
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 10 deletions.
4 changes: 3 additions & 1 deletion tensorflow/core/common_runtime/eager/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ EagerContext::EagerContext(const SessionOptions& opts,
async_default_(async),
log_memory_(LogMemory::IsEnabled()),
env_(opts.env),
use_send_tensor_rpc_(false) {
use_send_tensor_rpc_(false),
pin_small_ops_to_cpu_(ReadBoolFromEnvVar(
"TF_EAGER_ENABLE_SMALL_TENSOR_CPU_PINNING", true)) {
if (device_mgr_owned) {
local_device_manager_.reset(device_mgr);
local_unowned_device_manager_ = nullptr;
Expand Down
2 changes: 2 additions & 0 deletions tensorflow/core/common_runtime/eager/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class EagerContext {
// EagerService.SendTensor RPC. If false, _Send/_Recv ops should be used
// instead (which in-turn use WorkerService.RecvTensor RPCs).
bool UseSendTensorRPC() { return use_send_tensor_rpc_; }
bool PinSmallOpsToCPU() { return pin_small_ops_to_cpu_; }

private:
void InitDeviceMapAndAsync();
Expand Down Expand Up @@ -293,6 +294,7 @@ class EagerContext {
#endif

bool use_send_tensor_rpc_;
const bool pin_small_ops_to_cpu_;
};

} // namespace tensorflow
Expand Down
67 changes: 58 additions & 9 deletions tensorflow/core/common_runtime/eager/execute.cc
Original file line number Diff line number Diff line change
Expand Up @@ -579,19 +579,23 @@ Status EagerRemoteExecute(EagerOperation* op, TensorHandle** retvals,
return Status::OK();
#endif
}
} // namespace

Status EagerExecute(EagerOperation* op,
gtl::InlinedVector<TensorHandle*, 2>* retvals,
int* num_retvals) {
// Ensure all resource-touching ops run in the device the resource is,
// regardless of anything else that has been specified. This is identical to
// the graph mode behavior.
// The Op device may be updated if:
// - A resource touching input is specified: all resource-touching ops run in
// the device the resource is, regardless of anything else that has been
// specified. This is identical to the graph mode behavior.
//
// - All op inputs are on the CPU, small (<64 elements) and integers
// (int32/int64). This can be disabled by setting the environment variable
// "TF_EAGER_ENABLE_SMALL_TENSOR_CPU_PINNING" to "0" or "false".
Status MaybeUpdateOpDevice(EagerOperation* op) {
EagerContext* ctx = op->EagerContext();
bool device_set_for_resource_variable = false;
bool all_inputs_eligible_for_cpu_pinning = ctx->PinSmallOpsToCPU();

for (int i = 0; i < op->Inputs().size(); ++i) {
Device* input_op_device = nullptr;
auto status = op->Inputs()[i]->OpDevice(&input_op_device);
if (!status.ok()) return status;
TF_RETURN_IF_ERROR(op->Inputs()[i]->OpDevice(&input_op_device));
VLOG(2) << "for op " << op->Name() << " input " << i << " "
<< DataTypeString(op->Inputs()[i]->dtype) << " "
<< (input_op_device == nullptr ? "cpu" : input_op_device->name())
Expand All @@ -603,8 +607,53 @@ Status EagerExecute(EagerOperation* op,
<< d->name() << " because input #" << i
<< " is a resource in this device.";
op->SetDevice(d);

device_set_for_resource_variable = true;
all_inputs_eligible_for_cpu_pinning = false;
} else if (all_inputs_eligible_for_cpu_pinning) {
TensorHandle* handle = op->Inputs()[i];

// Input is on CPU.
if (input_op_device != nullptr && input_op_device != ctx->HostCPU()) {
all_inputs_eligible_for_cpu_pinning = false;
continue;
}

if (handle->dtype != DataType::DT_INT32 &&
handle->dtype != DataType::DT_INT64) {
all_inputs_eligible_for_cpu_pinning = false;
continue;
}

int64 num_elements;
TF_RETURN_IF_ERROR(handle->NumElements(&num_elements));
if (num_elements > 64) {
all_inputs_eligible_for_cpu_pinning = false;
}
}
}

// Ops without inputs are usually ops that generate a tensor in some way and
// usually require being present on whatever device they are scheduled on
// - for e.g. VarHandleOp or _Recv).
// TODO(nareshmodi): Is it possible there is no int32/int64 CPU kernel for
// an op, but there is a GPU kernel?
if (!op->Inputs().empty() && all_inputs_eligible_for_cpu_pinning) {
VLOG(1) << "Forcing op " << op->Name()
<< " to be on the CPU since all input tensors have an "
"int32/int64 dtype, and are small (less than 64 elements).";
op->SetDevice(ctx->HostCPU());
}

return Status::OK();
}
} // namespace

Status EagerExecute(EagerOperation* op,
gtl::InlinedVector<TensorHandle*, 2>* retvals,
int* num_retvals) {
TF_RETURN_IF_ERROR(MaybeUpdateOpDevice(op));

bool op_is_local = IsLocal(op->EagerContext(), op->Device());

if (op_is_local) {
Expand Down
28 changes: 28 additions & 0 deletions tensorflow/python/eager/core_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,34 @@ def testConvertMixedEagerTensorsWithVariables(self):
for t in tensors:
self.assertIsInstance(t, ops.EagerTensor)

def testSmallIntegerOpsForcedToCPU(self):
  """Eager ops with only small int inputs on the host are pinned to the CPU.

  Covers the three placement cases: small int64 inputs (pinned to CPU),
  large int64 inputs (kept on GPU), and small float inputs (kept on GPU).
  """
  if not context.context().num_gpus():
    self.skipTest('No GPUs found')

  # Case 1: small int64 operands -- the add should be forced onto the CPU
  # even though it is requested on the GPU.
  lhs = constant_op.constant((1, 2, 3, 4, 5), dtype=dtypes.int64)
  rhs = constant_op.constant((2, 3, 4, 5, 6), dtype=dtypes.int64)
  with context.device('gpu:0'):
    result = lhs + rhs
  self.assertEqual(result.device,
                   '/job:localhost/replica:0/task:0/device:CPU:0')

  # Case 2: int64 operands with 8 * 10 = 80 elements -- too large for the
  # small-tensor heuristic, so the op stays on the requested GPU.
  lhs = array_ops.zeros((8, 10), dtype=dtypes.int64)
  rhs = array_ops.ones((8, 10), dtype=dtypes.int64)
  with context.device('gpu:0'):
    result = lhs + rhs
  self.assertEqual(result.device,
                   '/job:localhost/replica:0/task:0/device:GPU:0')

  # Case 3: small operands but float32 dtype -- not an integer type, so
  # no CPU pinning happens.
  lhs = constant_op.constant((1, 2, 3, 4, 5), dtype=dtypes.float32)
  rhs = constant_op.constant((2, 3, 4, 5, 6), dtype=dtypes.float32)
  with context.device('gpu:0'):
    result = lhs + rhs
  self.assertEqual(result.device,
                   '/job:localhost/replica:0/task:0/device:GPU:0')

class SendRecvTest(test_util.TensorFlowTestCase):

Expand Down

0 comments on commit cf8e7cf

Please sign in to comment.