Skip to content

Commit

Permalink
[FLINK-20217][task] Add support for splittable timers to AbstractStre…
Browse files Browse the repository at this point in the history
…amOperatorV2
  • Loading branch information
pnowojski committed Jun 12, 2024
1 parent f26ee47 commit 3e9d047
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.IndexedCombinedWatermarkStatus;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
Expand Down Expand Up @@ -62,6 +63,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
Expand Down Expand Up @@ -90,6 +93,8 @@ public abstract class AbstractStreamOperatorV2<OUT>
protected final StreamConfig config;
protected final Output<StreamRecord<OUT>> output;
private final StreamingRuntimeContext runtimeContext;
private final MailboxExecutor mailboxExecutor;

private final ExecutionConfig executionConfig;
private final ClassLoader userCodeClassLoader;
private final CloseableRegistry cancelables;
Expand All @@ -104,6 +109,7 @@ public abstract class AbstractStreamOperatorV2<OUT>

protected StreamOperatorStateHandler stateHandler;
protected InternalTimeServiceManager<?> timeServiceManager;
private @Nullable MailboxWatermarkProcessor watermarkProcessor;

public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
final Environment environment = parameters.getContainingTask().getEnvironment();
Expand Down Expand Up @@ -136,6 +142,8 @@ public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int nu
processingTimeService,
null,
environment.getExternalResourceInfoProvider());

mailboxExecutor = parameters.getMailboxExecutor();
}

private LatencyStats createLatencyStats(
Expand Down Expand Up @@ -214,6 +222,33 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables);
timeServiceManager = context.internalTimerServiceManager();
stateHandler.initializeOperatorState(this);

if (useSplittableTimers()
&& areSplittableTimersConfigured()
&& getTimeServiceManager().isPresent()) {
watermarkProcessor =
new MailboxWatermarkProcessor(
output, mailboxExecutor, getTimeServiceManager().get());
}
}

/**
* Can be overridden to disable splittable timers for this particular operator even if config
* option is enabled. By default, splittable timers are disabled.
*
* @return {@code true} if splittable timers should be used (subject to {@link
* StreamConfig#isUnalignedCheckpointsEnabled()} and {@link
* StreamConfig#isUnalignedCheckpointsSplittableTimersEnabled()}. {@code false} if
* splittable timers should never be used.
*/
@Internal
public boolean useSplittableTimers() {
return false;
}

@Internal
private boolean areSplittableTimersConfigured() {
return AbstractStreamOperator.areSplittableTimersConfigured(config);
}

/**
Expand Down Expand Up @@ -481,6 +516,14 @@ public <K, N> InternalTimerService<N> getInternalTimerService(
}

public void processWatermark(Watermark mark) throws Exception {
if (watermarkProcessor != null) {
watermarkProcessor.emitWatermarkInsideMailbox(mark);
} else {
emitWatermarkDirectly(mark);
}
}

private void emitWatermarkDirectly(Watermark mark) throws Exception {
if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ Tuple2<OP, Optional<ProcessingTimeService>> createOperator(
processingTimeService != null
? () -> processingTimeService
: processingTimeServiceFactory,
operatorEventDispatcher));
operatorEventDispatcher,
mailboxExecutor));
if (op instanceof YieldingOperator) {
((YieldingOperator<?>) op).setMailboxExecutor(mailboxExecutor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
Expand All @@ -44,6 +45,7 @@ public class StreamOperatorParameters<OUT> {
private final Output<StreamRecord<OUT>> output;
private final Supplier<ProcessingTimeService> processingTimeServiceFactory;
private final OperatorEventDispatcher operatorEventDispatcher;
private final MailboxExecutor mailboxExecutor;

/**
* The ProcessingTimeService, lazily created, but cached so that we don't create more than one.
Expand All @@ -55,12 +57,14 @@ public StreamOperatorParameters(
StreamConfig config,
Output<StreamRecord<OUT>> output,
Supplier<ProcessingTimeService> processingTimeServiceFactory,
OperatorEventDispatcher operatorEventDispatcher) {
OperatorEventDispatcher operatorEventDispatcher,
MailboxExecutor mailboxExecutor) {
this.containingTask = containingTask;
this.config = config;
this.output = output;
this.processingTimeServiceFactory = processingTimeServiceFactory;
this.operatorEventDispatcher = operatorEventDispatcher;
this.mailboxExecutor = mailboxExecutor;
}

public StreamTask<?, ?> getContainingTask() {
Expand All @@ -85,4 +89,8 @@ public ProcessingTimeService getProcessingTimeService() {
public OperatorEventDispatcher getOperatorEventDispatcher() {
return operatorEventDispatcher;
}

public MailboxExecutor getMailboxExecutor() {
return mailboxExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import org.apache.flink.api.common.operators.MailboxExecutor;

/**
* An operator that needs access to the {@link MailboxExecutor} to yield to downstream operators
* This class is no longer needed. {@link MailboxExecutor} is accessible via {@link
* StreamOperatorParameters#getMailboxExecutor()}.
*
* <p>An operator that needs access to the {@link MailboxExecutor} to yield to downstream operators
* needs to be created through a factory implementing this interface.
*/
@Experimental
@Deprecated
public interface YieldingOperatorFactory<OUT> extends StreamOperatorFactory<OUT> {
void setMailboxExecutor(MailboxExecutor mailboxExecutor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ private StreamOperatorParameters<RowData> createSubOperatorParameters(
streamConfig,
output,
multipleInputOperatorParameters::getProcessingTimeService,
multipleInputOperatorParameters.getOperatorEventDispatcher());
multipleInputOperatorParameters.getOperatorEventDispatcher(),
multipleInputOperatorParameters.getMailboxExecutor());
}

protected StreamConfig createStreamConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamConfig;
Expand Down Expand Up @@ -154,6 +155,7 @@ protected StreamOperatorParameters<RowData> createStreamOperatorParameters(
createStreamConfig(),
output,
TestProcessingTimeService::new,
null);
null,
new SyncMailboxExecutor());
}
}

0 comments on commit 3e9d047

Please sign in to comment.