Skip to content

Commit

Permalink
Bump Nessie to 0.28.0 and adopt test code (apache#4594)
Browse files Browse the repository at this point in the history
Enhances Iceberg commit conflict detection by recording the commit ID from which a
table's metadata was loaded and using it as the "expected hash" in a Nessie commit.

Makes Nessie specific properties available in `TableMetadata` properties:
* `nessie.content.id` - the Nessie `Content.getId()`
* `nessie.commit.id` - the commit ID used to retrieve the table metadata
* `nessie.reference.name` - the reference name from which the table metadata has been loaded

Also fixes an issue in `org.apache.iceberg.nessie.NessieTableOperations#loadTableMetadata`
that caused too many "previous files" to accumulate, because `TableMetadata.buildFrom()` assumes
that it is only called for ongoing modifications.
  • Loading branch information
snazy authored May 4, 2022
1 parent 30b31a2 commit eb1fd41
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 178 deletions.
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,8 @@ project(':iceberg-nessie') {
implementation project(':iceberg-core')
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
implementation "org.projectnessie:nessie-client"
implementation "com.fasterxml.jackson.core:jackson-databind"
implementation "com.fasterxml.jackson.core:jackson-core"

testImplementation "org.projectnessie:nessie-jaxrs-testextension"
// Need to "pull in" el-api explicitly :(
Expand All @@ -598,6 +600,10 @@ project(':iceberg-nessie') {

testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')

// Only there to prevent "warning: unknown enum constant SchemaType.OBJECT" compile messages
compileOnly "org.eclipse.microprofile.openapi:microprofile-openapi-api:3.0"
testCompileOnly "org.eclipse.microprofile.openapi:microprofile-openapi-api:3.0"
}
}

Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ public static class Builder {

// handled in build
private final List<HistoryEntry> snapshotLog;
private final String previousFileLocation;
private String previousFileLocation;
private final List<MetadataLogEntry> previousFiles;

// indexes for convenience
Expand Down Expand Up @@ -1153,6 +1153,11 @@ public Builder discardChanges() {
return this;
}

/**
 * Overrides the previous metadata file location tracked by this builder.
 *
 * @param previousFileLocation the value to store in this builder's {@code previousFileLocation}
 *     field; the Nessie integration passes {@code null} here
 * @return this builder, to allow call chaining
 */
public Builder setPreviousFileLocation(String previousFileLocation) {
this.previousFileLocation = previousFileLocation;
return this;
}

public TableMetadata build() {
if (changes.size() == startingChangeCount && !(discardChanges && changes.size() > 0)) {
return base;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,29 @@

package org.apache.iceberg.nessie;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;
import java.util.function.Predicate;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.Branch;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.GenericMetadata;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.ImmutableCommitMeta;
import org.projectnessie.model.ImmutableIcebergTable;
import org.projectnessie.model.Operation;
import org.projectnessie.model.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,6 +52,12 @@ public class NessieTableOperations extends BaseMetastoreTableOperations {

private static final Logger LOG = LoggerFactory.getLogger(NessieTableOperations.class);

/**
* Name of the {@link TableMetadata} property that holds the Nessie commit-ID from which the
* metadata has been loaded.
*/
public static final String NESSIE_COMMIT_ID_PROPERTY = "nessie.commit.id";

private final NessieIcebergClient client;
private final ContentKey key;
private IcebergTable table;
Expand All @@ -75,21 +83,23 @@ protected String tableName() {
return key.toString();
}

@Override
protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry, int numRetries) {
super.refreshFromMetadataLocation(newLocation, shouldRetry, numRetries, this::loadTableMetadata);
}

private TableMetadata loadTableMetadata(String metadataLocation) {
private TableMetadata loadTableMetadata(String metadataLocation, Reference reference) {
// Update the TableMetadata with the Content of NessieTableState.
TableMetadata.Builder builder = TableMetadata.buildFrom(TableMetadataParser.read(io(), metadataLocation))
TableMetadata deserialized = NessieUtil.tableMetadataFromIcebergTable(io(), table, metadataLocation);
Map<String, String> newProperties = Maps.newHashMap(deserialized.properties());
newProperties.put(NESSIE_COMMIT_ID_PROPERTY, reference.getHash());
TableMetadata.Builder builder = TableMetadata.buildFrom(deserialized)
.setPreviousFileLocation(null)
.setCurrentSchema(table.getSchemaId())
.setDefaultSortOrder(table.getSortOrderId())
.setDefaultPartitionSpec(table.getSpecId())
.withMetadataLocation(metadataLocation);
.withMetadataLocation(metadataLocation)
.setProperties(newProperties);
if (table.getSnapshotId() != -1) {
builder.setBranchSnapshot(table.getSnapshotId(), SnapshotRef.MAIN_BRANCH);
}
LOG.info("loadTableMetadata for '{}' from location '{}' at '{}'", key, metadataLocation,
reference);

return builder.discardChanges().build();
}
Expand All @@ -103,31 +113,44 @@ protected void doRefresh() {
"is no longer valid.", client.getRef().getName()), e);
}
String metadataLocation = null;
Reference reference = client.getRef().getReference();
try {
Content content = client.getApi().getContent().key(key).reference(client.getRef().getReference()).get()
Content content = client.getApi().getContent().key(key).reference(reference).get()
.get(key);
LOG.debug("Content '{}' at '{}': {}", key, client.getRef().getReference(), content);
LOG.debug("Content '{}' at '{}': {}", key, reference, content);
if (content == null) {
if (currentMetadataLocation() != null) {
throw new NoSuchTableException("No such table '%s' in '%s'", key, client.getRef().getReference());
throw new NoSuchTableException("No such table '%s' in '%s'", key, reference);
}
} else {
this.table = content.unwrap(IcebergTable.class)
.orElseThrow(() -> new IllegalStateException(String.format("Cannot refresh iceberg table: " +
"Nessie points to a non-Iceberg object for path: %s.", key)));
.orElseThrow(
() -> new IllegalStateException(String.format("Cannot refresh iceberg table: " +
"Nessie points to a non-Iceberg object for path: %s.", key)));
metadataLocation = table.getMetadataLocation();
}
} catch (NessieNotFoundException ex) {
if (currentMetadataLocation() != null) {
throw new NoSuchTableException(ex, "No such table '%s'", key);
}
}
refreshFromMetadataLocation(metadataLocation, 2);
refreshFromMetadataLocation(metadataLocation, null, 2, l -> loadTableMetadata(l, reference));
}

@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
client.getRef().checkMutable();
UpdateableReference updateableReference = client.getRef();

updateableReference.checkMutable();

Branch current = updateableReference.getAsBranch();
Branch expectedHead = current;
if (base != null) {
String metadataCommitId = base.property(NESSIE_COMMIT_ID_PROPERTY, expectedHead.getHash());
if (metadataCommitId != null) {
expectedHead = Branch.of(expectedHead.getName(), metadataCommitId);
}
}

String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);

Expand All @@ -139,15 +162,20 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
}
Snapshot snapshot = metadata.currentSnapshot();
long snapshotId = snapshot != null ? snapshot.snapshotId() : -1L;

