Skip to content

Commit

Permalink
Adding flag to mask change
Browse files Browse the repository at this point in the history
  • Loading branch information
pabloem committed Jun 15, 2018
1 parent 22f5a81 commit 9cdbbe0
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 22 deletions.
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def _read_side_inputs(self, tags_and_types):
sources.append(si.source)
# The tracking of time spend reading and bytes read from side inputs is
# behind an experiment flag to test its performance impact.
if 'sideinput_io_metrics' in RuntimeValueProvider.experiments:
if 'sideinput_io_metrics_v2' in RuntimeValueProvider.experiments:
si_counter = opcounters.SideInputReadCounter(
self.counter_factory,
self.state_sampler,
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/worker/sideinputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def __init__(self,
def add_byte_counter(self, reader):
"""Adds byte counter observer to a side input reader.
If the 'sideinput_io_metrics' experiment flag is not passed in, then
If the 'sideinput_io_metrics_v2' experiment flag is not passed in, then
nothing is attached to the reader.
Args:
Expand Down Expand Up @@ -123,7 +123,7 @@ def _reader_thread(self):
# The tracking of time spend reading and bytes read from side
# inputs is kept behind an experiment flag to test performance
# impact.
if 'sideinput_io_metrics' in RuntimeValueProvider.experiments:
if 'sideinput_io_metrics_v2' in RuntimeValueProvider.experiments:
self.add_byte_counter(reader)
returns_windowed_values = reader.returns_windowed_values
for value in reader:
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/sideinputs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_bytes_read_behind_experiment(self):

def test_bytes_read_are_reported(self):
RuntimeValueProvider.set_runtime_options(
{'experiments': ['sideinput_io_metrics', 'other']})
{'experiments': ['sideinput_io_metrics_v2', 'other']})
mock_read_counter = mock.MagicMock()
source_records = ['a', 'b', 'c', 'd']
sources = [
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/runners/worker/statesampler_fast.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ from libc.stdint cimport int32_t, int64_t
cdef class StateSampler(object):
"""Tracks time spent in states during pipeline execution."""
cdef int _sampling_period_ms
cdef int _sampling_period_ms_start
cdef double _sampling_period_ratio

cdef list scoped_states_by_index

Expand Down
18 changes: 0 additions & 18 deletions sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,6 @@ cdef inline int64_t get_nsec_time() nogil:

cdef class StateSampler(object):
"""Tracks time spent in states during pipeline execution."""
cdef int _sampling_period_ms
cdef int _sampling_period_ms_start
cdef double _sampling_period_ratio

cdef list scoped_states_by_index

cdef public bint started
cdef public bint finished
cdef object sampling_thread

# This lock guards members that are shared between threads, specificaly
# finished, scoped_states_by_index, and the nsecs field of each state therein.
cdef pythread.PyThread_type_lock lock

cdef public int64_t state_transition_count
cdef public int64_t time_since_transition

cdef int32_t current_state_index

def __init__(self,
sampling_period_ms,
Expand Down

0 comments on commit 9cdbbe0

Please sign in to comment.