Skip to content

Commit

Permalink
Flink 1.15: Port PR apache#4329 to add FLIP-27 enumerator classes (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenzwu authored Jun 7, 2022
1 parent 9a8b83c commit 5d6c6cc
Show file tree
Hide file tree
Showing 12 changed files with 1,383 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
Expand Down Expand Up @@ -76,50 +78,81 @@ public static List<IcebergSourceSplit> planIcebergSourceSplits(
}

static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context, ExecutorService workerPool) {
TableScan scan = table
.newScan()
.caseSensitive(context.caseSensitive())
.project(context.project())
.planWith(workerPool);
ScanMode scanMode = checkScanMode(context);
if (scanMode == ScanMode.INCREMENTAL_APPEND_SCAN) {
IncrementalAppendScan scan = table.newIncrementalAppendScan();
scan = refineScanWithBaseConfigs(scan, context, workerPool);

if (context.includeColumnStats()) {
scan = scan.includeColumnStats();
}
if (context.startSnapshotId() != null) {
scan = scan.fromSnapshotExclusive(context.startSnapshotId());
}

if (context.endSnapshotId() != null) {
scan = scan.toSnapshot(context.endSnapshotId());
}

return scan.planTasks();
} else {
TableScan scan = table.newScan();
scan = refineScanWithBaseConfigs(scan, context, workerPool);

if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
}

if (context.asOfTimestamp() != null) {
scan = scan.asOfTime(context.asOfTimestamp());
}

if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
return scan.planTasks();
}
}

/**
 * Kind of scan that planTasks builds; the value is chosen by checkScanMode.
 */
private enum ScanMode {
// Planned with table.newScan(); may be pinned to a snapshot id or an as-of timestamp.
BATCH,
// Planned with table.newIncrementalAppendScan(); may be bounded by start/end snapshot ids.
INCREMENTAL_APPEND_SCAN
}

if (context.asOfTimestamp() != null) {
scan = scan.asOfTime(context.asOfTimestamp());
/**
 * Decides which kind of scan to plan for the given context.
 *
 * @param context scan settings to inspect
 * @return {@code INCREMENTAL_APPEND_SCAN} when the context is streaming or carries a start or end
 *     snapshot id; {@code BATCH} otherwise
 */
private static ScanMode checkScanMode(ScanContext context) {
  boolean incremental = context.isStreaming() ||
      context.startSnapshotId() != null ||
      context.endSnapshotId() != null;
  return incremental ? ScanMode.INCREMENTAL_APPEND_SCAN : ScanMode.BATCH;
}

if (context.startSnapshotId() != null) {
if (context.endSnapshotId() != null) {
scan = scan.appendsBetween(context.startSnapshotId(), context.endSnapshotId());
} else {
scan = scan.appendsAfter(context.startSnapshotId());
}
/**
 * Refines a scan with the settings common to every scan flavor.
 *
 * <p>Case sensitivity, projection and the planning executor are always applied. Column stats, split
 * size, split lookback, split open-file cost and row filters are applied only when the context
 * provides them. Each call's result is reassigned, so the returned scan carries all applied settings.
 *
 * @param scan scan to start from
 * @param context scan settings to apply
 * @param workerPool executor handed to the scan via {@code planWith}
 * @param <T> concrete scan type, preserved so callers can keep chaining type-specific calls
 * @return the scan with all applicable settings applied
 */
private static <T extends Scan<T>> T refineScanWithBaseConfigs(
    T scan, ScanContext context, ExecutorService workerPool) {
  T refinedScan = scan
      .caseSensitive(context.caseSensitive())
      .project(context.project())
      .planWith(workerPool);

  if (context.includeColumnStats()) {
    refinedScan = refinedScan.includeColumnStats();
  }

  if (context.splitSize() != null) {
    refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
  }

  if (context.splitLookback() != null) {
    refinedScan = refinedScan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
  }

  if (context.splitOpenFileCost() != null) {
    refinedScan = refinedScan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
  }

  if (context.filters() != null) {
    for (Expression filter : context.filters()) {
      refinedScan = refinedScan.filter(filter);
    }
  }

  return refinedScan;
}
}
Loading

0 comments on commit 5d6c6cc

Please sign in to comment.