Remove the AutoUpdate methods from EventAccumulator and EventMultiplexer.

There's no reason for those classes to own the async behavior; a caller can easily manage it themselves.

The resulting code is easier to reason about because there is a single source of asynchrony and a single thread, rather than several sub-timers firing at different times. The new loading code also logs load time for performance measurement.
Change: 113376718
A. Unique TensorFlower authored and Vijay Vasudevan committed Jan 30, 2016
1 parent 6d92548 commit 34d5a99
Showing 4 changed files with 29 additions and 132 deletions.
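
To illustrate the caller-managed pattern the commit message describes, here is a minimal sketch (not the committed code) of how a caller might drive periodic reloads itself with a single re-armed timer, mirroring the _Load helper this commit adds to tensorboard.py. The logdir mapping, interval, and use of the standard-library logging module are illustrative assumptions:

import logging
import threading
import time

from tensorflow.python.summary import event_multiplexer  # module path as of this commit

logging.basicConfig(level=logging.INFO)

path_to_run = {'/tmp/logs/my_experiment': None}  # illustrative {path: run name} mapping
LOAD_INTERVAL = 60  # seconds to wait after each successful load

multiplexer = event_multiplexer.EventMultiplexer()

def _Load():
  start = time.time()
  for path, name in path_to_run.items():
    multiplexer.AddRunsFromDirectory(path, name)
  multiplexer.Reload()  # synchronous; blocks until pending events are processed
  logging.info('Load took %0.1f secs', time.time() - start)
  # Re-arm a single daemon timer so one thread owns all async reloading.
  t = threading.Timer(LOAD_INTERVAL, _Load)
  t.daemon = True
  t.start()

# Start the first load on a background thread so serving can begin immediately.
t = threading.Timer(0, _Load)
t.daemon = True
t.start()

Chaining one timer this way keeps at most one reload in flight, which is the "single source of asynchrony and a single thread" property the commit message calls out.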
44 changes: 2 additions & 42 deletions tensorflow/python/summary/event_accumulator.py
@@ -99,22 +99,13 @@ class EventAccumulator(object):
`Accumulator.Scalars(tag)`) allow for the retrieval of all data
associated with that tag.
Before usage, the `EventAccumulator` must be activated via `Reload()` or
`AutoUpdate(interval)`.
If activated via `Reload()`, it loads synchronously, so calls to `Values` or
`Tags` will block until all outstanding events are processed. Afterwards,
`Reload()` may be called again to load any new data.
If activated via `AutoUpdate(interval)`, it loads asynchronously, so calls to
`Values` or `Tags` will immediately return a valid subset of the outstanding
event data. It reloads new data every `interval` seconds.
Before usage, the `EventAccumulator` must be activated via `Reload()`. This
method synchronously loads all of the data written so far.
Histograms and images are very large, so storing all of them is not
recommended.
@@Reload
@@AutoUpdate
@@Tags
@@Scalars
@@Graph
@@ -155,7 +146,6 @@ def __init__(self, path, size_guidance=DEFAULT_SIZE_GUIDANCE,
self._images = reservoir.Reservoir(size=sizes[IMAGES])
self._generator_mutex = threading.Lock()
self._generator = _GeneratorFromPath(path)
self._is_autoupdating = False
self._activated = False
self._compression_bps = compression_bps
self.most_recent_step = -1
@@ -203,36 +193,6 @@ def Reload(self):
value.image)
return self

def AutoUpdate(self, interval=60):
"""Asynchronously load all events, and periodically reload.
Calling this function is not thread safe.
Calling this function activates the `EventAccumulator`.
Args:
interval: how many seconds after each successful reload to load new events
(default 60)
Returns:
The `EventAccumulator`.
"""
if self._is_autoupdating:
return
self._is_autoupdating = True
self._activated = True
def Update():
self.Reload()
logging.info('EventAccumulator update triggered')
t = threading.Timer(interval, Update)
t.daemon = True
t.start()
# Asynchronously start the update process, so that the accumulator can
# immediately serve data, even if there is a very large event file to parse
t = threading.Timer(0, Update)
t.daemon = True
t.start()
return self

def Tags(self):
"""Return all tags found in the value stream.
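
As a companion to the updated EventAccumulator docstring above, a rough usage sketch of the synchronous Reload() workflow it describes; the event directory and the 'loss' tag are hypothetical:

from tensorflow.python.summary import event_accumulator  # module path as of this commit

acc = event_accumulator.EventAccumulator('/tmp/logs/my_run')  # hypothetical event directory
acc.Reload()               # blocks until all events written so far are processed
print(acc.Tags())          # tags found so far, grouped by type
for scalar in acc.Scalars('loss'):  # 'loss' is an assumed tag name
  print(scalar.step, scalar.value)
acc.Reload()               # call again later to pick up newly written events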
66 changes: 5 additions & 61 deletions tensorflow/python/summary/event_multiplexer.py
@@ -68,7 +68,6 @@ class EventMultiplexer(object):
@@AddRun
@@AddRunsFromDirectory
@@Reload
@@AutoUpdate
@@Runs
@@Scalars
@@Graph
@@ -93,8 +92,6 @@ def __init__(self, run_path_map=None,
self._accumulators = {}
self._paths = {}
self._reload_called = False
self._autoupdate_called = False
self._autoupdate_interval = None
self._size_guidance = size_guidance
if run_path_map is not None:
for (run, path) in six.iteritems(run_path_map):
@@ -109,9 +106,9 @@ def AddRun(self, path, name=None):
do nothing. If we are watching a different path, replace the event
accumulator.
If `AutoUpdate` or `Reload` have been called, it will `AutoUpdate` or
`Reload` the newly created accumulators. This maintains the invariant that
once the Multiplexer was activated, all of its accumulators are active.
If `Reload` has been called, it will `Reload` the newly created
accumulators. This maintains the invariant that once the Multiplexer was
activated, all of its accumulators are active.
Args:
path: Path to the event files (or event directory) for given run.
@@ -138,8 +135,6 @@ def AddRun(self, path, name=None):
if accumulator:
if self._reload_called:
accumulator.Reload()
if self._autoupdate_called:
accumulator.AutoUpdate(self._autoupdate_interval)
return self

def AddRunsFromDirectory(self, path, name=None):
@@ -153,9 +148,8 @@ def AddRunsFromDirectory(self, path, name=None):
can call AddRunsFromDirectory at the root of a tree of event logs and
TensorBoard will load them all.
If the `EventMultiplexer` is already loaded or autoupdating, this will cause
the newly created accumulators to also `Reload()` or `AutoUpdate()`.
If the `EventMultiplexer` is already loaded, this will cause
the newly created accumulators to `Reload()`.
Args:
path: A string path to a directory to load runs from.
name: Optionally, what name to apply to the runs. If name is provided
@@ -195,16 +189,6 @@ def Reload(self):
l.Reload()
return self

def AutoUpdate(self, interval=60):
"""Call `AutoUpdate(interval)` on every `EventAccumulator`."""
self._autoupdate_interval = interval
self._autoupdate_called = True
with self._accumulators_mutex:
loaders = list(self._accumulators.values())
for l in loaders:
l.AutoUpdate(interval)
return self

def Scalars(self, run, tag):
"""Retrieve the scalar events associated with a run and tag.
@@ -317,43 +301,3 @@ def Runs(self):
def _GetAccumulator(self, run):
with self._accumulators_mutex:
return self._accumulators[run]


def AutoloadingMultiplexer(path_to_run, interval_secs=60,
size_guidance=event_accumulator.DEFAULT_SIZE_GUIDANCE):
"""Create an `EventMultiplexer` that automatically loads runs in directories.
Args:
path_to_run: Dict `{path: name}` which specifies the path to a directory,
and its name (or `None`). The path may contain tfevents files (in which
case they are loaded, with name as the name of the run) and subdirectories
containing tfevents files (in which case each subdirectory is added as a
run, named `'name/subdirectory'`).
interval_secs: How often to poll the directory for new runs.
size_guidance: How much data to store for each tag of various types - see
`event_accumulator.EventAccumulator`.
Returns:
The multiplexer which will automatically load from the directories.
Raises:
ValueError: if `path_to_run` is `None`
TypeError: if `path_to_run` is not a dict
"""
multiplexer = EventMultiplexer(size_guidance=size_guidance)
if path_to_run is None:
raise ValueError('Cant construct an autoloading multiplexer without runs.')
if not isinstance(path_to_run, dict):
raise TypeError('path_to_run should be a dict, was %s', path_to_run)
def Load():
for (path, name) in six.iteritems(path_to_run):
logging.info('Checking for new runs in %s', path)
multiplexer.AddRunsFromDirectory(path, name)
t = threading.Timer(interval_secs, Load)
t.daemon = True
t.start()
t = threading.Timer(0, Load)
t.daemon = True
t.start()
return multiplexer
24 changes: 0 additions & 24 deletions tensorflow/python/summary/event_multiplexer_test.py
@@ -47,8 +47,6 @@ class _FakeAccumulator(object):

def __init__(self, path):
self._path = path
self.autoupdate_called = False
self.autoupdate_interval = None
self.reload_called = False

def Tags(self):
@@ -77,10 +75,6 @@ def Images(self, tag_name):
raise KeyError
return ['%s/%s' % (self._path, tag_name)]

def AutoUpdate(self, interval):
self.autoupdate_called = True
self.autoupdate_interval = interval

def Reload(self):
self.reload_called = True

@@ -113,14 +107,6 @@ def testReload(self):
self.assertTrue(x._GetAccumulator('run1').reload_called)
self.assertTrue(x._GetAccumulator('run2').reload_called)

def testAutoUpdate(self):
x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
x.AutoUpdate(5)
self.assertTrue(x._GetAccumulator('run1').autoupdate_called)
self.assertEqual(x._GetAccumulator('run1').autoupdate_interval, 5)
self.assertTrue(x._GetAccumulator('run2').autoupdate_called)
self.assertEqual(x._GetAccumulator('run2').autoupdate_interval, 5)

def testScalars(self):
x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})

@@ -275,15 +261,5 @@ def testAddRunMaintainsLoading(self):
self.assertTrue(x._GetAccumulator('run1').reload_called)
self.assertTrue(x._GetAccumulator('run2').reload_called)

def testAddRunMaintainsAutoUpdate(self):
x = event_multiplexer.EventMultiplexer()
x.AutoUpdate(5)
x.AddRun('run1')
x.AddRun('run2')
self.assertTrue(x._GetAccumulator('run1').autoupdate_called)
self.assertTrue(x._GetAccumulator('run2').autoupdate_called)
self.assertEqual(x._GetAccumulator('run1').autoupdate_interval, 5)
self.assertEqual(x._GetAccumulator('run2').autoupdate_interval, 5)

if __name__ == '__main__':
googletest.main()
27 changes: 22 additions & 5 deletions tensorflow/tensorboard/backend/tensorboard.py
@@ -25,9 +25,12 @@
import functools
import os
import socket
import threading
import time

import tensorflow.python.platform

import six
from six.moves import BaseHTTPServer
from six.moves import socketserver

@@ -72,6 +75,9 @@
event_accumulator.HISTOGRAMS: 1,
}

# How often to reload new data after the latest load (secs)
LOAD_INTERVAL = 60


def ParseEventFilesFlag(flag_value):
"""Parses the logdir flag into a map from paths to run group names.
@@ -129,15 +135,26 @@ def main(unused_argv=None):
'details and examples.')
return -1

if FLAGS.debug:
logging.info('Starting TensorBoard in directory %s', os.getcwd())
logging.info('Starting TensorBoard in directory %s', os.getcwd())

path_to_run = ParseEventFilesFlag(FLAGS.logdir)
multiplexer = event_multiplexer.AutoloadingMultiplexer(
path_to_run=path_to_run, interval_secs=60,
logging.info('TensorBoard path_to_run is: %s', path_to_run)
multiplexer = event_multiplexer.EventMultiplexer(
size_guidance=TENSORBOARD_SIZE_GUIDANCE)

multiplexer.AutoUpdate(interval=30)
def _Load():
start = time.time()
for (path, name) in six.iteritems(path_to_run):
multiplexer.AddRunsFromDirectory(path, name)
multiplexer.Reload()
duration = time.time() - start
logging.info('Multiplexer done loading. Load took %0.1f secs', duration)
t = threading.Timer(LOAD_INTERVAL, _Load)
t.daemon = True
t.start()
t = threading.Timer(0, _Load)
t.daemon = True
t.start()

factory = functools.partial(tensorboard_handler.TensorboardHandler,
multiplexer)
