Skip to content

Commit

Permalink
[FLINK-7213] Introduce state management by OperatorID in TaskManager
Browse files: browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Aug 15, 2017
1 parent 3b0321a commit b71154a
Show file tree
Hide file tree
Showing 63 changed files with 1,185 additions and 986 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
Expand Down Expand Up @@ -74,14 +76,15 @@
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
Expand Down Expand Up @@ -137,6 +140,7 @@ public String getKey(String value) throws Exception {
streamConfig.setStateBackend(backend);

streamConfig.setStreamOperator(new AsyncCheckpointOperator());
streamConfig.setOperatorID(new OperatorID());

final OneShotLatch delayCheckpointLatch = new OneShotLatch();
final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
Expand All @@ -152,7 +156,7 @@ public String getKey(String value) throws Exception {
public void acknowledgeCheckpoint(
long checkpointId,
CheckpointMetrics checkpointMetrics,
SubtaskState checkpointStateHandles) {
TaskStateSnapshot checkpointStateHandles) {

super.acknowledgeCheckpoint(checkpointId, checkpointMetrics);

Expand All @@ -164,8 +168,16 @@ public void acknowledgeCheckpoint(
throw new RuntimeException(e);
}

boolean hasManagedKeyedState = false;
for (Map.Entry<OperatorID, OperatorSubtaskState> entry : checkpointStateHandles.getSubtaskStateMappings()) {
OperatorSubtaskState state = entry.getValue();
if (state != null) {
hasManagedKeyedState |= state.getManagedKeyedState() != null;
}
}

// should be one k/v state
assertNotNull(checkpointStateHandles.getManagedKeyedState());
assertTrue(hasManagedKeyedState);

// we now know that the checkpoint went through
ensureCheckpointLatch.trigger();
Expand Down Expand Up @@ -241,6 +253,7 @@ public String getKey(String value) throws Exception {
streamConfig.setStateBackend(backend);

streamConfig.setStreamOperator(new AsyncCheckpointOperator());
streamConfig.setOperatorID(new OperatorID());

StreamMockEnvironment mockEnv = new StreamMockEnvironment(
testHarness.jobConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
Expand Down Expand Up @@ -1016,7 +1015,7 @@ int getNumScheduledTasks() {
* Restores the latest checkpointed state.
*
* @param tasks Map of job vertices to restore. State for these vertices is
* restored via {@link Execution#setInitialState(TaskStateHandles)}.
* restored via {@link Execution#setInitialState(TaskStateSnapshot)}.
* @param errorIfNoCheckpoint Fail if no completed checkpoint is available to
* restore from.
* @param allowNonRestoredState Allow checkpoint state that cannot be mapped
Expand Down Expand Up @@ -1102,7 +1101,7 @@ public boolean restoreLatestCheckpointedState(
* mapped to any job vertex in tasks.
* @param tasks Map of job vertices to restore. State for these
* vertices is restored via
* {@link Execution#setInitialState(TaskStateHandles)}.
* {@link Execution#setInitialState(TaskStateSnapshot)}.
* @param userClassLoader The class loader to resolve serialized classes in
* legacy savepoint versions.
*/
Expand Down Expand Up @@ -1256,7 +1255,7 @@ private void discardSubtaskState(
final JobID jobId,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final SubtaskState subtaskState) {
final TaskStateSnapshot subtaskState) {

if (subtaskState != null) {
executor.execute(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void acknowledgeCheckpoint(
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final CheckpointMetrics checkpointMetrics,
final SubtaskState subtaskState);
final TaskStateSnapshot subtaskState);

void declineCheckpoint(
JobID jobID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.util.Objects;

/**
* Simple container class which contains the raw/managed/legacy operator state and key-group state handles for the sub
* tasks of an operator.
* Simple container class which contains the raw/managed/legacy operator state and key-group state handles from all
* subtasks of an operator and therefore represents the complete state of a logical operator.
*/
public class OperatorState implements CompositeStateHandle {

Expand Down
Loading

0 comments on commit b71154a

Please sign in to comment.