Skip to content

Commit

Permalink
[FLINK-3464] Use Processing-Time Clock in Window Assigners/Triggers
Browse files Browse the repository at this point in the history
Introduces a custom TimeServiceProvider to the StreamTask.
This is responsible for defining and updating the current
processingtime for a task and handling all related action,
such as registering timers for actions to be executed in
the future.
  • Loading branch information
kl0u authored and aljoscha committed Jun 27, 2016
1 parent cb2b76d commit 4b5a789
Show file tree
Hide file tree
Showing 28 changed files with 762 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ private static class PeriodicWatermarkEmitter implements Triggerable {
//-------------------------------------------------

public void start() {
triggerContext.registerTimer(System.currentTimeMillis() + interval, this);
triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this);
}

@Override
Expand Down Expand Up @@ -454,7 +454,7 @@ public void trigger(long timestamp) throws Exception {
}

// schedule the next watermark
triggerContext.registerTimer(System.currentTimeMillis() + interval, this);
triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@SuppressWarnings("deprecation")
public class MockRuntimeContext extends StreamingRuntimeContext {
Expand All @@ -57,16 +58,27 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
private final ExecutionConfig execConfig;
private final Object checkpointLock;

private ScheduledExecutorService timer;
private final TimeServiceProvider timerService;

public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null);
}


public MockRuntimeContext(
int numberOfParallelSubtasks, int indexOfThisSubtask,
ExecutionConfig execConfig,
Object checkpointLock) {

this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, checkpointLock,
DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
}

public MockRuntimeContext(
int numberOfParallelSubtasks, int indexOfThisSubtask,
int numberOfParallelSubtasks, int indexOfThisSubtask,
ExecutionConfig execConfig,
Object checkpointLock) {
Object checkpointLock,
TimeServiceProvider timerService) {

super(new MockStreamOperator(),
new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
Collections.<String, Accumulator<?, ?>>emptyMap());
Expand All @@ -75,6 +87,7 @@ public MockRuntimeContext(
this.indexOfThisSubtask = indexOfThisSubtask;
this.execConfig = execConfig;
this.checkpointLock = checkpointLock;
this.timerService = timerService;
}

@Override
Expand Down Expand Up @@ -186,16 +199,17 @@ public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException();
}


public long getCurrentProcessingTime() {
Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
return timerService.getCurrentProcessingTime();
}

@Override
public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) {
if (timer == null) {
timer = Executors.newSingleThreadScheduledExecutor();
}
Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");

final long delay = Math.max(time - System.currentTimeMillis(), 0);

return timer.schedule(new Runnable() {
return timerService.registerTimer(time, new Runnable() {
@Override
public void run() {
synchronized (checkpointLock) {
Expand All @@ -207,7 +221,7 @@ public void run() {
}
}
}
}, delay, TimeUnit.MILLISECONDS);
});
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ protected ScheduledFuture<?> registerTimer(long time, Triggerable target) {
return container.registerTimer(time, target);
}

protected long getCurrentProcessingTime() {
return container.getCurrentProcessingTime();
}

/**
* Creates a partitioned state handle, using the state backend configured for this task.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,15 @@ public InputSplitProvider getInputSplitProvider() {
public ScheduledFuture<?> registerTimer(long time, Triggerable target) {
return operator.registerTimer(time, target);
}


/**
* Returns the current processing time as defined by the task's
* {@link org.apache.flink.streaming.runtime.tasks.TimeServiceProvider TimeServiceProvider}
*/
public long getCurrentProcessingTime() {
return operator.getCurrentProcessingTime();
}

// ------------------------------------------------------------------------
// broadcast variables
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected EventTimeSessionWindows(long sessionTimeout) {
}

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
private GlobalWindows() {}

@Override
public Collection<GlobalWindow> assignWindows(Object element, long timestamp) {
public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
return Collections.singletonList(GlobalWindow.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ protected ProcessingTimeSessionWindows(long sessionTimeout) {
}

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
long currentProcessingTime = context.getCurrentProcessingTime();
return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected SlidingEventTimeWindows(long size, long slide) {
}

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = timestamp - timestamp % slide;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ private SlidingProcessingTimeWindows(long size, long slide) {
}

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
timestamp = System.currentTimeMillis();
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
timestamp = context.getCurrentProcessingTime();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = timestamp - timestamp % slide;
for (long start = lastStart;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected TumblingEventTimeWindows(long size) {
}

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start = timestamp - (timestamp % size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ private TumblingProcessingTimeWindows(long size) {
}

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
final long now = System.currentTimeMillis();
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
final long now = context.getCurrentProcessingTime();
long start = now - (now % size);
return Collections.singletonList(new TimeWindow(start, start + size));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
*
* @param element The element to which windows should be assigned.
* @param timestamp The timestamp of the element.
* @param context The {@link WindowAssignerContext} in which the assigner operates.
*/
public abstract Collection<W> assignWindows(T element, long timestamp);
public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

/**
* Returns the default trigger associated with this {@code WindowAssigner}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.streaming.runtime.tasks.StreamTask;

/**
* A context provided to the {@link WindowAssigner} that allows it to query the
* current processing time. This is provided to the assigner by its containing
* {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
* which, in turn, gets it from the containing
* {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
*/
public abstract class WindowAssignerContext {

/**
* Returns the current processing time, as returned by
* the {@link StreamTask#getCurrentProcessingTime()}.
*/
public abstract long getCurrentProcessingTime();

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
long timestamp = fireTimestamp.get();
ctx.deleteEventTimeTimer(timestamp);
fireTimestamp.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
* A {@link Trigger} that continuously fires based on a given time interval. The time is the current
* system time.
* A {@link Trigger} that continuously fires based on a given time interval as measured by
* the clock of the machine on which the job is running.
*
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
Expand All @@ -52,7 +52,7 @@ private ContinuousProcessingTimeTrigger(long interval) {
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

timestamp = System.currentTimeMillis();
timestamp = ctx.getCurrentProcessingTime();

if (fireTimestamp.get() == null) {
long start = timestamp - (timestamp % interval);
Expand Down Expand Up @@ -87,6 +87,8 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
long timestamp = fireTimestamp.get();
ctx.deleteProcessingTimeTimer(timestamp);
fireTimestamp.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private ProcessingTimeTrigger() {}

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.getEnd());
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

import java.io.Serializable;

Expand Down Expand Up @@ -126,6 +127,12 @@ public void clear(W window, TriggerContext ctx) throws Exception {}
*/
public interface TriggerContext {

/**
* Returns the current processing time, as returned by
* the {@link StreamTask#getCurrentProcessingTime()}.
*/
long getCurrentProcessingTime();

/**
* Returns the metric group for this {@link Trigger}. This is the same metric
* group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
Expand Down Expand Up @@ -170,7 +177,7 @@ public interface TriggerContext {
void deleteEventTimeTimer(long time);

/**
* Retrieves an {@link State} object that can be used to interact with
* Retrieves a {@link State} object that can be used to interact with
* fault-tolerant state that is scoped to the window and key of the current
* trigger invocation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
*/
@Internal
public class StreamInputProcessor<IN> {

private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;

private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
Expand All @@ -76,7 +76,7 @@ public class StreamInputProcessor<IN> {

private boolean isFinished;



private final long[] watermarks;
private long lastEmittedWatermark;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
@Override
@SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception {

Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(),
element.getTimestamp());
element.getValue(),
element.getTimestamp(),
windowAssignerContext);

final K key = (K) getStateBackend().getCurrentKey();

Expand Down
Loading

0 comments on commit 4b5a789

Please sign in to comment.