Skip to content

Commit

Permalink
Re-enable operation timeouts when using the in-process master.
Browse files Browse the repository at this point in the history
The previous version was ignoring the timeout (if set) in the
`CallOptions*` passed to the server method, and blocking indefinitely
on the completion notification, instead of cancelling the call if the
notification was not set before the timeout.

Fixes tensorflow#6742.
Change: 143042703
  • Loading branch information
mrry authored and tensorflower-gardener committed Dec 27, 2016
1 parent 0737783 commit e136215
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 7 deletions.
34 changes: 27 additions & 7 deletions tensorflow/core/distributed_runtime/local_master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,26 @@ limitations under the License.

namespace tensorflow {

namespace {

// Blocks the caller until `n` has been notified, applying the timeout
// reported by `call_options->GetTimeout()` when that value is positive.
//
// Returns:
//   * Status::OK() if no positive timeout is set (the wait is unbounded), or
//     if the notification arrives before the timed wait gives up.
//   * errors::DeadlineExceeded if the timed wait gives up first. In that case
//     cancellation is requested through `call_options->StartCancel()` and the
//     function still does not return until `n` has been notified.
//
// NOTE(review): the value from GetTimeout() is forwarded unchanged to
// WaitForNotificationWithTimeout(); confirm that both use the same time unit.
Status WaitForNotification(CallOptions* call_options, Notification* n) {
  const int64 deadline_ms = call_options->GetTimeout();

  // No positive timeout requested: wait for as long as the call takes.
  if (deadline_ms <= 0) {
    n->WaitForNotification();
    return Status::OK();
  }

  // Timed wait; a true result means the notification arrived in time.
  if (WaitForNotificationWithTimeout(n, deadline_ms)) {
    return Status::OK();
  }

  // The timed wait expired. Ask the in-flight call to cancel, then keep
  // waiting: the call has borrowed pointers to the request and response
  // messages, so returning before it completes would be unsafe.
  call_options->StartCancel();
  n->WaitForNotification();
  return errors::DeadlineExceeded("Operation timed out.");
}
}

LocalMaster::LocalMaster(Master* master_impl) : master_impl_(master_impl) {}

Status LocalMaster::CreateSession(CallOptions* call_options,
Expand All @@ -33,7 +53,7 @@ Status LocalMaster::CreateSession(CallOptions* call_options,
ret.Update(s);
n.Notify();
});
n.WaitForNotification();
TF_RETURN_IF_ERROR(WaitForNotification(call_options, &n));
return ret;
}

Expand All @@ -46,7 +66,7 @@ Status LocalMaster::ExtendSession(CallOptions* call_options,
ret.Update(s);
n.Notify();
});
n.WaitForNotification();
TF_RETURN_IF_ERROR(WaitForNotification(call_options, &n));
return ret;
}

Expand All @@ -59,7 +79,7 @@ Status LocalMaster::PartialRunSetup(CallOptions* call_options,
ret.Update(s);
n.Notify();
});
n.WaitForNotification();
TF_RETURN_IF_ERROR(WaitForNotification(call_options, &n));
return ret;
}

Expand All @@ -73,7 +93,7 @@ Status LocalMaster::RunStep(CallOptions* call_options,
ret.Update(s);
n.Notify();
});
n.WaitForNotification();
TF_RETURN_IF_ERROR(WaitForNotification(call_options, &n));
return ret;
}

Expand All @@ -86,7 +106,7 @@ Status LocalMaster::CloseSession(CallOptions* call_options,
ret.Update(s);
n.Notify();
});
n.WaitForNotification();
TF_RETURN_IF_ERROR(WaitForNotification(call_options, &n));
return ret;
}

Expand All @@ -99,7 +119,7 @@ Status LocalMaster::ListDevices(CallOptions* call_options,
ret.Update(s);
n.Notify();
});
n.WaitForNotification();
TF_RETURN_IF_ERROR(WaitForNotification(call_options, &n));
return ret;
}

Expand All @@ -112,7 +132,7 @@ Status LocalMaster::Reset(CallOptions* call_options,
ret.Update(s);
n.Notify();
});
n.WaitForNotification();
TF_RETURN_IF_ERROR(WaitForNotification(call_options, &n));
return ret;
}

Expand Down
13 changes: 13 additions & 0 deletions tensorflow/python/training/server_lib_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,19 @@ def testSparseJob(self):
with session.Session(server.target) as sess:
self.assertEqual(1.0, sess.run(a))

def testTimeoutRaisesException(self):
  """Checks that a blocked `Session.run()` fails once `timeout_in_ms` elapses.

  A dequeue op is built on a queue that this test never enqueues into, and
  it is run with a 1000 ms timeout. The run is expected to raise
  `DeadlineExceededError`, first in a session created with the default
  config and then in one created with `self._useRPCConfig()`.
  """
  local_server = server_lib.Server.create_local_server()
  empty_queue = data_flow_ops.FIFOQueue(1, [dtypes.float32])
  blocked_dequeue = empty_queue.dequeue()

  # Session created with the default config.
  with session.Session(local_server.target) as sess:
    with self.assertRaises(errors_impl.DeadlineExceededError):
      sess.run(blocked_dequeue,
               options=config_pb2.RunOptions(timeout_in_ms=1000))

  # Session created with the config returned by `_useRPCConfig()`
  # (presumably forces the RPC code path; confirm against the helper).
  with session.Session(
      local_server.target, config=self._useRPCConfig()) as sess:
    with self.assertRaises(errors_impl.DeadlineExceededError):
      sess.run(blocked_dequeue,
               options=config_pb2.RunOptions(timeout_in_ms=1000))


class ServerDefTest(test.TestCase):

Expand Down

0 comments on commit e136215

Please sign in to comment.