Skip to content

Commit

Permalink
[FLINK-23204] Provide StateBackends access to MailboxExecutor (apache…
Browse files Browse the repository at this point in the history
…#16531)

There are several places in ChangelogStateBackend that need execute actions from the task thread
- DFS writer: collect so far uploaded changes; handle upload results after completion
- ChangelogKeyedStateBackend: checkpointing to combine state handles upon upload completion by writer
- ChangelogKeyedStateBackend: materialization take a snapshot (sync phase) and handle results of the async phase

This PR provides access to mailbox executor to simply threading model (avoid using lock).
  • Loading branch information
curcur authored Jul 22, 2021
1 parent a26887e commit 6709573
Show file tree
Hide file tree
Showing 39 changed files with 138 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
Expand All @@ -47,6 +48,7 @@
import org.apache.flink.util.UserCodeClassLoader;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
Expand Down Expand Up @@ -232,4 +234,20 @@ void acknowledgeCheckpoint(
IndexedInputGate[] getAllInputGates();

TaskEventDispatcher getTaskEventDispatcher();

// --------------------------------------------------------------------------------------------
// Fields set in the StreamTask to provide access to mailbox and other runtime resources
// --------------------------------------------------------------------------------------------

default void setMainMailboxExecutor(MailboxExecutor mainMailboxExecutor) {}

default MailboxExecutor getMainMailboxExecutor() {
throw new UnsupportedOperationException();
}

default void setAsyncOperationsThreadPool(ExecutorService executorService) {}

default ExecutorService getAsyncOperationsThreadPool() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;
package org.apache.flink.runtime.mailbox;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.runtime.concurrent.FutureTaskWithException;
import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
Expand All @@ -33,7 +31,7 @@

/**
* {@link java.util.concurrent.Executor} like interface for an build around a mailbox-based
* execution model (see {@link TaskMailbox}). {@code MailboxExecutor} can also execute downstream
* execution model (see {@code TaskMailbox}). {@code MailboxExecutor} can also execute downstream
* messages of a mailbox by yielding control from the task thread.
*
* <p>All submission functions can be called from any thread and will enqueue the action for further
Expand All @@ -48,10 +46,10 @@
* <p>The yielding functions will only process events from the operator itself and any downstream
* operator. Events of upstream operators are only processed when the input has been fully processed
* or if they yield themselves. This method avoid congestion and potential deadlocks, but will
* process {@link Mail}s slightly out-of-order, effectively creating a view on the mailbox that
* process {@code Mail}s slightly out-of-order, effectively creating a view on the mailbox that
* contains no message from upstream operators.
*
* <p><b>All yielding functions must be called in the mailbox thread</b> (see {@link
* <p><b>All yielding functions must be called in the mailbox thread</b> (see {@code
* TaskMailbox#isMailboxThread()}) to not violate the single-threaded execution model. There are two
* typical cases, both waiting until the resource is available. The main difference is if the
* resource becomes available through a mailbox message itself or not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.util.UserCodeClassLoader;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -92,6 +96,10 @@ public class RuntimeEnvironment implements Environment {

private final Task containingTask;

@Nullable private MailboxExecutor mainMailboxExecutor;

@Nullable private ExecutorService asyncOperationsThreadPool;

// ------------------------------------------------------------------------

public RuntimeEnvironment(
Expand Down Expand Up @@ -307,4 +315,27 @@ public TaskOperatorEventGateway getOperatorCoordinatorEventGateway() {
public void failExternally(Throwable cause) {
this.containingTask.failExternally(cause);
}

@Override
public void setMainMailboxExecutor(MailboxExecutor mainMailboxExecutor) {
this.mainMailboxExecutor = mainMailboxExecutor;
}

@Override
public MailboxExecutor getMainMailboxExecutor() {
return checkNotNull(
mainMailboxExecutor, "mainMailboxExecutor has not been initialized yet!");
}

@Override
public void setAsyncOperationsThreadPool(ExecutorService executorService) {
this.asyncOperationsThreadPool = executorService;
}

@Override
public ExecutorService getAsyncOperationsThreadPool() {
return checkNotNull(
asyncOperationsThreadPool,
"asyncOperationsThreadPool has not been initialized yet!");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;
package org.apache.flink.runtime.mailbox;

import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingRunnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.KvStateRegistry;
Expand All @@ -52,12 +54,16 @@
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.concurrent.Executors;

import com.sun.istack.NotNull;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -120,6 +126,10 @@ public class MockEnvironment implements Environment, AutoCloseable {

private final ExternalResourceInfoProvider externalResourceInfoProvider;

private MailboxExecutor mainMailboxExecutor;

private ExecutorService asyncOperationsThreadPool;

public static MockEnvironmentBuilder builder() {
return new MockEnvironmentBuilder();
}
Expand Down Expand Up @@ -175,6 +185,10 @@ protected MockEnvironment(

this.externalResourceInfoProvider =
Preconditions.checkNotNull(externalResourceInfoProvider);

this.mainMailboxExecutor = new SyncMailboxExecutor();

this.asyncOperationsThreadPool = Executors.newDirectExecutorService();
}

public IteratorWrappingTestSingleInputGate<Record> addInput(
Expand Down Expand Up @@ -386,6 +400,26 @@ public void close() throws Exception {
ioManager.close();
}

@Override
public void setMainMailboxExecutor(@NotNull MailboxExecutor mainMailboxExecutor) {
this.mainMailboxExecutor = mainMailboxExecutor;
}

@Override
public MailboxExecutor getMainMailboxExecutor() {
return mainMailboxExecutor;
}

@Override
public void setAsyncOperationsThreadPool(@NotNull ExecutorService executorService) {
this.asyncOperationsThreadPool = executorService;
}

@Override
public ExecutorService getAsyncOperationsThreadPool() {
return asyncOperationsThreadPool;
}

public void setExpectedExternalFailureCause(Class<? extends Throwable> expectedThrowableClass) {
this.expectedExternalFailureCause = Optional.of(expectedThrowableClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
Expand Down Expand Up @@ -75,6 +76,7 @@
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -184,12 +186,18 @@ public class ChangelogKeyedStateBackend<K>
*/
private final SequenceNumber materializedTo;

private final MailboxExecutor mainMailboxExecutor;

private final ExecutorService asyncOperationsThreadPool;

public ChangelogKeyedStateBackend(
AbstractKeyedStateBackend<K> keyedStateBackend,
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter,
Collection<ChangelogStateBackendHandle> initialState) {
Collection<ChangelogStateBackendHandle> initialState,
MailboxExecutor mainMailboxExecutor,
ExecutorService asyncOperationsThreadPool) {
this.keyedStateBackend = keyedStateBackend;
this.executionConfig = executionConfig;
this.ttlTimeProvider = ttlTimeProvider;
Expand All @@ -198,6 +206,8 @@ public ChangelogKeyedStateBackend(
this.stateChangelogWriter = stateChangelogWriter;
this.materializedTo = stateChangelogWriter.initialSequenceNumber();
this.changelogStates = new HashMap<>();
this.mainMailboxExecutor = checkNotNull(mainMailboxExecutor);
this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool);
this.completeRestore(initialState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ private <K> ChangelogKeyedStateBackend<K> restore(
env.getExecutionConfig(),
ttlTimeProvider,
changelogStorage.createWriter(operatorIdentifier, keyGroupRange),
baseState));
baseState,
env.getMainMailboxExecutor(),
env.getAsyncOperationsThreadPool()));
}

private Collection<ChangelogStateBackendHandle> castHandles(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.runtime.mailbox.MailboxExecutor;

/**
* An operator that needs access to the {@link MailboxExecutor} to yield to downstream operators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
Expand All @@ -32,7 +33,6 @@
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package org.apache.flink.streaming.api.operators.async;

import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.io.InputGateUtil;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.runtime.mailbox.MailboxExecutor;

/**
* A factory for creating processing time services with a given {@link MailboxExecutor}. The factory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.mailbox.MailboxExecutor;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.StreamOperator;

import javax.annotation.Nonnull;
Expand Down
Loading

0 comments on commit 6709573

Please sign in to comment.