Skip to content

Commit

Permalink
Core: Add FileIO tracker/closer to REST catalog (apache#7487)
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra authored May 16, 2023
1 parent 29eebdd commit d23b36c
Showing 1 changed file with 49 additions and 6 deletions.
55 changes: 49 additions & 6 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.Transactions;
import org.apache.iceberg.catalog.BaseSessionCatalog;
Expand All @@ -55,6 +56,7 @@
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.MetricsReporters;
Expand Down Expand Up @@ -99,6 +101,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
private final Function<Map<String, String>, RESTClient> clientBuilder;
private final BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder;
private Cache<String, AuthSession> sessions = null;
private Cache<TableOperations, FileIO> fileIOCloser;
private AuthSession catalogAuth = null;
private boolean keepTokenRefreshed = true;
private RESTClient client = null;
Expand All @@ -108,6 +111,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
private FileIO io = null;
private MetricsReporter reporter = null;
private boolean reportingViaRestEnabled;
private CloseableGroup closeables = null;

// a lazy thread pool for token refresh
private volatile ScheduledExecutorService refreshExecutor = null;
Expand Down Expand Up @@ -193,6 +197,12 @@ public void initialize(String name, Map<String, String> unresolved) {

this.io = newFileIO(SessionContext.createEmpty(), mergedProps);

this.fileIOCloser = newFileIOCloser();
this.closeables = new CloseableGroup();
this.closeables.addCloseable(this.io);
this.closeables.addCloseable(this.client);
this.closeables.setSuppressCloseFailure(true);

this.snapshotMode =
SnapshotMode.valueOf(
PropertyUtil.propertyAsString(
Expand Down Expand Up @@ -319,6 +329,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
}
}

TableIdentifier finalIdentifier = loadedIdent;
AuthSession session = tableSession(response.config(), session(context));
TableMetadata tableMetadata;

Expand All @@ -329,7 +340,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
.setPreviousFileLocation(null)
.setSnapshotsSupplier(
() ->
loadInternal(context, identifier, SnapshotMode.ALL)
loadInternal(context, finalIdentifier, SnapshotMode.ALL)
.tableMetadata()
.snapshots())
.discardChanges()
Expand All @@ -341,23 +352,31 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
RESTTableOperations ops =
new RESTTableOperations(
client,
paths.table(loadedIdent),
paths.table(finalIdentifier),
session::headers,
tableFileIO(context, response.config()),
tableMetadata);

trackFileIO(ops);

BaseTable table =
new BaseTable(
ops,
fullTableName(loadedIdent),
metricsReporter(paths.metrics(loadedIdent), session::headers));
fullTableName(finalIdentifier),
metricsReporter(paths.metrics(finalIdentifier), session::headers));
if (metadataType != null) {
return MetadataTableUtils.createMetadataTableInstance(table, metadataType);
}

return table;
}

private void trackFileIO(RESTTableOperations ops) {
if (io != ops.io()) {
fileIOCloser.put(ops, ops.io());
}
}

private MetricsReporter metricsReporter(
String metricsEndpoint, Supplier<Map<String, String>> headers) {
if (reportingViaRestEnabled) {
Expand Down Expand Up @@ -485,8 +504,13 @@ private ScheduledExecutorService tokenRefreshExecutor() {
public void close() throws IOException {
shutdownRefreshExecutor();

if (client != null) {
client.close();
if (closeables != null) {
closeables.close();
}

if (fileIOCloser != null) {
fileIOCloser.invalidateAll();
fileIOCloser.cleanUp();
}
}

Expand Down Expand Up @@ -592,6 +616,8 @@ public Table create() {
tableFileIO(context, response.config()),
response.tableMetadata());

trackFileIO(ops);

return new BaseTable(
ops, fullTableName(ident), metricsReporter(paths.metrics(ident), session::headers));
}
Expand All @@ -614,6 +640,8 @@ public Transaction createTransaction() {
createChanges(meta),
meta);

trackFileIO(ops);

return Transactions.createTableTransaction(
fullName, ops, meta, metricsReporter(paths.metrics(ident), session::headers));
}
Expand Down Expand Up @@ -665,6 +693,8 @@ public Transaction replaceTransaction() {
changes.build(),
base);

trackFileIO(ops);

return Transactions.replaceTableTransaction(
fullName, ops, replacement, metricsReporter(paths.metrics(ident), session::headers));
}
Expand Down Expand Up @@ -873,4 +903,17 @@ private static Cache<String, AuthSession> newSessionCache(Map<String, String> pr
(RemovalListener<String, AuthSession>) (id, auth, cause) -> auth.stopRefreshing())
.build();
}

private Cache<TableOperations, FileIO> newFileIOCloser() {
return Caffeine.newBuilder()
.weakKeys()
.removalListener(
(RemovalListener<TableOperations, FileIO>)
(ops, fileIO, cause) -> {
if (null != fileIO) {
fileIO.close();
}
})
.build();
}
}

0 comments on commit d23b36c

Please sign in to comment.