Skip to content

Commit

Permalink
Spark 3.x: Backport snapshot references metadata table test (apache#5806
Browse files Browse the repository at this point in the history
)
  • Loading branch information
rajarshisarkar authored Sep 29, 2022
1 parent f161a13 commit f98115a
Showing 1 changed file with 123 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,129 @@ public void testMetadataLogEntries() throws Exception {
metadataLogWithProjection);
}

@Test
public void testSnapshotReferencesMetatable() throws Exception {
// Create table and insert data
sql(
"CREATE TABLE %s (id bigint, data string) "
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES"
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')",
tableName);

List<SimpleRecord> recordsA =
Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
spark
.createDataset(recordsA, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(tableName)
.append();

List<SimpleRecord> recordsB =
Lists.newArrayList(new SimpleRecord(1, "b"), new SimpleRecord(2, "b"));
spark
.createDataset(recordsB, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(tableName)
.append();

Table table = Spark3Util.loadIcebergTable(spark, tableName);
Long currentSnapshotId = table.currentSnapshot().snapshotId();

// Create branch
table
.manageSnapshots()
.createBranch("testBranch", currentSnapshotId)
.setMaxRefAgeMs("testBranch", 10)
.setMinSnapshotsToKeep("testBranch", 20)
.setMaxSnapshotAgeMs("testBranch", 30)
.commit();
// Create Tag
table
.manageSnapshots()
.createTag("testTag", currentSnapshotId)
.setMaxRefAgeMs("testTag", 50)
.commit();
// Check refs table
List<Row> references = spark.sql("SELECT * FROM " + tableName + ".refs").collectAsList();
Assert.assertEquals("Refs table should return 3 rows", 3, references.size());
List<Row> branches =
spark.sql("SELECT * FROM " + tableName + ".refs WHERE type='BRANCH'").collectAsList();
Assert.assertEquals("Refs table should return 2 branches", 2, branches.size());
List<Row> tags =
spark.sql("SELECT * FROM " + tableName + ".refs WHERE type='TAG'").collectAsList();
Assert.assertEquals("Refs table should return 1 tag", 1, tags.size());

// Check branch entries in refs table
List<Row> mainBranch =
spark
.sql("SELECT * FROM " + tableName + ".refs WHERE name = 'main' AND type='BRANCH'")
.collectAsList();
Assert.assertEquals("main", mainBranch.get(0).getAs("name"));
Assert.assertEquals("BRANCH", mainBranch.get(0).getAs("type"));
Assert.assertEquals(currentSnapshotId, mainBranch.get(0).getAs("snapshot_id"));

List<Row> testBranch =
spark
.sql("SELECT * FROM " + tableName + ".refs WHERE name = 'testBranch' AND type='BRANCH'")
.collectAsList();
Assert.assertEquals("testBranch", testBranch.get(0).getAs("name"));
Assert.assertEquals("BRANCH", testBranch.get(0).getAs("type"));
Assert.assertEquals(currentSnapshotId, testBranch.get(0).getAs("snapshot_id"));
Assert.assertEquals(Long.valueOf(10), testBranch.get(0).getAs("max_reference_age_in_ms"));
Assert.assertEquals(Integer.valueOf(20), testBranch.get(0).getAs("min_snapshots_to_keep"));
Assert.assertEquals(Long.valueOf(30), testBranch.get(0).getAs("max_snapshot_age_in_ms"));

// Check tag entries in refs table
List<Row> testTag =
spark
.sql("SELECT * FROM " + tableName + ".refs WHERE name = 'testTag' AND type='TAG'")
.collectAsList();
Assert.assertEquals("testTag", testTag.get(0).getAs("name"));
Assert.assertEquals("TAG", testTag.get(0).getAs("type"));
Assert.assertEquals(currentSnapshotId, testTag.get(0).getAs("snapshot_id"));
Assert.assertEquals(Long.valueOf(50), testTag.get(0).getAs("max_reference_age_in_ms"));

// Check projection in refs table
List<Row> testTagProjection =
spark
.sql(
"SELECT name,type,snapshot_id,max_reference_age_in_ms,min_snapshots_to_keep FROM "
+ tableName
+ ".refs where type='TAG'")
.collectAsList();
Assert.assertEquals("testTag", testTagProjection.get(0).getAs("name"));
Assert.assertEquals("TAG", testTagProjection.get(0).getAs("type"));
Assert.assertEquals(currentSnapshotId, testTagProjection.get(0).getAs("snapshot_id"));
Assert.assertEquals(
Long.valueOf(50), testTagProjection.get(0).getAs("max_reference_age_in_ms"));
Assert.assertNull(testTagProjection.get(0).getAs("min_snapshots_to_keep"));

List<Row> mainBranchProjection =
spark
.sql(
"SELECT name, type FROM "
+ tableName
+ ".refs WHERE name = 'main' AND type = 'BRANCH'")
.collectAsList();
Assert.assertEquals("main", mainBranchProjection.get(0).getAs("name"));
Assert.assertEquals("BRANCH", mainBranchProjection.get(0).getAs("type"));

List<Row> testBranchProjection =
spark
.sql(
"SELECT type, name, max_reference_age_in_ms, snapshot_id FROM "
+ tableName
+ ".refs WHERE name = 'testBranch' AND type = 'BRANCH'")
.collectAsList();
Assert.assertEquals("testBranch", testBranchProjection.get(0).getAs("name"));
Assert.assertEquals("BRANCH", testBranchProjection.get(0).getAs("type"));
Assert.assertEquals(currentSnapshotId, testBranchProjection.get(0).getAs("snapshot_id"));
Assert.assertEquals(
Long.valueOf(10), testBranchProjection.get(0).getAs("max_reference_age_in_ms"));
}

/**
* Find matching manifest entries of an Iceberg table
*
Expand Down

0 comments on commit f98115a

Please sign in to comment.