Skip to content

Commit

Permalink
Nessie: Refactor NessieTableOperations#doCommit (apache#6240)
Browse files Browse the repository at this point in the history
* Nessie: Refactor NessieTableOperations

* Expose refName
  • Loading branch information
ajantha-bhat authored Nov 22, 2022
1 parent b156fff commit a59599d
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
Expand Down Expand Up @@ -52,6 +54,8 @@
import org.projectnessie.model.EntriesResponse;
import org.projectnessie.model.GetNamespacesResponse;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.ImmutableCommitMeta;
import org.projectnessie.model.ImmutableIcebergTable;
import org.projectnessie.model.Operation;
import org.projectnessie.model.Reference;
import org.projectnessie.model.Tag;
Expand Down Expand Up @@ -416,6 +420,91 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
return !threw;
}

public void commitTable(
TableMetadata base,
TableMetadata metadata,
String newMetadataLocation,
IcebergTable expectedContent,
ContentKey key)
throws NessieConflictException, NessieNotFoundException {
UpdateableReference updateableReference = getRef();

updateableReference.checkMutable();

Branch current = updateableReference.getAsBranch();
Branch expectedHead = current;
if (base != null) {
String metadataCommitId =
base.property(NessieTableOperations.NESSIE_COMMIT_ID_PROPERTY, expectedHead.getHash());
if (metadataCommitId != null) {
expectedHead = Branch.of(expectedHead.getName(), metadataCommitId);
}
}

ImmutableIcebergTable.Builder newTableBuilder = ImmutableIcebergTable.builder();
if (expectedContent != null) {
newTableBuilder.id(expectedContent.getId());
}
Snapshot snapshot = metadata.currentSnapshot();
long snapshotId = snapshot != null ? snapshot.snapshotId() : -1L;

IcebergTable newTable =
newTableBuilder
.snapshotId(snapshotId)
.schemaId(metadata.currentSchemaId())
.specId(metadata.defaultSpecId())
.sortOrderId(metadata.defaultSortOrderId())
.metadataLocation(newMetadataLocation)
.build();

LOG.debug(
"Committing '{}' against '{}', current is '{}': {}",
key,
expectedHead,
current.getHash(),
newTable);
ImmutableCommitMeta.Builder builder = ImmutableCommitMeta.builder();
builder.message(buildCommitMsg(base, metadata, key.toString()));
if (isSnapshotOperation(base, metadata)) {
builder.putProperties("iceberg.operation", snapshot.operation());
}
Branch branch =
getApi()
.commitMultipleOperations()
.operation(Operation.Put.of(key, newTable, expectedContent))
.commitMeta(NessieUtil.catalogOptions(builder, catalogOptions).build())
.branch(expectedHead)
.commit();
LOG.info(
"Committed '{}' against '{}', expected commit-id was '{}'",
key,
branch,
expectedHead.getHash());
updateableReference.updateReference(branch);
}

private boolean isSnapshotOperation(TableMetadata base, TableMetadata metadata) {
Snapshot snapshot = metadata.currentSnapshot();
return snapshot != null
&& (base == null
|| base.currentSnapshot() == null
|| snapshot.snapshotId() != base.currentSnapshot().snapshotId());
}

private String buildCommitMsg(TableMetadata base, TableMetadata metadata, String tableName) {
if (isSnapshotOperation(base, metadata)) {
return String.format(
"Iceberg %s against %s", metadata.currentSnapshot().operation(), tableName);
} else if (base != null && metadata.currentSchemaId() != base.currentSchemaId()) {
return String.format("Iceberg schema change against %s", tableName);
}
return String.format("Iceberg commit against %s", tableName);
}

public String refName() {
return getRef().getName();
}

