Flink: backport Add config for max allowed consecutive planning failures in IcebergSource before failing the job (apache#7571) to 1.16 and 1.15 (apache#7629)
pvary authored May 18, 2023
1 parent 202af03 commit 477da36
Showing 14 changed files with 346 additions and 14 deletions.
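The new option caps how many consecutive split-planning failures the streaming source tolerates before failing the job: -1 retries forever, 0 fails on the first error, and the default is 3. A minimal, hedged usage sketch of the new builder knob (the TableLoader and the surrounding job wiring are assumed, not part of this commit):

// Sketch only: tableLoader construction and job setup elided.
IcebergSource<RowData> source =
    IcebergSource.forRowData()
        .tableLoader(tableLoader)
        .streaming(true)
        .monitorInterval(Duration.ofSeconds(30))
        // Tolerate up to 5 consecutive planning failures, then fail the job.
        .maxAllowedPlanningFailures(5)
        .build();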
flink/v1.15: FlinkReadConf.java
@@ -181,4 +181,13 @@ public int workerPoolSize() {
.defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
.parse();
}

public int maxAllowedPlanningFailures() {
return confParser
.intConf()
.option(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES)
.flinkConfig(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION)
.defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
.parse();
}
}
flink/v1.15: FlinkReadOptions.java
@@ -105,4 +105,8 @@ private FlinkReadOptions() {}
public static final String LIMIT = "limit";
public static final ConfigOption<Long> LIMIT_OPTION =
ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);

public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
}
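The option resolves with the usual FlinkReadConf precedence: a per-source read option wins over the Flink configuration key, which wins over the default of 3. A hedged sketch of setting it job-wide through the Flink configuration (this assumes FlinkReadOptions' existing "connector.iceberg." PREFIX, so the full key is connector.iceberg.max-allowed-planning-failures):

// Hedged sketch: job-wide setting via the Flink configuration.
Configuration flinkConf = new Configuration();
flinkConf.set(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION, 5);
// Hand it to the source builder, which consults it when no read option is set:
// IcebergSource.forRowData().flinkConfig(flinkConf)...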
flink/v1.15: IcebergSource.java
@@ -391,6 +391,13 @@ public Builder<T> exposeLocality(boolean newExposeLocality) {
return this;
}

public Builder<T> maxAllowedPlanningFailures(int maxAllowedPlanningFailures) {
readOptions.put(
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(),
Integer.toString(maxAllowedPlanningFailures));
return this;
}

/**
* Set the read properties for Flink source. View the supported properties in {@link
* FlinkReadOptions}
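The builder method simply stores the value as a string in the source's read-options map, so the string-keyed form is equivalent. A hedged sketch, assuming the builder's generic set(String, String) read-property setter described by the javadoc above:

// Equivalent to builder.maxAllowedPlanningFailures(5), via the generic setter.
builder.set(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(), "5");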
flink/v1.15: ScanContext.java
@@ -64,6 +64,7 @@ public class ScanContext implements Serializable {
private final boolean includeColumnStats;
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
private final int maxAllowedPlanningFailures;

private ScanContext(
boolean caseSensitive,
@@ -86,6 +87,7 @@ private ScanContext(
boolean exposeLocality,
Integer planParallelism,
int maxPlanningSnapshotCount,
int maxAllowedPlanningFailures,
String branch,
String tag,
String startTag,
@@ -115,6 +117,7 @@ private ScanContext(
this.exposeLocality = exposeLocality;
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;

validate();
}
@@ -155,6 +158,10 @@ private void validate() {
Preconditions.checkArgument(
!(endTag != null && endSnapshotId() != null),
"END_SNAPSHOT_ID and END_TAG cannot both be set.");

Preconditions.checkArgument(
maxAllowedPlanningFailures >= -1,
"Cannot set maxAllowedPlanningFailures to a negative number other than -1.");
}

public boolean caseSensitive() {
@@ -253,6 +260,10 @@ public int maxPlanningSnapshotCount() {
return maxPlanningSnapshotCount;
}

public int maxAllowedPlanningFailures() {
return maxAllowedPlanningFailures;
}

public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -277,6 +288,7 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.build();
}

@@ -304,6 +316,7 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.build();
}

@@ -341,6 +354,8 @@ public static class Builder {
FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
private int maxPlanningSnapshotCount =
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
private int maxAllowedPlanningFailures =
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();

private Builder() {}

@@ -464,6 +479,11 @@ public Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) {
return this;
}

public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
this.maxAllowedPlanningFailures = newMaxAllowedPlanningFailures;
return this;
}

public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -488,7 +508,8 @@ public Builder resolveConfig(
.limit(flinkReadConf.limit())
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
.maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount());
.maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
.maxAllowedPlanningFailures(maxAllowedPlanningFailures);
}

public ScanContext build() {
@@ -513,6 +534,7 @@ public ScanContext build() {
exposeLocality,
planParallelism,
maxPlanningSnapshotCount,
maxAllowedPlanningFailures,
branch,
tag,
startTag,
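validate() pins down the accepted values: -1 disables the limit entirely, 0 fails the job on the first planning error, any positive n tolerates up to n consecutive failures, and other negative values are rejected. A hedged illustration (the remaining builder defaults are assumed to pass validation):

// Hedged illustration of the accepted range:
ScanContext.builder().maxAllowedPlanningFailures(-1).build(); // OK: retry forever
ScanContext.builder().maxAllowedPlanningFailures(0).build();  // OK: fail on the first error
ScanContext.builder().maxAllowedPlanningFailures(3).build();  // OK: the default limit
ScanContext.builder().maxAllowedPlanningFailures(-2).build(); // throws IllegalArgumentException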
flink/v1.15: ContinuousIcebergEnumerator.java
@@ -55,6 +55,9 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
/** Track enumeration result history for split discovery throttling. */
private final EnumerationHistory enumerationHistory;

/** Count the consecutive failures and throw an exception if the max allowed failures are reached. */
private transient int consecutiveFailures = 0;

public ContinuousIcebergEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
SplitAssigner assigner,
@@ -122,6 +125,7 @@ private ContinuousEnumerationResult discoverSplits() {
/** This method is executed in a single coordinator thread. */
private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
if (error == null) {
consecutiveFailures = 0;
if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
// Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
// thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
@@ -161,7 +165,13 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
LOG.info("Update enumerator position to {}", result.toPosition());
}
} else {
LOG.error("Failed to discover new splits", error);
consecutiveFailures++;
if (scanContext.maxAllowedPlanningFailures() < 0
|| consecutiveFailures <= scanContext.maxAllowedPlanningFailures()) {
LOG.error("Failed to discover new splits", error);
} else {
throw new RuntimeException("Failed to discover new splits", error);
}
}
}
}
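The counter resets on every successful discovery, so the limit applies to consecutive failures only: with the default of 3, four failures in a row fail the job, while alternating failure/success sequences can run indefinitely. A self-contained sketch of the same pattern (FailureTracker is an illustrative name, not from the source):

// Illustrative sketch of the tolerance logic above.
class FailureTracker {
  private final int maxAllowedFailures; // -1 means never fail the job
  private int consecutiveFailures = 0;

  FailureTracker(int maxAllowedFailures) {
    this.maxAllowedFailures = maxAllowedFailures;
  }

  void onSuccess() {
    consecutiveFailures = 0; // any success resets the streak
  }

  void onFailure(Throwable error) {
    consecutiveFailures++;
    if (maxAllowedFailures < 0 || consecutiveFailures <= maxAllowedFailures) {
      // Under the limit: log and let the next scheduled discovery retry.
    } else {
      throw new RuntimeException("Too many consecutive failures", error);
    }
  }
}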
flink/v1.15: ManualContinuousSplitPlanner.java
@@ -32,16 +32,23 @@ class ManualContinuousSplitPlanner implements ContinuousSplitPlanner {
// track splits per snapshot
private final NavigableMap<Long, List<IcebergSourceSplit>> splits;
private long latestSnapshotId;
private int remainingFailures;

ManualContinuousSplitPlanner(ScanContext scanContext) {
ManualContinuousSplitPlanner(ScanContext scanContext, int expectedFailures) {
this.maxPlanningSnapshotCount = scanContext.maxPlanningSnapshotCount();
this.splits = new TreeMap<>();
this.latestSnapshotId = 0L;
this.remainingFailures = expectedFailures;
}

@Override
public synchronized ContinuousEnumerationResult planSplits(
IcebergEnumeratorPosition lastPosition) {
if (remainingFailures > 0) {
remainingFailures--;
throw new RuntimeException("Expected failure at planning");
}

long fromSnapshotIdExclusive = 0;
if (lastPosition != null && lastPosition.snapshotId() != null) {
fromSnapshotIdExclusive = lastPosition.snapshotId();
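The test planner now throws on its first expectedFailures calls to planSplits() and plans normally afterwards, which is how the tests below simulate transient planning errors:

// From the tests below: fail the first two planning attempts, then succeed.
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 2);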
flink/v1.15: TestContinuousIcebergEnumerator.java
@@ -51,7 +51,7 @@ public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

@@ -81,7 +81,7 @@ public void testDiscoverWhenReaderRegistered() throws Exception {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

@@ -110,7 +110,7 @@ public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

@@ -163,7 +163,7 @@ public void testThrottlingDiscovery() throws Exception {
// discover one snapshot at a time
.maxPlanningSnapshotCount(1)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}

@Test
public void testTransientPlanningErrorsWithSuccessfulRetry() throws Exception {
TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
new TestingSplitEnumeratorContext<>(4);
ScanContext scanContext =
ScanContext.builder()
.streaming(true)
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
.maxPlanningSnapshotCount(1)
.maxAllowedPlanningFailures(2)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

// Make one split available and trigger the periodic discovery
List<IcebergSourceSplit> splits =
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
splitPlanner.addSplits(splits);

// Trigger a planning cycle and check that no splits are returned due to the planning error
enumeratorContext.triggerAllActions();
Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());

// Second scan planning should succeed and discover the expected splits
enumeratorContext.triggerAllActions();
Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(3).pendingSplits();
Assert.assertEquals(1, pendingSplits.size());
IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
}

@Test
public void testOverMaxAllowedPlanningErrors() throws Exception {
TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
new TestingSplitEnumeratorContext<>(4);
ScanContext scanContext =
ScanContext.builder()
.streaming(true)
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
.maxPlanningSnapshotCount(1)
.maxAllowedPlanningFailures(1)
.build();
ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 2);
createEnumerator(enumeratorContext, scanContext, splitPlanner);

// Make one split available and trigger the periodic discovery
List<IcebergSourceSplit> splits =
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
splitPlanner.addSplits(splits);

// Check that the discovery task tolerates the first error and keeps running while the
// failure limit is not yet reached
enumeratorContext.triggerAllActions();
Assert.assertFalse(
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());

// Check that the task has failed with the expected exception after the failure limit is reached
enumeratorContext.triggerAllActions();
Assert.assertTrue(
enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
Assertions.assertThatThrownBy(
() -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
.hasCauseInstanceOf(RuntimeException.class)
.hasMessageContaining("Failed to discover new split");
}

@Test
public void testPlanningIgnoringErrors() throws Exception {
int expectedFailures = 3;
TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
new TestingSplitEnumeratorContext<>(4);
ScanContext scanContext =
ScanContext.builder()
.streaming(true)
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
.maxPlanningSnapshotCount(1)
.maxAllowedPlanningFailures(-1)
.build();
ManualContinuousSplitPlanner splitPlanner =
new ManualContinuousSplitPlanner(scanContext, expectedFailures);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);

// Make one split available and trigger the periodic discovery
List<IcebergSourceSplit> splits =
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
splitPlanner.addSplits(splits);

Collection<IcebergSourceSplitState> pendingSplits;
// Cannot discover the new split while planning keeps failing
for (int i = 0; i < expectedFailures; ++i) {
enumeratorContext.triggerAllActions();
pendingSplits = enumerator.snapshotState(i).pendingSplits();
Assert.assertEquals(0, pendingSplits.size());
}

// Discover the new split once a scan planning finally succeeds
enumeratorContext.triggerAllActions();
pendingSplits = enumerator.snapshotState(expectedFailures + 1).pendingSplits();
Assert.assertEquals(1, pendingSplits.size());
IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
}

private static ContinuousIcebergEnumerator createEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> context,
ScanContext scanContext,
flink/v1.16: FlinkReadConf.java
@@ -181,4 +181,13 @@ public int workerPoolSize() {
.defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
.parse();
}

public int maxAllowedPlanningFailures() {
return confParser
.intConf()
.option(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES)
.flinkConfig(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION)
.defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
.parse();
}
}
flink/v1.16: FlinkReadOptions.java
@@ -105,4 +105,8 @@ private FlinkReadOptions() {}
public static final String LIMIT = "limit";
public static final ConfigOption<Long> LIMIT_OPTION =
ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);

public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
}
flink/v1.16: IcebergSource.java
@@ -391,6 +391,13 @@ public Builder<T> exposeLocality(boolean newExposeLocality) {
return this;
}

public Builder<T> maxAllowedPlanningFailures(int maxAllowedPlanningFailures) {
readOptions.put(
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(),
Integer.toString(maxAllowedPlanningFailures));
return this;
}

/**
* Set the read properties for Flink source. View the supported properties in {@link
* FlinkReadOptions}
(The remaining v1.16 files, mirroring the v1.15 changes above, were not loaded on this page.)
