Skip to content

Commit

Permalink
[BEAM-4268] Improving the separation between Metrics API and Execution (apache#5323)
Browse files Browse the repository at this point in the history

* Improving metrics api - execution separation.
  • Loading branch information
pabloem authored May 11, 2018
1 parent a350152 commit 2a4d2ea
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 10 deletions.
7 changes: 1 addition & 6 deletions sdks/python/apache_beam/metrics/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from apache_beam.metrics.cells import DistributionCell
from apache_beam.metrics.cells import GaugeCell
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.runners.worker import statesampler


class MetricKey(object):
Expand Down Expand Up @@ -149,12 +150,6 @@ def current_container(self):
MetricsEnvironment = _MetricsEnvironment()


def metrics_startup():
"""Initialize metrics context to run."""
global statesampler # pylint: disable=global-variable-not-assigned
from apache_beam.runners.worker import statesampler


class MetricsContainer(object):
"""Holds the metrics of a single step and a single bundle."""
def __init__(self, step_name):
Expand Down
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ def gauge(namespace, name):
return Metrics.DelegatingGauge(MetricName(namespace, name))

class DelegatingCounter(Counter):
"""Metrics Counter that Delegates functionality to MetricsEnvironment."""

def __init__(self, metric_name):
super(Metrics.DelegatingCounter, self).__init__()
self.metric_name = metric_name

def inc(self, n=1):
Expand All @@ -102,7 +105,10 @@ def inc(self, n=1):
container.get_counter(self.metric_name).inc(n)

class DelegatingDistribution(Distribution):
"""Metrics Distribution that Delegates functionality to MetricsEnvironment."""

def __init__(self, metric_name):
super(Metrics.DelegatingDistribution, self).__init__()
self.metric_name = metric_name

def update(self, value):
Expand All @@ -111,7 +117,10 @@ def update(self, value):
container.get_distribution(self.metric_name).update(value)

class DelegatingGauge(Gauge):
"""Metrics Gauge that Delegates functionality to MetricsEnvironment."""

def __init__(self, metric_name):
super(Metrics.DelegatingGauge, self).__init__()
self.metric_name = metric_name

def set(self, value):
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def test_create_counter_distribution(self):
statesampler.set_current_tracker(sampler)
state1 = sampler.scoped_state('mystep', 'myState',
metrics_container=MetricsContainer('mystep'))
sampler.start()
with state1:
counter_ns = 'aCounterNamespace'
distro_ns = 'aDistributionNamespace'
Expand All @@ -144,6 +145,7 @@ def test_create_counter_distribution(self):
self.assertEqual(
container.distributions[MetricName(distro_ns, name)].get_cumulative(),
DistributionData(12, 2, 2, 10))
sampler.stop()


if __name__ == '__main__':
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from apache_beam.options.pipeline_options import TestOptions
from apache_beam.portability import common_urns
from apache_beam.pvalue import AsSideInput
from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.runners.dataflow.internal.names import PropertyNames
Expand Down Expand Up @@ -362,6 +361,8 @@ def run_pipeline(self, pipeline):
result = DataflowPipelineResult(
self.dataflow_client.create_job(self.job), self)

# TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.
from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
self._metrics = DataflowMetrics(self.dataflow_client, result, self.job)
result.metric_results = self._metrics
return result
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from apache_beam import coders
from apache_beam import typehints
from apache_beam.internal.util import ArgumentPlaceholder
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.options.pipeline_options import DirectOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import RuntimeValueProvider
Expand Down Expand Up @@ -357,6 +356,8 @@ def visit_transform(self, applied_ptransform):
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()

# TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.
from apache_beam.metrics.execution import MetricsEnvironment
MetricsEnvironment.set_metrics_supported(True)
logging.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
Expand Down
2 changes: 0 additions & 2 deletions sdks/python/apache_beam/runners/worker/statesampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import threading
from collections import namedtuple

from apache_beam.metrics import execution
from apache_beam.utils.counters import Counter
from apache_beam.utils.counters import CounterName

Expand Down Expand Up @@ -76,7 +75,6 @@ def stop_if_still_running(self):
def start(self):
self.tracked_thread = threading.current_thread()
set_current_tracker(self)
execution.metrics_startup()
super(StateSampler, self).start()
self.started = True

Expand Down

0 comments on commit 2a4d2ea

Please sign in to comment.