Skip to content

Commit

Permalink
Core: Refactor ManifestListReadTask to avoid extra S3 calls (apache#6460)
Browse files (browse the repository at this point in the history)
  • Loading branch information
rdblue authored Dec 20, 2022
1 parent e01084e commit 82ae1c6
Showing 1 changed file with 26 additions and 27 deletions.
53 changes: 26 additions & 27 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -121,11 +120,8 @@ protected TableScan newRefinedScan(
@Override
protected CloseableIterable<FileScanTask> doPlanFiles() {
FileIO io = table().io();
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());
Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

SnapshotEvaluator snapshotEvaluator =
new SnapshotEvaluator(filter, MANIFEST_FILE_SCHEMA.asStruct(), isCaseSensitive());
Expand All @@ -137,19 +133,8 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
filteredSnapshots,
snap -> {
if (snap.manifestListLocation() != null) {
DataFile manifestListAsDataFile =
DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(io.newInputFile(snap.manifestListLocation()))
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
return new ManifestListReadTask(
io,
schema(),
specs,
new BaseFileScanTask(
manifestListAsDataFile, null, schemaString, specString, residuals),
snap.snapshotId());
io, schema(), specs, snap.manifestListLocation(), filter, snap.snapshotId());
} else {
return StaticDataTask.of(
io.newInputFile(tableOps().current().metadataFileLocation()),
Expand All @@ -168,31 +153,35 @@ static class ManifestListReadTask implements DataTask {
private final FileIO io;
private final Schema schema;
private final Map<Integer, PartitionSpec> specs;
private final FileScanTask manifestListTask;
private final String manifestListLocation;
private final Expression residual;
private final long referenceSnapshotId;
private DataFile lazyDataFile = null;

ManifestListReadTask(
FileIO io,
Schema schema,
Map<Integer, PartitionSpec> specs,
FileScanTask manifestListTask,
String manifestListLocation,
Expression residual,
long referenceSnapshotId) {
this.io = io;
this.schema = schema;
this.specs = specs;
this.manifestListTask = manifestListTask;
this.manifestListLocation = manifestListLocation;
this.residual = residual;
this.referenceSnapshotId = referenceSnapshotId;
}

@Override
public List<DeleteFile> deletes() {
return manifestListTask.deletes();
return ImmutableList.of();
}

@Override
public CloseableIterable<StructLike> rows() {
try (CloseableIterable<ManifestFile> manifests =
Avro.read(io.newInputFile(manifestListTask.file().path().toString()))
Avro.read(io.newInputFile(manifestListLocation))
.rename("manifest_file", GenericManifestFile.class.getName())
.rename("partitions", GenericPartitionFieldSummary.class.getName())
.rename("r508", GenericPartitionFieldSummary.class.getName())
Expand All @@ -212,19 +201,27 @@ public CloseableIterable<StructLike> rows() {
return CloseableIterable.transform(rowIterable, projection::wrap);

} catch (IOException e) {
throw new RuntimeIOException(
e, "Cannot read manifest list file: %s", manifestListTask.file().path());
throw new RuntimeIOException(e, "Cannot read manifest list file: %s", manifestListLocation);
}
}

@Override
public DataFile file() {
return manifestListTask.file();
if (lazyDataFile == null) {
this.lazyDataFile =
DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(io.newInputFile(manifestListLocation))
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
}

return lazyDataFile;
}

@Override
public PartitionSpec spec() {
return manifestListTask.spec();
return PartitionSpec.unpartitioned();
}

@Override
Expand All @@ -234,12 +231,14 @@ public long start() {

@Override
public long length() {
return manifestListTask.length();
// return a generic length to avoid looking up the actual length
return 8192;
}

@Override
public Expression residual() {
return manifestListTask.residual();
// this table is unpartitioned so the residual is always constant
return residual;
}

@Override
Expand Down

0 comments on commit 82ae1c6

Please sign in to comment.