JsonNode newMetadata = NessieUtil.tableMetadataAsJsonNode(metadata);
IcebergTable newTable = newTableBuilder
.snapshotId(snapshotId)
.schemaId(metadata.currentSchemaId())
.specId(metadata.defaultSpecId())
.sortOrderId(metadata.defaultSortOrderId())
.metadataLocation(newMetadataLocation)
.metadata(
GenericMetadata.of("org.apache:iceberg:" + metadata.formatVersion(), newMetadata))
.build();

LOG.debug("Committing '{}' against '{}': {}", key, client.getRef().getReference(), newTable);
LOG.debug("Committing '{}' against '{}', current is '{}': {}", key, expectedHead,
current.getHash(), newTable);
ImmutableCommitMeta.Builder builder = ImmutableCommitMeta.builder();
builder.message(buildCommitMsg(base, metadata));
if (isSnapshotOperation(base, metadata)) {
Expand All @@ -156,14 +184,16 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
Branch branch = client.getApi().commitMultipleOperations()
.operation(Operation.Put.of(key, newTable, table))
.commitMeta(NessieUtil.catalogOptions(builder, catalogOptions).build())
.branch(client.getRef().getAsBranch())
.branch(expectedHead)
.commit();
client.getRef().updateReference(branch);
LOG.info("Committed '{}' against '{}', expected commit-id was '{}'", key, branch,
expectedHead.getHash());
updateableReference.updateReference(branch);

delete = false;
} catch (NessieConflictException ex) {
throw new CommitFailedException(ex, "Cannot commit: Reference hash is out of date. " +
"Update the reference '%s' and try again", client.getRef().getName());
"Update the reference '%s' and try again", updateableReference.getName());
} catch (HttpClientException ex) {
// Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant
// to catch all kinds of network errors (e.g. connection reset). Network code implementation
Expand All @@ -173,7 +203,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
throw new CommitStateUnknownException(ex);
} catch (NessieNotFoundException ex) {
throw new RuntimeException(
String.format("Cannot commit: Reference '%s' no longer exists", client.getRef().getName()), ex);
String.format("Cannot commit: Reference '%s' no longer exists",
updateableReference.getName()), ex);
} finally {
if (delete) {
io().deleteFile(newMetadataLocation);
Expand All @@ -189,7 +220,8 @@ private boolean isSnapshotOperation(TableMetadata base, TableMetadata metadata)

private String buildCommitMsg(TableMetadata base, TableMetadata metadata) {
if (isSnapshotOperation(base, metadata)) {
return String.format("Iceberg %s against %s", metadata.currentSnapshot().operation(), tableName());
return String.format("Iceberg %s against %s", metadata.currentSnapshot().operation(),
tableName());
} else if (base != null && metadata.currentSchemaId() != base.currentSchemaId()) {
return String.format("Iceberg schema change against %s", tableName());
}
Expand Down
42 changes: 42 additions & 0 deletions nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,28 @@

package org.apache.iceberg.nessie;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.JsonUtil;
import org.projectnessie.model.CommitMeta;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.ImmutableCommitMeta;

public final class NessieUtil {
Expand Down Expand Up @@ -89,4 +99,36 @@ private static String commitAuthor(Map<String, String> catalogOptions) {
return Optional.ofNullable(catalogOptions.get(CatalogProperties.USER))
.orElseGet(() -> System.getProperty("user.name"));
}

/**
 * Produces the {@link TableMetadata} for the given Nessie {@link IcebergTable}.
 *
 * <p>When the table carries embedded metadata, that metadata is written out as a JSON string and
 * parsed with {@link TableMetadataParser#fromJson}. Otherwise the metadata is read from
 * {@code metadataLocation} with {@link TableMetadataParser#read}.
 *
 * @param io file IO handed to the parser
 * @param table Nessie table content, possibly carrying embedded metadata
 * @param metadataLocation location of the table's metadata file
 * @return the deserialized table metadata
 * @throws RuntimeException if writing the embedded metadata as JSON fails with an
 *     {@link IOException}
 */
static TableMetadata tableMetadataFromIcebergTable(FileIO io, IcebergTable table, String metadataLocation) {
  if (table.getMetadata() == null) {
    // No embedded metadata available: read the metadata file instead.
    return TableMetadataParser.read(io, metadataLocation);
  }

  StringWriter buffer = new StringWriter();
  try (JsonGenerator jsonGenerator = JsonUtil.factory().createGenerator(buffer)) {
    jsonGenerator.writeObject(table.getMetadata().getMetadata());
  } catch (IOException e) {
    throw new RuntimeException("Failed to generate JSON string from metadata", e);
  }

  // The buffer is read only after the generator has been closed.
  return TableMetadataParser.fromJson(io, metadataLocation, buffer.toString());
}

/**
 * Converts the given {@link TableMetadata} into a {@link JsonNode} tree by serializing it with
 * {@link TableMetadataParser#toJson} and parsing the resulting JSON string.
 *
 * @param metadata table metadata to convert
 * @return the JSON tree parsed from the serialized metadata
 * @throws RuntimeException wrapping any {@link IOException} raised during the conversion
 */
static JsonNode tableMetadataAsJsonNode(TableMetadata metadata) {
  try (JsonParser jsonParser = JsonUtil.factory().createParser(TableMetadataParser.toJson(metadata))) {
    return jsonParser.readValueAs(JsonNode.class);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
}
73 changes: 0 additions & 73 deletions nessie/src/test/java/org/apache/iceberg/nessie/NessieUtilTest.java

This file was deleted.

Loading

0 comments on commit eb1fd41

Please sign in to comment.