Skip to content

Commit

Permalink
Spark 3.4: Create non-existing Tag/Branch when using CREATE OR REPLACE (
Browse files Browse the repository at this point in the history
apache#8086)

Currently, executing `ALTER TABLE x CREATE OR REPLACE TAG xyz` will fail
with `Tag does not exist: xyz`.

As a user I'd expect this to create the tag due to the `CREATE OR
REPLACE` usage. The same issue happens with branches.
  • Loading branch information
nastra authored Jul 20, 2023
1 parent 7da759b commit 9204b2d
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
val branchRetention = branchOptionsContext.flatMap(branchOptions => Option(branchOptions.refRetain()))
val branchRefAgeMs = branchRetention.map(retain =>
TimeUnit.valueOf(retain.timeUnit().getText.toUpperCase(Locale.ENGLISH)).toMillis(retain.number().getText.toLong))
val create = createOrReplaceBranchClause.CREATE() != null
val replace = ctx.createReplaceBranchClause().REPLACE() != null
val ifNotExists = createOrReplaceBranchClause.EXISTS() != null

Expand All @@ -130,6 +131,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
typedVisit[Seq[String]](ctx.multipartIdentifier),
branchName.getText,
branchOptions,
create,
replace,
ifNotExists)
}
Expand All @@ -153,12 +155,14 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
tagRefAgeMs
)

val create = createTagClause.CREATE() != null
val replace = createTagClause.REPLACE() != null
val ifNotExists = createTagClause.EXISTS() != null

CreateOrReplaceTag(typedVisit[Seq[String]](ctx.multipartIdentifier),
tagName,
tagOptions,
create,
replace,
ifNotExists)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ case class CreateOrReplaceBranch(
table: Seq[String],
branch: String,
branchOptions: BranchOptions,
create: Boolean,
replace: Boolean,
ifNotExists: Boolean) extends LeafCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ case class CreateOrReplaceTag(
table: Seq[String],
tag: String,
tagOptions: TagOptions,
create: Boolean,
replace: Boolean,
ifNotExists: Boolean) extends LeafCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ case class CreateOrReplaceBranchExec(
ident: Identifier,
branch: String,
branchOptions: BranchOptions,
create: Boolean,
replace: Boolean,
ifNotExists: Boolean) extends LeafV2CommandExec {

Expand All @@ -51,15 +52,18 @@ case class CreateOrReplaceBranchExec(
"Cannot complete create or replace branch operation on %s, main has no snapshot", ident)

val manageSnapshots = iceberg.table().manageSnapshots()
if (!replace) {
val ref = iceberg.table().refs().get(branch)
if (ref != null && ifNotExists) {
val refExists = null != iceberg.table().refs().get(branch)

if (create && replace && !refExists) {
manageSnapshots.createBranch(branch, snapshotId)
} else if (replace) {
manageSnapshots.replaceBranch(branch, snapshotId)
} else {
if (refExists && ifNotExists) {
return Nil
}

manageSnapshots.createBranch(branch, snapshotId)
} else {
manageSnapshots.replaceBranch(branch, snapshotId)
}

if (branchOptions.numSnapshots.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ case class CreateOrReplaceTagExec(
ident: Identifier,
tag: String,
tagOptions: TagOptions,
create: Boolean,
replace: Boolean,
ifNotExists: Boolean) extends LeafV2CommandExec {

Expand All @@ -50,15 +51,18 @@ case class CreateOrReplaceTagExec(
"Cannot complete create or replace tag operation on %s, main has no snapshot", ident)

val manageSnapshot = iceberg.table.manageSnapshots()
if (!replace) {
val ref = iceberg.table().refs().get(tag)
if (ref != null && ifNotExists) {
val refExists = null != iceberg.table().refs().get(tag)

if (create && replace && !refExists) {
manageSnapshot.createTag(tag, snapshotId)
} else if (replace) {
manageSnapshot.replaceTag(tag, snapshotId)
} else {
if (refExists && ifNotExists) {
return Nil
}

manageSnapshot.createTag(tag, snapshotId)
} else {
manageSnapshot.replaceTag(tag, snapshotId)
}

if (tagOptions.snapshotRefRetain.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
AddPartitionFieldExec(catalog, ident, transform, name) :: Nil

case CreateOrReplaceBranch(
IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, replace, ifNotExists) =>
CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, replace, ifNotExists) :: Nil
IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, create, replace, ifNotExists) =>
CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, create, replace, ifNotExists) :: Nil

case CreateOrReplaceTag(IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, replace, ifNotExists) =>
CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, replace, ifNotExists) :: Nil
case CreateOrReplaceTag(
IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, create, replace, ifNotExists) =>
CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, create, replace, ifNotExists) :: Nil

case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) =>
DropBranchExec(catalog, ident, branch, ifExists) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -289,4 +291,69 @@ private Table insertRows() throws NoSuchTableException {
df.writeTo(tableName).append();
return validationCatalog.loadTable(tableIdent);
}

@Test
public void createOrReplace() throws NoSuchTableException {
Table table = insertRows();
long first = table.currentSnapshot().snapshotId();
String branchName = "b1";
insertRows();
long second = table.currentSnapshot().snapshotId();
table.manageSnapshots().createBranch(branchName, second).commit();

sql(
"ALTER TABLE %s CREATE OR REPLACE BRANCH %s AS OF VERSION %d",
tableName, branchName, first);
table.refresh();
assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second);
}

@Test
public void createOrReplaceWithNonExistingBranch() throws NoSuchTableException {
Table table = insertRows();
String branchName = "b1";
insertRows();
long snapshotId = table.currentSnapshot().snapshotId();

sql(
"ALTER TABLE %s CREATE OR REPLACE BRANCH %s AS OF VERSION %d",
tableName, branchName, snapshotId);
table.refresh();
assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(snapshotId);
}

@Test
public void replaceBranch() throws NoSuchTableException {
Table table = insertRows();
long first = table.currentSnapshot().snapshotId();
String branchName = "b1";
long expectedMaxRefAgeMs = 1000;
table
.manageSnapshots()
.createBranch(branchName, first)
.setMaxRefAgeMs(branchName, expectedMaxRefAgeMs)
.commit();

insertRows();
long second = table.currentSnapshot().snapshotId();

sql("ALTER TABLE %s REPLACE BRANCH %s AS OF VERSION %d", tableName, branchName, second);
table.refresh();
SnapshotRef ref = table.refs().get(branchName);
assertThat(ref.snapshotId()).isEqualTo(second);
assertThat(ref.maxRefAgeMs()).isEqualTo(expectedMaxRefAgeMs);
}

@Test
public void replaceBranchDoesNotExist() throws NoSuchTableException {
Table table = insertRows();

Assertions.assertThatThrownBy(
() ->
sql(
"ALTER TABLE %s REPLACE BRANCH %s AS OF VERSION %d",
tableName, "someBranch", table.currentSnapshot().snapshotId()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Branch does not exist: someBranch");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -364,6 +366,18 @@ public void testDropTagIfExists() throws NoSuchTableException {
Assert.assertNull("The tag needs to be dropped.", table.refs().get(tagName));
}

@Test
public void createOrReplaceWithNonExistingTag() throws NoSuchTableException {
Table table = insertRows();
String tagName = "t1";
insertRows();
long snapshotId = table.currentSnapshot().snapshotId();

sql("ALTER TABLE %s CREATE OR REPLACE TAG %s AS OF VERSION %d", tableName, tagName, snapshotId);
table.refresh();
assertThat(table.refs().get(tagName).snapshotId()).isEqualTo(snapshotId);
}

private Table insertRows() throws NoSuchTableException {
List<SimpleRecord> records =
ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
Expand Down

0 comments on commit 9204b2d

Please sign in to comment.