[FLINK-26233][connectors/filesystem] Fix the FileSinkCompactionSwitchITCase.testSwitchingCompaction.

This closes apache#18837.
pltbkd authored and gaoyunhaii committed Feb 21, 2022
1 parent 0c429a3 commit 8af2495
Showing 1 changed file with 17 additions and 3 deletions.
@@ -25,6 +25,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.connector.file.sink.FileSink.DefaultRowFormatBuilder;
 import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
 import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
@@ -43,6 +44,8 @@
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -136,11 +139,12 @@ public void teardown() {
     @Test
     public void testSwitchingCompaction() throws Exception {
         String path = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+        String cpPath = "file://" + TEMPORARY_FOLDER.newFolder().getAbsolutePath();

         SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap =
                 sharedObjects.add(new ConcurrentHashMap<>());
-        JobGraph jobGraph = createJobGraph(path, isOnToOff, false, sendCountMap);
-        JobGraph restoringJobGraph = createJobGraph(path, !isOnToOff, true, sendCountMap);
+        JobGraph jobGraph = createJobGraph(path, cpPath, isOnToOff, false, sendCountMap);
+        JobGraph restoringJobGraph = createJobGraph(path, cpPath, !isOnToOff, true, sendCountMap);

         final Configuration config = new Configuration();
         config.setString(RestOptions.BIND_PORT, "18081-19000");
@@ -180,16 +184,21 @@ public void testSwitchingCompaction() throws Exception {

     private JobGraph createJobGraph(
             String path,
+            String cpPath,
             boolean compactionEnabled,
             boolean isFinite,
             SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         Configuration config = new Configuration();
         config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        // disable changelog state in case it's randomly enabled, since it will fail the savepoint
+        config.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
         env.configure(config, getClass().getClassLoader());

         env.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
         env.setRestartStrategy(RestartStrategies.noRestart());
+        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(cpPath));
+        env.setStateBackend(new HashMapStateBackend());

         env.addSource(new CountingTestSource(latchId, NUM_RECORDS, isFinite, sendCountMap))
                 .setParallelism(NUM_SOURCES)
@@ -356,10 +365,15 @@ public void run(SourceContext<Integer> ctx) throws Exception {
             latch.await();
         }

-        private void sendRecordsUntil(int targetNumber, SourceContext<Integer> ctx) {
+        private void sendRecordsUntil(int targetNumber, SourceContext<Integer> ctx)
+                throws InterruptedException {
             while (!isCanceled && nextValue < targetNumber) {
                 synchronized (ctx.getCheckpointLock()) {
                     ctx.collect(nextValue++);
+                    if (!isFinite && nextValue % 100 == 0) {
+                        // slow down the source in case too many records are sent
+                        Thread.sleep(1);
+                    }
                 }
             }
         }
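For context, the core of the fix is to make the test's checkpointing setup deterministic: changelog state is explicitly disabled (the in-diff comment notes it would fail the savepoint), and the state backend and checkpoint storage are pinned rather than inherited from randomized test settings. Below is a minimal standalone sketch of that setup using the same Flink APIs as the diff above; the class name and checkpoint path are placeholders, not part of the commit.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Hypothetical helper mirroring the configuration this commit adds to the test. */
public class DeterministicCheckpointSetup {

    public static StreamExecutionEnvironment create(String checkpointPath) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Disable changelog state explicitly, so a randomly enabled changelog
        // backend cannot fail the savepoint taken by the test.
        Configuration config = new Configuration();
        config.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
        env.configure(config, DeterministicCheckpointSetup.class.getClassLoader());

        // Pin the state backend and a file-based checkpoint storage
        // (e.g. "file:///tmp/checkpoints") instead of relying on defaults.
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig()
                .setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath));

        env.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
        return env;
    }
}

The other part of the fix, throttling the infinite source with a short Thread.sleep every 100 records, is independent of this setup and is shown in the last hunk above.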
