Skip to content

Commit

Permalink
[FLINK-29397][runtime] Check if changelog provider is null
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Sep 22, 2022
1 parent 5766d50 commit 162db04
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ public TestTaskStateManager(
ExecutionAttemptID executionAttemptID,
CheckpointResponder checkpointResponder,
LocalRecoveryConfig localRecoveryConfig,
StateChangelogStorage<?> changelogStorage,
@Nullable StateChangelogStorage<?> changelogStorage,
Map<Long, TaskStateSnapshot> jobManagerTaskStateSnapshotsByCheckpointId,
long reportedCheckpointId,
OneShotLatch waitForReportLatch) {
this.jobId = checkNotNull(jobId);
this.executionAttemptID = checkNotNull(executionAttemptID);
this.checkpointResponder = checkNotNull(checkpointResponder);
this.localRecoveryDirectoryProvider = checkNotNull(localRecoveryConfig);
this.stateChangelogStorage = checkNotNull(changelogStorage);
this.stateChangelogStorage = changelogStorage;
this.jobManagerTaskStateSnapshotsByCheckpointId =
checkNotNull(jobManagerTaskStateSnapshotsByCheckpointId);
this.taskManagerTaskStateSnapshotsByCheckpointId = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;

Expand All @@ -40,7 +42,10 @@ public class TestTaskStateManagerBuilder {
private ExecutionAttemptID executionAttemptID = createExecutionAttemptId();
private CheckpointResponder checkpointResponder = new TestCheckpointResponder();
private LocalRecoveryConfig localRecoveryConfig = TestLocalRecoveryConfig.disabled();

@Nullable
private StateChangelogStorage<?> stateChangelogStorage = new InMemoryStateChangelogStorage();

private final Map<Long, TaskStateSnapshot> jobManagerTaskStateSnapshotsByCheckpointId =
new HashMap<>();
private long reportedCheckpointId = -1L;
Expand Down Expand Up @@ -75,7 +80,7 @@ public TestTaskStateManagerBuilder setStateChangelogStorage(
this.stateChangelogStorage == null
|| this.stateChangelogStorage instanceof InMemoryStateChangelogStorage,
"StateChangelogStorage was already initialized to " + this.stateChangelogStorage);
this.stateChangelogStorage = checkNotNull(stateChangelogStorage);
this.stateChangelogStorage = stateChangelogStorage;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,14 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
} else if (!inputProcessor.isAvailable()) {
timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
resumeFuture = inputProcessor.getAvailableFuture();
} else {
} else if (changelogWriterAvailabilityProvider != null) {
// currently, waiting for changelog availability is reported as busy
// todo: add new metric (FLINK-24402)
timer = null;
resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
} else {
// data availability has changed in the meantime; retry immediately
return;
}
assertNoException(
resumeFuture.thenRun(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.StopMode;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
Expand Down Expand Up @@ -1193,6 +1195,29 @@ public void testProcessWithAvailableOutput() throws Exception {
}
}

/**
 * Regression test for FLINK-29397: invokes a task whose task state manager was built with a
 * {@code null} changelog storage, using an input processor whose first {@code processInput()}
 * call reports {@code NOTHING_AVAILABLE}. The test passes if {@code invoke()} returns without
 * throwing.
 */
@Test
public void testProcessWithRaceInDataAvailability() throws Exception {
    try (final MockEnvironment env =
            MockEnvironment.builder()
                    .setTaskStateManager(
                            TestTaskStateManager.builder()
                                    // replicate NPE of FLINK-29397
                                    .setStateChangelogStorage(null)
                                    .build())
                    .build()) {
        env.addOutputs(
                Collections.singletonList(new AvailabilityTestResultPartitionWriter(true)));

        // The processor claims "nothing available" once, then signals end of input.
        final StreamTask<?, ?> racyTask =
                new MockStreamTaskBuilder(env)
                        .setStreamInputProcessor(new RacyTestInputProcessor())
                        .build();

        racyTask.invoke();
    }
}

/**
* In this weird construct, we are:
*
Expand Down Expand Up @@ -1954,6 +1979,38 @@ public CompletableFuture<?> getAvailableFuture() {
}
}

/**
 * A stream input processor that reproduces a race in data availability: the first call to
 * {@link #processInput()} reports {@code NOTHING_AVAILABLE}, while {@link
 * #getAvailableFuture()} always hands out {@link AvailabilityProvider#AVAILABLE}, so an
 * availability check made afterwards sees the input as available. Every later call to {@link
 * #processInput()} reports {@code END_OF_INPUT}.
 */
private static class RacyTestInputProcessor implements StreamInputProcessor {

    /** True until {@link #processInput()} has been called once. */
    private boolean firstCall = true;

    @Override
    public DataInputStatus processInput() {
        if (firstCall) {
            // Only the very first invocation pretends that no data is available.
            firstCall = false;
            return DataInputStatus.NOTHING_AVAILABLE;
        }
        return DataInputStatus.END_OF_INPUT;
    }

    @Override
    public CompletableFuture<Void> prepareSnapshot(
            ChannelStateWriter channelStateWriter, final long checkpointId) {
        // Nothing to snapshot; hand back the completed future right away.
        return FutureUtils.completedVoidFuture();
    }

    @Override
    public void close() throws IOException {
        // No resources to release.
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return AvailabilityProvider.AVAILABLE;
    }
}

public static Task createTask(
Class<? extends TaskInvokable> invokable,
ShuffleEnvironment shuffleEnvironment,
Expand Down

0 comments on commit 162db04

Please sign in to comment.