Skip to content

Commit

Permalink
Core: Add REST support for lazy snapshot loading (apache#6850)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielcweeks authored Feb 20, 2023
1 parent 42d209f commit 237e991
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 5 deletions.
47 changes: 42 additions & 5 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
implements Configurable<Configuration>, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RESTSessionCatalog.class);
private static final String REST_METRICS_REPORTING_ENABLED = "rest-metrics-reporting-enabled";
private static final String REST_SNAPSHOT_LOADING_MODE = "snapshot-loading-mode";
private static final List<String> TOKEN_PREFERENCE_ORDER =
ImmutableList.of(
OAuth2Properties.ID_TOKEN_TYPE,
Expand All @@ -102,6 +104,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
private boolean refreshAuthByDefault = false;
private RESTClient client = null;
private ResourcePaths paths = null;
private SnapshotMode snapshotMode = null;
private Object conf = null;
private FileIO io = null;
private MetricsReporter reporter = null;
Expand All @@ -110,6 +113,15 @@ public class RESTSessionCatalog extends BaseSessionCatalog
// a lazy thread pool for token refresh
private volatile ScheduledExecutorService refreshExecutor = null;

enum SnapshotMode {
ALL,
REFS;

Map<String, String> params() {
return ImmutableMap.of("snapshots", this.name().toLowerCase(Locale.US));
}
}

public RESTSessionCatalog() {
this(config -> HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build());
}
Expand Down Expand Up @@ -179,6 +191,13 @@ public void initialize(String name, Map<String, String> unresolved) {
this.io =
CatalogUtil.loadFileIO(
ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), mergedProps, conf);

this.snapshotMode =
SnapshotMode.valueOf(
PropertyUtil.propertyAsString(
mergedProps, REST_SNAPSHOT_LOADING_MODE, SnapshotMode.ALL.name())
.toUpperCase(Locale.US));

String metricsReporterImpl = mergedProps.get(CatalogProperties.METRICS_REPORTER_IMPL);
this.reporter =
null != metricsReporterImpl
Expand Down Expand Up @@ -263,9 +282,11 @@ public void renameTable(SessionContext context, TableIdentifier from, TableIdent
client.post(paths.rename(), request, null, headers(context), ErrorHandlers.tableErrorHandler());
}

private LoadTableResponse loadInternal(SessionContext context, TableIdentifier identifier) {
private LoadTableResponse loadInternal(
SessionContext context, TableIdentifier identifier, SnapshotMode mode) {
return client.get(
paths.table(identifier),
mode.params(),
LoadTableResponse.class,
headers(context),
ErrorHandlers.tableErrorHandler());
Expand All @@ -279,7 +300,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
LoadTableResponse response;
TableIdentifier loadedIdent;
try {
response = loadInternal(context, identifier);
response = loadInternal(context, identifier, snapshotMode);
loadedIdent = identifier;
metadataType = null;

Expand All @@ -289,7 +310,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
// attempt to load a metadata table using the identifier's namespace as the base table
TableIdentifier baseIdent = TableIdentifier.of(identifier.namespace().levels());
try {
response = loadInternal(context, baseIdent);
response = loadInternal(context, baseIdent, snapshotMode);
loadedIdent = baseIdent;
} catch (NoSuchTableException ignored) {
// the base table does not exist
Expand All @@ -302,13 +323,29 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
}

AuthSession session = tableSession(response.config(), session(context));
TableMetadata tableMetadata;

if (snapshotMode == SnapshotMode.REFS) {
tableMetadata =
TableMetadata.buildFrom(response.tableMetadata())
.setSnapshotsSupplier(
() ->
loadInternal(context, identifier, SnapshotMode.ALL)
.tableMetadata()
.snapshots())
.discardChanges()
.build();
} else {
tableMetadata = response.tableMetadata();
}

RESTTableOperations ops =
new RESTTableOperations(
client,
paths.table(loadedIdent),
session::headers,
tableFileIO(response.config()),
response.tableMetadata());
tableMetadata);

TableIdentifier tableIdentifier = loadedIdent;
BaseTable table =
Expand Down Expand Up @@ -588,7 +625,7 @@ public Transaction createTransaction() {

@Override
public Transaction replaceTransaction() {
LoadTableResponse response = loadInternal(context, ident);
LoadTableResponse response = loadInternal(context, ident, snapshotMode);
String fullName = fullTableName(ident);

AuthSession session = tableSession(response.config(), session(context));
Expand Down
114 changes: 114 additions & 0 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,29 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -49,7 +54,9 @@
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
import org.apache.iceberg.rest.RESTSessionCatalog.SnapshotMode;
import org.apache.iceberg.rest.auth.AuthSessionUtil;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
Expand Down Expand Up @@ -737,6 +744,113 @@ public void testTableCredential() {
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=table-user"));
}

@Test
public void testSnapshotParams() {
assertThat(SnapshotMode.ALL.params()).isEqualTo(ImmutableMap.of("snapshots", "all"));

assertThat(SnapshotMode.REFS.params()).isEqualTo(ImmutableMap.of("snapshots", "refs"));
}

@Test
public void testTableSnapshotLoading() {
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));

RESTCatalog catalog =
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
catalog.initialize(
"test",
ImmutableMap.of(
CatalogProperties.URI,
"ignored",
CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.io.InMemoryFileIO",
// default loading to refs only
"snapshot-loading-mode",
"refs"));

// Create a table with multiple snapshots
Table table = catalog.createTable(TABLE, SCHEMA);
table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.commit();

table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.commit();

ResourcePaths paths = ResourcePaths.forCatalogProperties(Maps.newHashMap());

// Respond with only referenced snapshots
Answer<?> refsAnswer =
invocation -> {
LoadTableResponse originalResponse = (LoadTableResponse) invocation.callRealMethod();
TableMetadata fullTableMetadata = originalResponse.tableMetadata();

Set<Long> referencedSnapshotIds =
fullTableMetadata.refs().values().stream()
.map(SnapshotRef::snapshotId)
.collect(Collectors.toSet());

TableMetadata refsMetadata =
fullTableMetadata.removeSnapshotsIf(
s -> !referencedSnapshotIds.contains(s.snapshotId()));

return LoadTableResponse.builder()
.withTableMetadata(refsMetadata)
.addAllConfig(originalResponse.config())
.build();
};

Mockito.doAnswer(refsAnswer)
.when(adapter)
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "refs")),
any(),
eq(LoadTableResponse.class),
any(),
any());

Table refsTables = catalog.loadTable(TABLE);

assertThat(refsTables.currentSnapshot()).isEqualTo(table.currentSnapshot());
// verify that the table was loaded with the refs argument
verify(adapter, times(1))
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "refs")),
any(),
eq(LoadTableResponse.class),
any(),
any());

// verify that all snapshots are loaded when referenced
assertThat(refsTables.snapshots()).containsExactlyInAnyOrderElementsOf(table.snapshots());
verify(adapter, times(1))
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "all")),
any(),
eq(LoadTableResponse.class),
any(),
any());
}

public void testTableAuth(
String catalogToken,
Map<String, String> credentials,
Expand Down
13 changes: 13 additions & 0 deletions open-api/rest-catalog-open-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,19 @@ paths:
table. The configuration key "token" is used to pass an access token to be used as a bearer token
for table requests. Otherwise, a token may be passed using a RFC 8693 token type as a configuration
key. For example, "urn:ietf:params:oauth:token-type:jwt=<JWT-token>".
parameters:
- in: query
name: snapshots
description:
The snapshots to return in the body of the metadata. Setting the value to `all` would
return the full set of snapshots currently valid for the table. Setting the value to
`refs` would load all snapshots referenced by branches or tags.

Default if no param is provided is `all`.
required: false
schema:
type: string
enum: [ all, refs ]
responses:
200:
$ref: '#/components/responses/LoadTableResponse'
Expand Down

0 comments on commit 237e991

Please sign in to comment.