Skip to content

Commit

Permalink
[FLINK-35886][task] Expose Clock from ProcessingTimeService
Browse files Browse the repository at this point in the history
Operators have to be serializable and they will also need an access
to Clock to construct ProgressBlockingRelativeClock. Because we
also want to be able to provide for testing purposes ManualClock
we have to find a way how Operators could obtain a Clock instance.
Exposing Clock from ProcessingTimeService sounds like a good place
as it also will provide a since source of processing time for
potential users (for example ProgressBlockingRelativeClock and
firing timers)
  • Loading branch information
pnowojski committed Aug 17, 2024
1 parent 934007c commit a76c711
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.NeverCompleteFuture;

import java.util.concurrent.CompletableFuture;
Expand All @@ -33,8 +35,8 @@ public final class NeverFireProcessingTimeService implements TimerService {
private AtomicBoolean shutdown = new AtomicBoolean(true);

@Override
public long getCurrentProcessingTime() {
return System.currentTimeMillis();
public Clock getClock() {
return SystemClock.getInstance();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import java.util.concurrent.CompletableFuture;
Expand All @@ -32,6 +33,15 @@
*/
public interface ProcessingTimeService
extends org.apache.flink.api.common.operators.ProcessingTimeService {

@Override
default long getCurrentProcessingTime() {
return getClock().absoluteTimeMillis();
}

/** Returns {@link Clock} associated with this timer service. */
Clock getClock();

/**
* Registers a task to be executed repeatedly at a fixed rate.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.concurrent.NeverCompleteFuture;

import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -53,8 +54,8 @@ class ProcessingTimeServiceImpl implements ProcessingTimeService {
}

@Override
public long getCurrentProcessingTime() {
return timerService.getCurrentProcessingTime();
public Clock getClock() {
return timerService.getClock();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.NeverCompleteFuture;

import org.slf4j.Logger;
Expand Down Expand Up @@ -86,8 +88,8 @@ public class SystemProcessingTimeService implements TimerService {
}

@Override
public long getCurrentProcessingTime() {
return System.currentTimeMillis();
public Clock getClock() {
return SystemClock.getInstance();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;

import java.time.Duration;
import java.util.Comparator;
import java.util.HashSet;
import java.util.PriorityQueue;
Expand All @@ -30,6 +32,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -38,14 +41,14 @@
*/
public class TestProcessingTimeService implements TimerService {

private volatile long currentTime = Long.MIN_VALUE;

private volatile boolean isTerminated;
private volatile boolean isQuiesced;

// sorts the timers by timestamp so that they are processed in the correct order.
private final PriorityQueue<Tuple2<Long, CallbackTask>> priorityQueue;

private final ManualMSClock clock = new ManualMSClock(Long.MIN_VALUE);

public TestProcessingTimeService() {
this.priorityQueue =
new PriorityQueue<>(
Expand All @@ -60,14 +63,19 @@ public int compare(
}

public void advance(long delta) throws Exception {
setCurrentTime(this.currentTime + delta);
clock.advanceTime(Duration.ofMillis(delta));
maybeFireTimers();
}

public void setCurrentTime(long timestamp) throws Exception {
this.currentTime = timestamp;
clock.setCurrentTime(timestamp, TimeUnit.MILLISECONDS);
maybeFireTimers();
}

private void maybeFireTimers() throws Exception {
if (!isQuiesced) {
while (!priorityQueue.isEmpty() && currentTime >= priorityQueue.peek().f0) {
while (!priorityQueue.isEmpty()
&& getCurrentProcessingTime() >= priorityQueue.peek().f0) {
Tuple2<Long, CallbackTask> entry = priorityQueue.poll();

CallbackTask callbackTask = entry.f1;
Expand All @@ -88,8 +96,8 @@ public void setCurrentTime(long timestamp) throws Exception {
}

@Override
public long getCurrentProcessingTime() {
return currentTime;
public Clock getClock() {
return clock;
}

@Override
Expand Down Expand Up @@ -121,7 +129,8 @@ public ScheduledFuture<?> scheduleAtFixedRate(
PeriodicCallbackTask periodicCallbackTask = new PeriodicCallbackTask(callback, period);

priorityQueue.offer(
Tuple2.<Long, CallbackTask>of(currentTime + initialDelay, periodicCallbackTask));
Tuple2.<Long, CallbackTask>of(
getCurrentProcessingTime() + initialDelay, periodicCallbackTask));

return periodicCallbackTask;
}
Expand Down Expand Up @@ -264,4 +273,45 @@ public long nextTimestamp(long currentTimestamp) {
return currentTimestamp + period;
}
}

/**
* Similar to {@link org.apache.flink.util.clock.ManualClock}, but with ms precision and thus
* greater range. This is needed to support registering and firing timers with {@link
* Long#MAX_VALUE}.
*/
private static class ManualMSClock extends Clock {
private final AtomicLong currentTime;

public ManualMSClock(long startTime) {
this.currentTime = new AtomicLong(startTime);
}

@Override
public long absoluteTimeMillis() {
return currentTime.get();
}

@Override
public long relativeTimeMillis() {
return currentTime.get();
}

@Override
public long relativeTimeNanos() {
return currentTime.get() * 1_000_000;
}

/**
* Advances the time by the given duration. Time can also move backwards by supplying a
* negative value. This method performs no overflow check.
*/
public void advanceTime(Duration duration) {
currentTime.addAndGet(duration.toMillis());
}

/** Sets the time to the given value. */
public void setCurrentTime(long time, TimeUnit timeUnit) {
currentTime.set(timeUnit.toMillis(time));
}
}
}

0 comments on commit a76c711

Please sign in to comment.