Skip to content

Commit

Permalink
Core: Derive View operation from version (apache#8678)
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra authored Oct 20, 2023
1 parent 1e5fcbf commit 6f15175
Show file tree
Hide file tree
Showing 17 changed files with 84 additions and 93 deletions.
2 changes: 1 addition & 1 deletion api/src/main/java/org/apache/iceberg/view/ViewVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public interface ViewVersion {
* @return the string operation which produced the view version
*/
default String operation() {
return summary().get("operation");
return versionId() == 1 ? "create" : "replace";
}

/** The query output schema at version create time, without aliases */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
Expand Down Expand Up @@ -161,7 +162,7 @@ private View create(ViewOperations ops) {
.defaultNamespace(defaultNamespace)
.defaultCatalog(defaultCatalog)
.timestampMillis(System.currentTimeMillis())
.putSummary("operation", "create")
.putAllSummary(EnvironmentContext.get())
.build();

ViewMetadata viewMetadata =
Expand Down Expand Up @@ -206,7 +207,7 @@ private View replace(ViewOperations ops) {
.defaultNamespace(defaultNamespace)
.defaultCatalog(defaultCatalog)
.timestampMillis(System.currentTimeMillis())
.putSummary("operation", "replace")
.putAllSummary(EnvironmentContext.get())
.build();

ViewMetadata.Builder builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.iceberg.view;

import javax.annotation.Nullable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.immutables.value.Value;

/**
Expand All @@ -40,9 +39,7 @@ interface BaseViewVersion extends ViewVersion {
@Override
@Value.Lazy
default String operation() {
Preconditions.checkArgument(
summary().containsKey("operation"), "Invalid view version summary, missing operation");
return summary().get("operation");
return ViewVersion.super.operation();
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,15 @@ private int reuseOrCreateNewViewVersionId(ViewVersion viewVersion) {

/**
* Checks whether the given view versions would behave the same while ignoring the view version
* id, the creation timestamp, and the summary.
* id, the creation timestamp, and the operation.
*
* @param one the view version to compare
* @param two the view version to compare
* @return true if the given view versions would behave the same
*/
private boolean sameViewVersion(ViewVersion one, ViewVersion two) {
return Objects.equals(one.representations(), two.representations())
return Objects.equals(one.summary(), two.summary())
&& Objects.equals(one.representations(), two.representations())
&& Objects.equals(one.defaultCatalog(), two.defaultCatalog())
&& Objects.equals(one.defaultNamespace(), two.defaultNamespace())
&& one.schemaId() == two.schemaId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;

import java.util.List;
import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.exceptions.CommitFailedException;
Expand Down Expand Up @@ -77,7 +78,7 @@ private ViewMetadata internalApply() {
.schemaId(schema.schemaId())
.defaultNamespace(defaultNamespace)
.defaultCatalog(defaultCatalog)
.putSummary("operation", "replace")
.putAllSummary(EnvironmentContext.get())
.addAllRepresentations(representations)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,12 +835,12 @@ public void testAddViewVersionFromJson() {
.versionId(23)
.timestampMillis(timestamp)
.schemaId(4)
.putSummary("operation", "replace")
.putSummary("user", "some-user")
.defaultNamespace(Namespace.of("ns"))
.build();
String json =
String.format(
"{\"action\":\"%s\",\"view-version\":{\"version-id\":23,\"timestamp-ms\":123456789,\"schema-id\":4,\"summary\":{\"operation\":\"replace\"},\"default-namespace\":[\"ns\"],\"representations\":[]}}",
"{\"action\":\"%s\",\"view-version\":{\"version-id\":23,\"timestamp-ms\":123456789,\"schema-id\":4,\"summary\":{\"user\":\"some-user\"},\"default-namespace\":[\"ns\"],\"representations\":[]}}",
action);
MetadataUpdate expected = new MetadataUpdate.AddViewVersion(viewVersion);
assertEquals(action, expected, MetadataUpdateParser.fromJson(json));
Expand All @@ -855,12 +855,12 @@ public void testAddViewVersionToJson() {
.versionId(23)
.timestampMillis(timestamp)
.schemaId(4)
.putSummary("operation", "replace")
.putSummary("user", "some-user")
.defaultNamespace(Namespace.of("ns"))
.build();
String expected =
String.format(
"{\"action\":\"%s\",\"view-version\":{\"version-id\":23,\"timestamp-ms\":123456789,\"schema-id\":4,\"summary\":{\"operation\":\"replace\"},\"default-namespace\":[\"ns\"],\"representations\":[]}}",
"{\"action\":\"%s\",\"view-version\":{\"version-id\":23,\"timestamp-ms\":123456789,\"schema-id\":4,\"summary\":{\"user\":\"some-user\"},\"default-namespace\":[\"ns\"],\"representations\":[]}}",
action);

MetadataUpdate update = new MetadataUpdate.AddViewVersion(viewVersion);
Expand Down
62 changes: 44 additions & 18 deletions core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private ViewVersion newViewVersion(int id, int schemaId, String sql) {
.timestampMillis(System.currentTimeMillis())
.defaultCatalog("prod")
.defaultNamespace(Namespace.of("default"))
.summary(ImmutableMap.of("operation", "create"))
.putSummary("user", "some-user")
.addRepresentations(
ImmutableSQLViewRepresentation.builder().dialect("spark").sql(sql).build())
.schemaId(schemaId)
Expand Down Expand Up @@ -127,7 +127,6 @@ public void unsupportedFormatVersion() {
.schemaId(0)
.versionId(1)
.timestampMillis(23L)
.putSummary("operation", "op")
.defaultNamespace(Namespace.of("ns"))
.build())
.setCurrentVersionId(1)
Expand Down Expand Up @@ -165,7 +164,6 @@ public void emptySchemas() {
.schemaId(1)
.versionId(1)
.timestampMillis(23L)
.putSummary("operation", "op")
.defaultNamespace(Namespace.of("ns"))
.build())
.setCurrentVersionId(1)
Expand All @@ -186,7 +184,6 @@ public void invalidCurrentVersionId() {
.schemaId(0)
.versionId(1)
.timestampMillis(23L)
.putSummary("operation", "op")
.defaultNamespace(Namespace.of("ns"))
.build())
.setCurrentVersionId(23)
Expand All @@ -209,7 +206,6 @@ public void invalidCurrentSchemaId() {
.versionId(1)
.defaultNamespace(Namespace.of("ns"))
.timestampMillis(23L)
.putSummary("operation", "op")
.build())
.setCurrentVersionId(1)
.build())
Expand Down Expand Up @@ -436,7 +432,6 @@ public void uuidAssignment() {
.schemaId(0)
.versionId(1)
.timestampMillis(23L)
.putSummary("operation", "create")
.defaultNamespace(Namespace.of("ns"))
.build())
.setCurrentVersionId(1)
Expand Down Expand Up @@ -472,7 +467,6 @@ public void viewMetadataWithMetadataLocation() {
.schemaId(schema.schemaId())
.versionId(1)
.timestampMillis(23L)
.putSummary("operation", "a")
.defaultNamespace(Namespace.of("ns"))
.build();

Expand Down Expand Up @@ -535,23 +529,15 @@ public void viewVersionIDReassignment() {
public void viewVersionDeduplication() {
// all view versions have the same ID
// additionally, there are duplicate view versions that only differ in their creation timestamp
// and/or the summary
ViewVersion viewVersionOne = newViewVersion(1, "select * from ns.tbl");
ViewVersion viewVersionTwo = newViewVersion(1, "select count(*) from ns.tbl");
ViewVersion viewVersionThree = newViewVersion(1, "select count(*) as count from ns.tbl");
ViewVersion viewVersionOneUpdated =
ImmutableViewVersion.builder().from(viewVersionOne).timestampMillis(1000).build();
ViewVersion viewVersionTwoUpdated =
ImmutableViewVersion.builder()
.from(viewVersionTwo)
.summary(ImmutableMap.of("operation", "replace"))
.build();
ImmutableViewVersion.builder().from(viewVersionTwo).timestampMillis(100).build();
ViewVersion viewVersionThreeUpdated =
ImmutableViewVersion.builder()
.from(viewVersionThree)
.timestampMillis(1000)
.summary(ImmutableMap.of("operation", "replace"))
.build();
ImmutableViewVersion.builder().from(viewVersionThree).timestampMillis(10).build();

ViewMetadata viewMetadata =
ViewMetadata.builder()
Expand All @@ -578,6 +564,47 @@ public void viewVersionDeduplication() {
ImmutableViewVersion.builder().from(viewVersionThree).versionId(3).build());
}

@Test
public void viewVersionDeduplicationWithCustomSummary() {
// all view versions have the same ID
// additionally, there are duplicate view versions that only differ in the summary
ViewVersion viewVersionOne = newViewVersion(1, "select * from ns.tbl");
ViewVersion viewVersionTwo = newViewVersion(1, "select count(*) from ns.tbl");
ViewVersion viewVersionOneUpdated =
ImmutableViewVersion.builder()
.from(viewVersionOne)
.timestampMillis(1000)
.summary(ImmutableMap.of("user", "some-user"))
.build();
ViewVersion viewVersionTwoUpdated =
ImmutableViewVersion.builder()
.from(viewVersionTwo)
.summary(ImmutableMap.of("user", "some-user", "key", "val"))
.build();

ViewMetadata viewMetadata =
ViewMetadata.builder()
.setLocation("custom-location")
.addSchema(new Schema(Types.NestedField.required(1, "x", Types.LongType.get())))
.addVersion(viewVersionOne)
.addVersion(viewVersionTwo)
.addVersion(viewVersionOneUpdated)
.addVersion(viewVersionTwoUpdated)
.setCurrentVersionId(3)
.build();

assertThat(viewMetadata.currentVersion())
.isEqualTo(ImmutableViewVersion.builder().from(viewVersionTwoUpdated).versionId(3).build());

// IDs of the view versions should be re-assigned and view versions should be de-duplicated
assertThat(viewMetadata.versions())
.hasSize(3)
.containsExactly(
viewVersionOne,
ImmutableViewVersion.builder().from(viewVersionTwo).versionId(2).build(),
ImmutableViewVersion.builder().from(viewVersionTwoUpdated).versionId(3).build());
}

@Test
public void schemaIDReassignment() {
Schema schemaOne = new Schema(5, Types.NestedField.required(1, "x", Types.LongType.get()));
Expand Down Expand Up @@ -740,7 +767,6 @@ public void viewMetadataWithMultipleSQLForSameDialect() {
.schemaId(0)
.versionId(1)
.timestampMillis(23L)
.putSummary("operation", "create")
.defaultNamespace(Namespace.of("ns"))
.addRepresentations(
ImmutableSQLViewRepresentation.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void readAndWriteValidViewMetadata() throws Exception {
ImmutableViewVersion.builder()
.versionId(1)
.timestampMillis(4353L)
.summary(ImmutableMap.of("operation", "create"))
.summary(ImmutableMap.of("user", "some-user"))
.schemaId(0)
.defaultCatalog("some-catalog")
.defaultNamespace(Namespace.empty())
Expand All @@ -89,7 +89,7 @@ public void readAndWriteValidViewMetadata() throws Exception {
.versionId(2)
.schemaId(0)
.timestampMillis(5555L)
.summary(ImmutableMap.of("operation", "replace"))
.summary(ImmutableMap.of("user", "some-user"))
.defaultCatalog("some-catalog")
.defaultNamespace(Namespace.empty())
.addRepresentations(
Expand Down Expand Up @@ -182,7 +182,7 @@ public void viewMetadataWithMetadataLocation() throws Exception {
ImmutableViewVersion.builder()
.versionId(1)
.timestampMillis(4353L)
.summary(ImmutableMap.of("operation", "create"))
.summary(ImmutableMap.of("user", "some-user"))
.schemaId(0)
.defaultCatalog("some-catalog")
.defaultNamespace(Namespace.empty())
Expand All @@ -198,7 +198,7 @@ public void viewMetadataWithMetadataLocation() throws Exception {
.versionId(2)
.schemaId(0)
.timestampMillis(5555L)
.summary(ImmutableMap.of("operation", "replace"))
.summary(ImmutableMap.of("user", "some-user"))
.defaultCatalog("some-catalog")
.defaultNamespace(Namespace.empty())
.addRepresentations(
Expand Down Expand Up @@ -247,7 +247,7 @@ public void viewMetadataWithMultipleSQLsForDialectShouldBeReadable() throws Exce
ImmutableViewVersion.builder()
.versionId(1)
.timestampMillis(4353L)
.summary(ImmutableMap.of("operation", "create"))
.summary(ImmutableMap.of("user", "some-user"))
.schemaId(0)
.defaultCatalog("some-catalog")
.defaultNamespace(Namespace.empty())
Expand Down Expand Up @@ -306,7 +306,7 @@ public void replaceViewMetadataWithMultipleSQLsForDialect() throws Exception {
.versionId(2)
.schemaId(0)
.timestampMillis(5555L)
.summary(ImmutableMap.of("operation", "replace"))
.summary(ImmutableMap.of("user", "some-user"))
.defaultCatalog("some-catalog")
.defaultNamespace(Namespace.empty())
.addRepresentations(
Expand Down Expand Up @@ -335,7 +335,7 @@ public void metadataCompression(String fileName) throws IOException {
.schemaId(0)
.versionId(1)
.timestampMillis(23L)
.putSummary("operation", "create")
.putSummary("user", "some-user")
.defaultNamespace(Namespace.of("ns"))
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testParseViewVersion() {
.timestampMillis(12345)
.defaultNamespace(Namespace.of("one", "two"))
.addRepresentations(firstRepresentation, secondRepresentation)
.summary(ImmutableMap.of("operation", "create", "user", "some-user"))
.summary(ImmutableMap.of("user", "some-user"))
.schemaId(1)
.build();

Expand All @@ -55,7 +55,7 @@ public void testParseViewVersion() {

String serializedViewVersion =
String.format(
"{\"version-id\":1, \"timestamp-ms\":12345, \"schema-id\":1, \"summary\":{\"operation\":\"create\", \"user\":\"some-user\"}, \"representations\":%s, \"default-namespace\":[\"one\",\"two\"]}",
"{\"version-id\":1, \"timestamp-ms\":12345, \"schema-id\":1, \"summary\":{\"user\":\"some-user\"}, \"representations\":%s, \"default-namespace\":[\"one\",\"two\"]}",
serializedRepresentations);

Assertions.assertThat(ViewVersionParser.fromJson(serializedViewVersion))
Expand All @@ -81,7 +81,7 @@ public void testSerializeViewVersion() {
.versionId(1)
.timestampMillis(12345)
.addRepresentations(firstRepresentation, secondRepresentation)
.summary(ImmutableMap.of("operation", "create", "user", "some-user"))
.summary(ImmutableMap.of("user", "some-user"))
.defaultNamespace(Namespace.of("one", "two"))
.defaultCatalog("catalog")
.schemaId(1)
Expand All @@ -93,7 +93,7 @@ public void testSerializeViewVersion() {

String expectedViewVersion =
String.format(
"{\"version-id\":1,\"timestamp-ms\":12345,\"schema-id\":1,\"summary\":{\"operation\":\"create\",\"user\":\"some-user\"},"
"{\"version-id\":1,\"timestamp-ms\":12345,\"schema-id\":1,\"summary\":{\"user\":\"some-user\"},"
+ "\"default-catalog\":\"catalog\",\"default-namespace\":[\"one\",\"two\"],\"representations\":%s}",
expectedRepresentations);

Expand All @@ -102,37 +102,6 @@ public void testSerializeViewVersion() {
.isEqualTo(expectedViewVersion);
}

@Test
public void testFailParsingMissingOperation() {
String serializedRepresentations =
"[{\"type\":\"sql\",\"sql\":\"select * from foo\",\"dialect\":\"spark-sql\"},"
+ "{\"type\":\"sql\",\"sql\":\"select a, b, c from foo\",\"dialect\":\"some-sql\"}]";

String viewVersionMissingOperation =
String.format(
"{\"version-id\":1,\"timestamp-ms\":12345,\"summary\":{\"some-other-field\":\"some-other-value\"},\"representations\":%s,\"schema-id\":1,\"default-namespace\":[\"one\",\"two\"]}",
serializedRepresentations);

// parsing a view version with an invalid operation shouldn't fail
ViewVersion viewVersion = ViewVersionParser.fromJson(viewVersionMissingOperation);
Assertions.assertThatThrownBy(() -> viewVersion.operation())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid view version summary, missing operation");

Assertions.assertThatThrownBy(
() ->
ImmutableViewVersion.builder()
.versionId(1)
.timestampMillis(12345)
.schemaId(1)
.defaultNamespace(Namespace.of("one", "two"))
.summary(ImmutableMap.of("user", "some-user"))
.build()
.operation())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid view version summary, missing operation");
}

@Test
public void testNullViewVersion() {
Assertions.assertThatThrownBy(() -> ViewVersionParser.toJson(null))
Expand Down
Loading

0 comments on commit 6f15175

Please sign in to comment.