From 34d5a998a02d274c645d29b350a6e51736a7c2f4 Mon Sep 17 00:00:00 2001
From: "A. Unique TensorFlower"
Date: Fri, 29 Jan 2016 10:44:41 -0800
Subject: [PATCH] Remove the AutoUpdate methods from EventAccumulator and
 EventMultiplexer.

There's no reason for those classes to own the async behavior; a caller can
easily manage it themselves. The resulting code is easier to reason about
because there is always one source of async and one thread, rather than
several different subtimers firing at different times. Also, it now logs
load time for performance measurement.

Change: 113376718
---
 .../python/summary/event_accumulator.py       | 44 +------------
 .../python/summary/event_multiplexer.py       | 66 ++-----------------
 .../python/summary/event_multiplexer_test.py  | 24 -------
 tensorflow/tensorboard/backend/tensorboard.py | 27 ++++++--
 4 files changed, 29 insertions(+), 132 deletions(-)

diff --git a/tensorflow/python/summary/event_accumulator.py b/tensorflow/python/summary/event_accumulator.py
index 95e8e179f47419..d6456f46a44ceb 100644
--- a/tensorflow/python/summary/event_accumulator.py
+++ b/tensorflow/python/summary/event_accumulator.py
@@ -99,22 +99,13 @@ class EventAccumulator(object):
   `Accumulator.Scalars(tag)`) allow for the retrieval of all data
   associated with that tag.

-  Before usage, the `EventAccumulator` must be activated via `Reload()` or
-  `AutoUpdate(interval)`.
-
-  If activated via `Reload()`, it loads synchronously, so calls to `Values` or
-  `Tags` will block until all outstanding events are processed. Afterwards,
-  `Reload()` may be called again to load any new data.
-
-  If activated via `AutoUpdate(interval)`, it loads asynchronously, so calls to
-  `Values` or `Tags` will immediately return a valid subset of the outstanding
-  event data. It reloads new data every `interval` seconds.
+  Before usage, the `EventAccumulator` must be activated via `Reload()`. This
+  method synchronously loads all of the data written so far.

   Histograms and images are very large, so storing all of them is not
   recommended.

   @@Reload
-  @@AutoUpdate
   @@Tags
   @@Scalars
   @@Graph
@@ -155,7 +146,6 @@ def __init__(self, path, size_guidance=DEFAULT_SIZE_GUIDANCE,
     self._images = reservoir.Reservoir(size=sizes[IMAGES])
     self._generator_mutex = threading.Lock()
     self._generator = _GeneratorFromPath(path)
-    self._is_autoupdating = False
     self._activated = False
     self._compression_bps = compression_bps
     self.most_recent_step = -1
@@ -203,36 +193,6 @@ def Reload(self):
                           value.image)
     return self

-  def AutoUpdate(self, interval=60):
-    """Asynchronously load all events, and periodically reload.
-
-    Calling this function is not thread safe.
-    Calling this function activates the `EventAccumulator`.
-
-    Args:
-      interval: how many seconds after each successful reload to load new events
-        (default 60)
-
-    Returns:
-      The `EventAccumulator`.
-    """
-    if self._is_autoupdating:
-      return
-    self._is_autoupdating = True
-    self._activated = True
-    def Update():
-      self.Reload()
-      logging.info('EventAccumulator update triggered')
-      t = threading.Timer(interval, Update)
-      t.daemon = True
-      t.start()
-    # Asynchronously start the update process, so that the accumulator can
-    # immediately serve data, even if there is a very large event file to parse
-    t = threading.Timer(0, Update)
-    t.daemon = True
-    t.start()
-    return self
-
   def Tags(self):
     """Return all tags found in the value stream.
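With `AutoUpdate` removed, the reload cadence belongs to the caller. A minimal sketch of that caller-managed loop (not part of this patch; the `path` value and the 60-second interval are illustrative) mirrors the single-timer pattern this change adds in tensorboard.py below:

import threading

from tensorflow.python.summary import event_accumulator

RELOAD_INTERVAL_SECS = 60       # illustrative polling interval
path = '/tmp/logdir'            # illustrative event-file directory

accumulator = event_accumulator.EventAccumulator(path)

def _ReloadPeriodically():
  # Synchronous load; blocks until all outstanding events are processed.
  accumulator.Reload()
  # Chain exactly one daemon timer, so there is a single source of async.
  t = threading.Timer(RELOAD_INTERVAL_SECS, _ReloadPeriodically)
  t.daemon = True
  t.start()

# Run the first load on a background thread so the caller is not blocked
# while a large event file is parsed.
t = threading.Timer(0, _ReloadPeriodically)
t.daemon = True
t.start()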
diff --git a/tensorflow/python/summary/event_multiplexer.py b/tensorflow/python/summary/event_multiplexer.py
index 46ee752387609e..78bd359734a85d 100644
--- a/tensorflow/python/summary/event_multiplexer.py
+++ b/tensorflow/python/summary/event_multiplexer.py
@@ -68,7 +68,6 @@ class EventMultiplexer(object):
   @@AddRun
   @@AddRunsFromDirectory
   @@Reload
-  @@AutoUpdate
   @@Runs
   @@Scalars
   @@Graph
@@ -93,8 +92,6 @@ def __init__(self, run_path_map=None,
     self._accumulators = {}
     self._paths = {}
     self._reload_called = False
-    self._autoupdate_called = False
-    self._autoupdate_interval = None
     self._size_guidance = size_guidance
     if run_path_map is not None:
       for (run, path) in six.iteritems(run_path_map):
@@ -109,9 +106,9 @@ def AddRun(self, path, name=None):
     do nothing. If we are watching a different path, replace the event
     accumulator.

-    If `AutoUpdate` or `Reload` have been called, it will `AutoUpdate` or
-    `Reload` the newly created accumulators. This maintains the invariant that
-    once the Multiplexer was activated, all of its accumulators are active.
+    If `Reload` has been called, it will `Reload` the newly created
+    accumulators. This maintains the invariant that once the Multiplexer was
+    activated, all of its accumulators are active.

     Args:
       path: Path to the event files (or event directory) for given run.
@@ -138,8 +135,6 @@ def AddRun(self, path, name=None):
     if accumulator:
       if self._reload_called:
         accumulator.Reload()
-      if self._autoupdate_called:
-        accumulator.AutoUpdate(self._autoupdate_interval)
     return self

   def AddRunsFromDirectory(self, path, name=None):
@@ -153,9 +148,8 @@ def AddRunsFromDirectory(self, path, name=None):
     can call AddRunsFromDirectory at the root of a tree of event logs and
     TensorBoard will load them all.

-    If the `EventMultiplexer` is already loaded or autoupdating, this will cause
-    the newly created accumulators to also `Reload()` or `AutoUpdate()`.
-
+    If the `EventMultiplexer` is already loaded, this will cause the newly
+    created accumulators to `Reload()`.
     Args:
       path: A string path to a directory to load runs from.
       name: Optionally, what name to apply to the runs. If name is provided
@@ -195,16 +189,6 @@ def Reload(self):
         l.Reload()
     return self

-  def AutoUpdate(self, interval=60):
-    """Call `AutoUpdate(interval)` on every `EventAccumulator`."""
-    self._autoupdate_interval = interval
-    self._autoupdate_called = True
-    with self._accumulators_mutex:
-      loaders = list(self._accumulators.values())
-    for l in loaders:
-      l.AutoUpdate(interval)
-    return self
-
   def Scalars(self, run, tag):
     """Retrieve the scalar events associated with a run and tag.

@@ -317,43 +301,3 @@ def Runs(self):
   def _GetAccumulator(self, run):
     with self._accumulators_mutex:
       return self._accumulators[run]
-
-
-def AutoloadingMultiplexer(path_to_run, interval_secs=60,
-                           size_guidance=event_accumulator.DEFAULT_SIZE_GUIDANCE):
-  """Create an `EventMultiplexer` that automatically loads runs in directories.
-
-  Args:
-    path_to_run: Dict `{path: name}` which specifies the path to a directory,
-      and its name (or `None`). The path may contain tfevents files (in which
-      case they are loaded, with name as the name of the run) and subdirectories
-      containing tfevents files (in which case each subdirectory is added as a
-      run, named `'name/subdirectory'`).
-
-    interval_secs: How often to poll the directory for new runs.
-    size_guidance: How much data to store for each tag of various types - see
-      `event_accumulator.EventAccumulator`.
-
-  Returns:
-    The multiplexer which will automatically load from the directories.
-
-  Raises:
-    ValueError: if `path_to_run` is `None`
-    TypeError: if `path_to_run` is not a dict
-  """
-  multiplexer = EventMultiplexer(size_guidance=size_guidance)
-  if path_to_run is None:
-    raise ValueError('Cant construct an autoloading multiplexer without runs.')
-  if not isinstance(path_to_run, dict):
-    raise TypeError('path_to_run should be a dict, was %s', path_to_run)
-  def Load():
-    for (path, name) in six.iteritems(path_to_run):
-      logging.info('Checking for new runs in %s', path)
-      multiplexer.AddRunsFromDirectory(path, name)
-    t = threading.Timer(interval_secs, Load)
-    t.daemon = True
-    t.start()
-  t = threading.Timer(0, Load)
-  t.daemon = True
-  t.start()
-  return multiplexer
diff --git a/tensorflow/python/summary/event_multiplexer_test.py b/tensorflow/python/summary/event_multiplexer_test.py
index e7cecba1447e68..4aab4991b331f7 100644
--- a/tensorflow/python/summary/event_multiplexer_test.py
+++ b/tensorflow/python/summary/event_multiplexer_test.py
@@ -47,8 +47,6 @@ class _FakeAccumulator(object):

   def __init__(self, path):
     self._path = path
-    self.autoupdate_called = False
-    self.autoupdate_interval = None
     self.reload_called = False

   def Tags(self):
@@ -77,10 +75,6 @@ def Images(self, tag_name):
       raise KeyError
     return ['%s/%s' % (self._path, tag_name)]

-  def AutoUpdate(self, interval):
-    self.autoupdate_called = True
-    self.autoupdate_interval = interval
-
   def Reload(self):
     self.reload_called = True

@@ -113,14 +107,6 @@ def testReload(self):
     self.assertTrue(x._GetAccumulator('run1').reload_called)
     self.assertTrue(x._GetAccumulator('run2').reload_called)

-  def testAutoUpdate(self):
-    x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
-    x.AutoUpdate(5)
-    self.assertTrue(x._GetAccumulator('run1').autoupdate_called)
-    self.assertEqual(x._GetAccumulator('run1').autoupdate_interval, 5)
-    self.assertTrue(x._GetAccumulator('run2').autoupdate_called)
-    self.assertEqual(x._GetAccumulator('run2').autoupdate_interval, 5)
-
   def testScalars(self):
     x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})

@@ -275,15 +261,5 @@ def testAddRunMaintainsLoading(self):
     self.assertTrue(x._GetAccumulator('run1').reload_called)
     self.assertTrue(x._GetAccumulator('run2').reload_called)

-  def testAddRunMaintainsAutoUpdate(self):
-    x = event_multiplexer.EventMultiplexer()
-    x.AutoUpdate(5)
-    x.AddRun('run1')
-    x.AddRun('run2')
-    self.assertTrue(x._GetAccumulator('run1').autoupdate_called)
-    self.assertTrue(x._GetAccumulator('run2').autoupdate_called)
-    self.assertEqual(x._GetAccumulator('run1').autoupdate_interval, 5)
-    self.assertEqual(x._GetAccumulator('run2').autoupdate_interval, 5)
-
 if __name__ == '__main__':
   googletest.main()
diff --git a/tensorflow/tensorboard/backend/tensorboard.py b/tensorflow/tensorboard/backend/tensorboard.py
index 30a31c6468d80c..347a4c6fcdf03b 100644
--- a/tensorflow/tensorboard/backend/tensorboard.py
+++ b/tensorflow/tensorboard/backend/tensorboard.py
@@ -25,9 +25,12 @@
 import functools
 import os
 import socket
+import threading
+import time

 import tensorflow.python.platform

+import six
 from six.moves import BaseHTTPServer
 from six.moves import socketserver

@@ -72,6 +75,9 @@
     event_accumulator.HISTOGRAMS: 1,
 }

+# How often to reload new data after the latest load (secs)
+LOAD_INTERVAL = 60
+

 def ParseEventFilesFlag(flag_value):
   """Parses the logdir flag into a map from paths to run group names.
@@ -129,15 +135,26 @@ def main(unused_argv=None):
                  'details and examples.')
     return -1

-  if FLAGS.debug:
-    logging.info('Starting TensorBoard in directory %s', os.getcwd())
+  logging.info('Starting TensorBoard in directory %s', os.getcwd())

   path_to_run = ParseEventFilesFlag(FLAGS.logdir)
-  multiplexer = event_multiplexer.AutoloadingMultiplexer(
-      path_to_run=path_to_run, interval_secs=60,
+  logging.info('TensorBoard path_to_run is: %s', path_to_run)
+  multiplexer = event_multiplexer.EventMultiplexer(
       size_guidance=TENSORBOARD_SIZE_GUIDANCE)
-  multiplexer.AutoUpdate(interval=30)
+  def _Load():
+    start = time.time()
+    for (path, name) in six.iteritems(path_to_run):
+      multiplexer.AddRunsFromDirectory(path, name)
+    multiplexer.Reload()
+    duration = time.time() - start
+    logging.info('Multiplexer done loading. Load took %0.1f secs', duration)
+    t = threading.Timer(LOAD_INTERVAL, _Load)
+    t.daemon = True
+    t.start()
+  t = threading.Timer(0, _Load)
+  t.daemon = True
+  t.start()

   factory = functools.partial(tensorboard_handler.TensorboardHandler,
                               multiplexer)
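With the multiplexer now activated synchronously, the contract is easy to exercise directly. A minimal usage sketch (not part of this patch; the logdir path and the 'loss' tag are illustrative placeholders):

from tensorflow.python.summary import event_multiplexer

# Map run names to their event directories, as the test file does.
multiplexer = event_multiplexer.EventMultiplexer({'run1': '/tmp/logs/run1'})
multiplexer.Reload()  # Blocks until every accumulator has read its events.

print(multiplexer.Runs())                    # Tags available for each run.
print(multiplexer.Scalars('run1', 'loss'))   # Scalar events for one run/tag.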