@Override
public void close() {
if (null != api) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.Map;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
Expand All @@ -33,13 +32,9 @@
import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.Branch;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.ImmutableCommitMeta;
import org.projectnessie.model.ImmutableIcebergTable;
import org.projectnessie.model.Operation;
import org.projectnessie.model.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -148,75 +143,22 @@ protected void doRefresh() {

@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
UpdateableReference updateableReference = client.getRef();

updateableReference.checkMutable();

Branch current = updateableReference.getAsBranch();
Branch expectedHead = current;
if (base != null) {
String metadataCommitId = base.property(NESSIE_COMMIT_ID_PROPERTY, expectedHead.getHash());
if (metadataCommitId != null) {
expectedHead = Branch.of(expectedHead.getName(), metadataCommitId);
}
}

String newMetadataLocation =
(base == null) && (metadata.metadataFileLocation() != null)
? metadata.metadataFileLocation()
: writeNewMetadata(metadata, currentVersion() + 1);

String refName = client.refName();
boolean delete = true;
try {
ImmutableIcebergTable.Builder newTableBuilder = ImmutableIcebergTable.builder();
if (table != null) {
newTableBuilder.id(table.getId());
}
Snapshot snapshot = metadata.currentSnapshot();
long snapshotId = snapshot != null ? snapshot.snapshotId() : -1L;

IcebergTable newTable =
newTableBuilder
.snapshotId(snapshotId)
.schemaId(metadata.currentSchemaId())
.specId(metadata.defaultSpecId())
.sortOrderId(metadata.defaultSortOrderId())
.metadataLocation(newMetadataLocation)
.build();

LOG.debug(
"Committing '{}' against '{}', current is '{}': {}",
key,
expectedHead,
current.getHash(),
newTable);
ImmutableCommitMeta.Builder builder = ImmutableCommitMeta.builder();
builder.message(buildCommitMsg(base, metadata));
if (isSnapshotOperation(base, metadata)) {
builder.putProperties("iceberg.operation", snapshot.operation());
}
Branch branch =
client
.getApi()
.commitMultipleOperations()
.operation(Operation.Put.of(key, newTable, table))
.commitMeta(NessieUtil.catalogOptions(builder, catalogOptions).build())
.branch(expectedHead)
.commit();
LOG.info(
"Committed '{}' against '{}', expected commit-id was '{}'",
key,
branch,
expectedHead.getHash());
updateableReference.updateReference(branch);

client.commitTable(base, metadata, newMetadataLocation, table, key);
delete = false;
} catch (NessieConflictException ex) {
throw new CommitFailedException(
ex,
"Cannot commit: Reference hash is out of date. "
+ "Update the reference '%s' and try again",
updateableReference.getName());
refName);
} catch (HttpClientException ex) {
// Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant
// to catch all kinds of network errors (e.g. connection reset). Network code implementation
Expand All @@ -226,34 +168,14 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
throw new CommitStateUnknownException(ex);
} catch (NessieNotFoundException ex) {
throw new RuntimeException(
String.format(
"Cannot commit: Reference '%s' no longer exists", updateableReference.getName()),
ex);
String.format("Cannot commit: Reference '%s' no longer exists", refName), ex);
} finally {
if (delete) {
io().deleteFile(newMetadataLocation);
}
}
}

private boolean isSnapshotOperation(TableMetadata base, TableMetadata metadata) {
Snapshot snapshot = metadata.currentSnapshot();
return snapshot != null
&& (base == null
|| base.currentSnapshot() == null
|| snapshot.snapshotId() != base.currentSnapshot().snapshotId());
}

private String buildCommitMsg(TableMetadata base, TableMetadata metadata) {
if (isSnapshotOperation(base, metadata)) {
return String.format(
"Iceberg %s against %s", metadata.currentSnapshot().operation(), tableName());
} else if (base != null && metadata.currentSchemaId() != base.currentSchemaId()) {
return String.format("Iceberg schema change against %s", tableName());
}
return String.format("Iceberg commit against %s", tableName());
}

@Override
public FileIO io() {
return fileIO;
Expand Down

0 comments on commit a59599d

Please sign in to comment.