Skip to content

Commit

Permalink
Merge pull request apache#8300: [BEAM-7035] Compatible wire represent…
Browse files Browse the repository at this point in the history
…ation for timers in Python SDK
  • Loading branch information
tweise authored Apr 17, 2019
2 parents 3682d0c + a002454 commit 8842857
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ public void setTimer(
@Override
public void setTimer(TimerData timer) {
try {
String contextTimerId = getContextTimerId(timer);
String contextTimerId = getContextTimerId(timer.getTimerId(), timer.getNamespace());
// Only one timer can exist at a time for a given timer id and context.
// If a timer gets set twice in the same context, the second must
// override the first. Thus, we must cancel any pending timers
Expand Down Expand Up @@ -1052,15 +1052,15 @@ private void cancelPendingTimerById(String contextTimerId) throws Exception {

void cleanupPendingTimer(TimerData timer) {
try {
pendingTimersById.remove(getContextTimerId(timer));
pendingTimersById.remove(getContextTimerId(timer.getTimerId(), timer.getNamespace()));
} catch (Exception e) {
throw new RuntimeException("Failed to cleanup state with pending timers", e);
}
}

/** Unique contextual id of a timer. Used to look up any existing timers in a context. */
private String getContextTimerId(TimerData timer) {
return timer.getTimerId() + timer.getNamespace().stringKey();
private String getContextTimerId(String timerId, StateNamespace namespace) {
return timerId + namespace.stringKey();
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
Expand All @@ -1072,7 +1072,11 @@ public void deleteTimer(StateNamespace namespace, String timerId) {

@Override
public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported.");
try {
cancelPendingTimerById(getContextTimerId(timerId, namespace));
} catch (Exception e) {
throw new RuntimeException("Failed to cancel timer", e);
}
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,20 @@ public ByteBuffer getCurrentKey() {

private void setTimer(WindowedValue<InputT> timerElement, TimerInternals.TimerData timerData) {
try {
LOG.debug("Setting timer: {} {}", timerElement, timerData);
// KvToByteBufferKeySelector returns the key encoded
ByteBuffer encodedKey = (ByteBuffer) keySelector.getKey(timerElement);
// We have to synchronize to ensure the state backend is not concurrently accessed by the
// state requests
try {
stateBackendLock.lock();
getKeyedStateBackend().setCurrentKey(encodedKey);
timerInternals.setTimer(timerData);
if (timerData.getTimestamp().isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
timerInternals.deleteTimer(
timerData.getNamespace(), timerData.getTimerId(), timerData.getDomain());
} else {
timerInternals.setTimer(timerData);
}
} finally {
stateBackendLock.unlock();
}
Expand Down
22 changes: 19 additions & 3 deletions sdks/python/apache_beam/coders/coder_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,29 @@ def estimate_size(self, value, nested=False):


class TimestampCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""
"""For internal use only; no backwards-compatibility guarantees.
TODO: SDK agnostic encoding
For interoperability with Java SDK, encoding needs to match
that of the Java SDK InstantCoder.
https://github.com/apache/beam/blob/f5029b4f0dfff404310b2ef55e2632bbacc7b04f/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java#L79
"""

def encode_to_stream(self, value, out, nested):
out.write_bigendian_int64(value.micros)
millis = value.micros // 1000
if millis >= 0:
millis = millis - _TIME_SHIFT
else:
millis = millis + _TIME_SHIFT
out.write_bigendian_int64(millis)

def decode_from_stream(self, in_stream, nested):
return Timestamp(micros=in_stream.read_bigendian_int64())
millis = in_stream.read_bigendian_int64()
if millis < 0:
millis = millis + _TIME_SHIFT
else:
millis = millis - _TIME_SHIFT
return Timestamp(micros=millis * 1000)

def estimate_size(self, unused_value, nested=False):
# A Timestamp is encoded as a 64-bit integer in 8 bytes, regardless of
Expand Down
14 changes: 7 additions & 7 deletions sdks/python/apache_beam/coders/coders_test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,13 @@ def test_interval_window_coder(self):

def test_timestamp_coder(self):
self.check_coder(coders.TimestampCoder(),
*[timestamp.Timestamp(micros=x) for x in range(-100, 100)])
*[timestamp.Timestamp(micros=x) for x in (-1000, 0, 1000)])
self.check_coder(coders.TimestampCoder(),
timestamp.Timestamp(micros=-1234567890),
timestamp.Timestamp(micros=1234567890))
timestamp.Timestamp(micros=-1234567000),
timestamp.Timestamp(micros=1234567000))
self.check_coder(coders.TimestampCoder(),
timestamp.Timestamp(micros=-1234567890123456789),
timestamp.Timestamp(micros=1234567890123456789))
timestamp.Timestamp(micros=-1234567890123456000),
timestamp.Timestamp(micros=1234567890123456000))
self.check_coder(
coders.TupleCoder((coders.TimestampCoder(), coders.BytesCoder())),
(timestamp.Timestamp.of(27), b'abc'))
Expand All @@ -208,10 +208,10 @@ def test_timer_coder(self):
self.check_coder(coders._TimerCoder(coders.BytesCoder()),
*[{'timestamp': timestamp.Timestamp(micros=x),
'payload': b'xyz'}
for x in range(-3, 3)])
for x in (-3000, 0, 3000)])
self.check_coder(
coders.TupleCoder((coders._TimerCoder(coders.VarIntCoder()),)),
({'timestamp': timestamp.Timestamp.of(37), 'payload': 389},))
({'timestamp': timestamp.Timestamp.of(37000), 'payload': 389},))

def test_tuple_coder(self):
kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder()))
Expand Down
37 changes: 37 additions & 0 deletions sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,43 @@ def process_timer(self):
expected = [('fired', ts) for ts in (20, 200)]
assert_that(actual, equal_to(expected))

def test_pardo_timers_clear(self):
if type(self).__name__ != 'FlinkRunnerTest':
# FnApiRunner fails to wire multiple timer collections
# this method can replace test_pardo_timers when the issue is fixed
self.skipTest('BEAM-7074: Multiple timer definitions not supported.')

timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
clear_timer_spec = userstate.TimerSpec('clear_timer',
userstate.TimeDomain.WATERMARK)

class TimerDoFn(beam.DoFn):
def process(self, element, timer=beam.DoFn.TimerParam(timer_spec),
clear_timer=beam.DoFn.TimerParam(clear_timer_spec)):
unused_key, ts = element
timer.set(ts)
timer.set(2 * ts)
clear_timer.set(ts)
clear_timer.clear()

@userstate.on_timer(timer_spec)
def process_timer(self):
yield 'fired'

@userstate.on_timer(clear_timer_spec)
def process_clear_timer(self):
yield 'should not fire'

with self.create_pipeline() as p:
actual = (
p
| beam.Create([('k1', 10), ('k2', 100)])
| beam.ParDo(TimerDoFn())
| beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts)))

expected = [('fired', ts) for ts in (20, 200)]
assert_that(actual, equal_to(expected))

def test_pardo_state_timers(self):
self._run_pardo_state_timers(False)

Expand Down
8 changes: 6 additions & 2 deletions sdks/python/apache_beam/runners/worker/bundle_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,12 @@ def set(self, ts):
windowed_value.WindowedValue(
(self._key, dict(timestamp=ts)), ts, (self._window,)))

def clear(self, timestamp):
self._receiver.receive((self._key, dict(clear=True)))
def clear(self):
dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
self._receiver.receive(
windowed_value.WindowedValue(
(self._key, dict(timestamp=clear_ts)), 0, (self._window,)))


class FnApiUserStateContext(userstate.UserStateContext):
Expand Down

0 comments on commit 8842857

Please sign in to comment.