Commit
Flink: Add config for max allowed consecutive planning failures in IcebergSource before failing the job (apache#7571)
pvary authored May 17, 2023
1 parent 890ca0e commit 7cbde14
Showing 8 changed files with 196 additions and 29 deletions.
45 changes: 23 additions & 22 deletions docs/flink-configuration.md

Large diffs are not rendered by default.
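
Since the docs diff is not rendered here, the following is a hedged reconstruction of the entry it adds, derived from the option name and default value visible in the code changes below; the exact wording in docs/flink-configuration.md may differ, and the "connector.iceberg." prefix is an assumption (the PREFIX constant is not shown in this diff).

| Read option                   | Flink configuration key                         | Default | Description                                                                                   |
| ----------------------------- | ----------------------------------------------- | ------- | --------------------------------------------------------------------------------------------- |
| max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | 3       | Maximum allowed consecutive planning failures before failing the job; -1 means retry forever. |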

@@ -181,4 +181,13 @@ public int workerPoolSize() {
.defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
.parse();
}

public int maxAllowedPlanningFailures() {
return confParser
.intConf()
.option(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES)
.flinkConfig(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION)
.defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
.parse();
}
}
@@ -105,4 +105,8 @@ private FlinkReadOptions() {}
public static final String LIMIT = "limit";
public static final ConfigOption<Long> LIMIT_OPTION =
ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);

public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
}
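
A minimal sketch of setting the option job-wide through the Flink configuration, using the ConfigOption declared above. The string-key form assumes PREFIX resolves to "connector.iceberg.", which is not shown in this diff.

import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkReadOptions;

Configuration flinkConf = new Configuration();
// Tolerate up to 5 consecutive planning failures before failing the job.
flinkConf.set(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION, 5);
// Equivalent string form (prefix assumed, not shown in this diff):
// flinkConf.setString("connector.iceberg.max-allowed-planning-failures", "5");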
@@ -391,6 +391,13 @@ public Builder<T> exposeLocality(boolean newExposeLocality) {
return this;
}

public Builder<T> maxAllowedPlanningFailures(int maxAllowedPlanningFailures) {
readOptions.put(
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(),
Integer.toString(maxAllowedPlanningFailures));
return this;
}

/**
* Set the read properties for Flink source. View the supported properties in {@link
* FlinkReadOptions}
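A usage sketch of the new builder knob in the DataStream API. The surrounding calls (forRowData(), tableLoader(), streaming(), env.fromSource()) are assumed from the existing IcebergSource API and are not part of this diff.

IcebergSource<RowData> source =
    IcebergSource.forRowData()
        .tableLoader(tableLoader) // assumed: an already configured TableLoader
        .streaming(true)
        // Fail the job only after 5 consecutive planning failures; -1 retries forever.
        .maxAllowedPlanningFailures(5)
        .build();

DataStream<RowData> stream =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "iceberg-source");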
@@ -64,6 +64,7 @@ public class ScanContext implements Serializable {
private final boolean includeColumnStats;
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
private final int maxAllowedPlanningFailures;

private ScanContext(
boolean caseSensitive,
@@ -86,6 +87,7 @@ private ScanContext(
boolean exposeLocality,
Integer planParallelism,
int maxPlanningSnapshotCount,
int maxAllowedPlanningFailures,
String branch,
String tag,
String startTag,
@@ -115,6 +117,7 @@ private ScanContext(
this.exposeLocality = exposeLocality;
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;

validate();
}
@@ -155,6 +158,10 @@ private void validate() {
Preconditions.checkArgument(
!(endTag != null && endSnapshotId() != null),
"END_SNAPSHOT_ID and END_TAG cannot both be set.");

Preconditions.checkArgument(
maxAllowedPlanningFailures >= -1,
"Cannot set maxAllowedPlanningFailures to a negative number other than -1.");
}

public boolean caseSensitive() {
@@ -253,6 +260,10 @@ public int maxPlanningSnapshotCount() {
return maxPlanningSnapshotCount;
}

public int maxAllowedPlanningFailures() {
return maxAllowedPlanningFailures;
}

public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -277,6 +288,7 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.build();
}

@@ -304,6 +316,7 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.build();
}

@@ -341,6 +354,8 @@ public static class Builder {
FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
private int maxPlanningSnapshotCount =
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
private int maxAllowedPlanningFailures =
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();

private Builder() {}

@@ -464,6 +479,11 @@ public Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) {
return this;
}

public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
this.maxAllowedPlanningFailures = newMaxAllowedPlanningFailures;
return this;
}

public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -488,7 +508,8 @@ public Builder resolveConfig(
.limit(flinkReadConf.limit())
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
.maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount());
.maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
          .maxAllowedPlanningFailures(flinkReadConf.maxAllowedPlanningFailures());
}

public ScanContext build() {
@@ -513,6 +534,7 @@ public ScanContext build() {
exposeLocality,
planParallelism,
maxPlanningSnapshotCount,
maxAllowedPlanningFailures,
branch,
tag,
startTag,
@@ -55,6 +55,9 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
/** Track enumeration result history for split discovery throttling. */
private final EnumerationHistory enumerationHistory;

/** Count the consecutive failures and throw an exception when the max allowed failures is reached. */
private transient int consecutiveFailures = 0;

public ContinuousIcebergEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
SplitAssigner assigner,
@@ -122,6 +125,7 @@ private ContinuousEnumerationResult discoverSplits() {
/** This method is executed in a single coordinator thread. */
private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
if (error == null) {
consecutiveFailures = 0;
if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
// Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
// thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
@@ -161,7 +165,13 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
LOG.info("Update enumerator position to {}", result.toPosition());
}
} else {
LOG.error("Failed to discover new splits", error);
consecutiveFailures++;
if (scanContext.maxAllowedPlanningFailures() < 0
|| consecutiveFailures <= scanContext.maxAllowedPlanningFailures()) {
LOG.error("Failed to discover new splits", error);
} else {
throw new RuntimeException("Failed to discover new splits", error);
}
}
}
}
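
To make the failure accounting above concrete, here is a minimal standalone model (an illustrative class, not part of the diff): success resets the streak, a non-negative limit turns the failure that exceeds it into a fatal exception, and -1 means log-and-retry forever.

// Illustrative model of the accounting in processDiscoveredSplits().
class PlanningFailureTracker {
  private final int maxAllowedPlanningFailures; // -1 = unlimited retries
  private int consecutiveFailures = 0;

  PlanningFailureTracker(int maxAllowedPlanningFailures) {
    this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
  }

  void onSuccess() {
    consecutiveFailures = 0; // any successful planning resets the streak
  }

  void onFailure(Throwable error) {
    consecutiveFailures++;
    if (maxAllowedPlanningFailures >= 0 && consecutiveFailures > maxAllowedPlanningFailures) {
      // Same terminal path as the enumerator: surface the error and fail the job.
      throw new RuntimeException("Failed to discover new splits", error);
    }
    // Otherwise only log; the periodic discovery will retry.
  }
}

For example, with a limit of 2 the sequence fail, fail, success, fail never throws, because the success resets the counter; the tests below (e.g. testTransientPlanningErrorsWithSuccessfulRetry) exercise this tolerance and reset behavior.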
@@ -32,16 +32,23 @@ class ManualContinuousSplitPlanner implements ContinuousSplitPlanner {
// track splits per snapshot
private final NavigableMap<Long, List<IcebergSourceSplit>> splits;
private long latestSnapshotId;
private int remainingFailures;

ManualContinuousSplitPlanner(ScanContext scanContext) {
ManualContinuousSplitPlanner(ScanContext scanContext, int expectedFailures) {
this.maxPlanningSnapshotCount = scanContext.maxPlanningSnapshotCount();
this.splits = new TreeMap<>();
this.latestSnapshotId = 0L;
this.remainingFailures = expectedFailures;
}

@Override
public synchronized ContinuousEnumerationResult planSplits(
IcebergEnumeratorPosition lastPosition) {
if (remainingFailures > 0) {
remainingFailures--;
throw new RuntimeException("Expected failure at planning");
}

long fromSnapshotIdExclusive = 0;
if (lastPosition != null && lastPosition.snapshotId() != null) {
fromSnapshotIdExclusive = lastPosition.snapshotId();
@@ -51,7 +51,7 @@ public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

@@ -81,7 +81,7 @@ public void testDiscoverWhenReaderRegistered() throws Exception {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

@@ -110,7 +110,7 @@ public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

@@ -163,7 +163,7 @@ public void testThrottlingDiscovery() throws Exception {
// discover one snapshot at a time
.maxPlanningSnapshotCount(1)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}

@Test
public void testTransientPlanningErrorsWithSuccessfulRetry() throws Exception {
TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
new TestingSplitEnumeratorContext<>(4);
ScanContext scanContext =
ScanContext.builder()
.streaming(true)
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
.maxPlanningSnapshotCount(1)
.maxAllowedPlanningFailures(2)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

// Make one split available and trigger the periodic discovery
List<IcebergSourceSplit> splits =
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
splitPlanner.addSplits(splits);

// Trigger planning and check that no splits are returned due to the planning error
enumeratorContext.triggerAllActions();
Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());

// Second scan planning should succeed and discover the expected splits
enumeratorContext.triggerAllActions();
Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(3).pendingSplits();
Assert.assertEquals(1, pendingSplits.size());
IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
}

@Test
public void testOverMaxAllowedPlanningErrors() throws Exception {
TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
new TestingSplitEnumeratorContext<>(4);
ScanContext scanContext =
ScanContext.builder()
.streaming(true)
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
.maxPlanningSnapshotCount(1)
.maxAllowedPlanningFailures(1)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 2);
createEnumerator(enumeratorContext, scanContext, splitPlanner);

// Make one split available and trigger the periodic discovery
List<IcebergSourceSplit> splits =
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
splitPlanner.addSplits(splits);

// Check that the first planning error is tolerated: the periodic discovery task keeps
// running until the failure limit is reached
enumeratorContext.triggerAllActions();
Assert.assertFalse(
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());

// Check that the task has failed with the expected exception after the failure limit is reached
enumeratorContext.triggerAllActions();
Assert.assertTrue(
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
Assertions.assertThatThrownBy(
() -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
.hasCauseInstanceOf(RuntimeException.class)
.hasMessageContaining("Failed to discover new split");
}

@Test
public void testPlanningIgnoringErrors() throws Exception {
int expectedFailures = 3;
TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
new TestingSplitEnumeratorContext<>(4);
ScanContext scanContext =
ScanContext.builder()
.streaming(true)
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
.maxPlanningSnapshotCount(1)
.maxAllowedPlanningFailures(-1)
.build();
ManualContinuousSplitPlanner splitPlanner =
new ManualContinuousSplitPlanner(scanContext, expectedFailures);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

// Make one split available and trigger the periodic discovery
List<IcebergSourceSplit> splits =
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
splitPlanner.addSplits(splits);

Collection<IcebergSourceSplitState> pendingSplits;
// Cannot discover the new split while planning keeps failing
for (int i = 0; i < expectedFailures; ++i) {
enumeratorContext.triggerAllActions();
pendingSplits = enumerator.snapshotState(i).pendingSplits();
Assert.assertEquals(0, pendingSplits.size());
}

// Discover the new split after scan planning finally succeeds
enumeratorContext.triggerAllActions();
pendingSplits = enumerator.snapshotState(expectedFailures + 1).pendingSplits();
Assert.assertEquals(1, pendingSplits.size());
IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
}

private static ContinuousIcebergEnumerator createEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> context,
ScanContext scanContext,
