Skip to content

Commit

Permalink
Flink: FLIP-27 source enumerator help classes (apache#4329)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenzwu authored Jun 6, 2022
1 parent 0adf678 commit 31dafee
Show file tree
Hide file tree
Showing 12 changed files with 1,383 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
Expand Down Expand Up @@ -76,50 +78,81 @@ public static List<IcebergSourceSplit> planIcebergSourceSplits(
}

static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context, ExecutorService workerPool) {
TableScan scan = table
.newScan()
.caseSensitive(context.caseSensitive())
.project(context.project())
.planWith(workerPool);
ScanMode scanMode = checkScanMode(context);
if (scanMode == ScanMode.INCREMENTAL_APPEND_SCAN) {
IncrementalAppendScan scan = table.newIncrementalAppendScan();
scan = refineScanWithBaseConfigs(scan, context, workerPool);

if (context.includeColumnStats()) {
scan = scan.includeColumnStats();
}
if (context.startSnapshotId() != null) {
scan = scan.fromSnapshotExclusive(context.startSnapshotId());
}

if (context.endSnapshotId() != null) {
scan = scan.toSnapshot(context.endSnapshotId());
}

return scan.planTasks();
} else {
TableScan scan = table.newScan();
scan = refineScanWithBaseConfigs(scan, context, workerPool);

if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
}

if (context.asOfTimestamp() != null) {
scan = scan.asOfTime(context.asOfTimestamp());
}

if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
return scan.planTasks();
}
}

private enum ScanMode {
BATCH,
INCREMENTAL_APPEND_SCAN
}

if (context.asOfTimestamp() != null) {
scan = scan.asOfTime(context.asOfTimestamp());
private static ScanMode checkScanMode(ScanContext context) {
if (context.isStreaming() || context.startSnapshotId() != null || context.endSnapshotId() != null) {
return ScanMode.INCREMENTAL_APPEND_SCAN;
} else {
return ScanMode.BATCH;
}
}

if (context.startSnapshotId() != null) {
if (context.endSnapshotId() != null) {
scan = scan.appendsBetween(context.startSnapshotId(), context.endSnapshotId());
} else {
scan = scan.appendsAfter(context.startSnapshotId());
}
/**
* refine scan with common configs
*/
private static <T extends Scan<T>> T refineScanWithBaseConfigs(
T scan, ScanContext context, ExecutorService workerPool) {
T refinedScan = scan
.caseSensitive(context.caseSensitive())
.project(context.project())
.planWith(workerPool);

if (context.includeColumnStats()) {
refinedScan = refinedScan.includeColumnStats();
}

if (context.splitSize() != null) {
scan = scan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
}

if (context.splitLookback() != null) {
scan = scan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
refinedScan = refinedScan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
}

if (context.splitOpenFileCost() != null) {
scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
refinedScan = refinedScan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
}

if (context.filters() != null) {
for (Expression filter : context.filters()) {
scan = scan.filter(filter);
refinedScan = refinedScan.filter(filter);
}
}

return scan.planTasks();
return refinedScan;
}
}
Loading

0 comments on commit 31dafee

Please sign in to comment.