Skip to content

Commit

Permalink
Spark-3.3: Backport 'WAP branch not propagated when using DELETE with…
Browse files Browse the repository at this point in the history
…out WHERE' (apache#8033)
  • Loading branch information
rakesh-das08 authored Jul 11, 2023
1 parent f94de0f commit 62cb7b5
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.spark.sql.functions.lit;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -46,6 +47,7 @@
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
Expand Down Expand Up @@ -1135,6 +1137,42 @@ public void testDeleteToWapBranchWithTableBranchIdentifier() throws NoSuchTableE
branch)));
}

@Test
public void testDeleteToCustomWapBranchWithoutWhereClause() throws NoSuchTableException {
assumeThat(branch)
.as("Run only if custom WAP branch is not main")
.isNotNull()
.isNotEqualTo(SnapshotRef.MAIN_BRANCH);

createAndInitPartitionedTable();
sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
createBranchIfNeeded();

withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch),
() -> {
sql("DELETE FROM %s t WHERE id=1", tableName);
Assertions.assertThat(spark.table(tableName).count()).isEqualTo(2L);
Assertions.assertThat(spark.table(tableName + ".branch_" + branch).count()).isEqualTo(2L);
Assertions.assertThat(spark.table(tableName + ".branch_main").count())
.as("Should not modify main branch")
.isEqualTo(3L);
});
withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch),
() -> {
sql("DELETE FROM %s t", tableName);
Assertions.assertThat(spark.table(tableName).count()).isEqualTo(0L);
Assertions.assertThat(spark.table(tableName + ".branch_" + branch).count()).isEqualTo(0L);
Assertions.assertThat(spark.table(tableName + ".branch_main").count())
.as("Should not modify main branch")
.isEqualTo(3L);
});
}

// TODO: multiple stripes for ORC

protected void createAndInitPartitionedTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.TableMigrationUtil;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.hadoop.Util;
Expand Down Expand Up @@ -715,6 +716,43 @@ public static Dataset<Row> loadMetadataTable(
spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options));
}

/**
* Determine the write branch.
*
* <p>Validate wap config and determine the write branch.
*
* @param spark a Spark Session
* @param branch write branch if there is no WAP branch configured
* @return branch for write operation
*/
public static String determineWriteBranch(SparkSession spark, String branch) {
String wapId = spark.conf().get(SparkSQLProperties.WAP_ID, null);
String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
ValidationException.check(
wapId == null || wapBranch == null,
"Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
wapId,
wapBranch);

if (wapBranch != null) {
ValidationException.check(
branch == null,
"Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
branch,
wapBranch);

return wapBranch;
}
return branch;
}

public static boolean wapEnabled(Table table) {
return PropertyUtil.propertyAsBoolean(
table.properties(),
TableProperties.WRITE_AUDIT_PUBLISH_ENABLED,
Boolean.getBoolean(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
}

/** Class representing a table partition. */
public static class SparkPartition implements Serializable {
private final Map<String, String> values;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
Expand Down Expand Up @@ -372,6 +373,10 @@ public void deleteWhere(Filter[] filters) {
.set("spark.app.id", sparkSession().sparkContext().applicationId())
.deleteFromRowFilter(deleteExpr);

if (SparkTableUtil.wapEnabled(table())) {
branch = SparkTableUtil.determineWriteBranch(sparkSession(), branch);
}

if (branch != null) {
deleteFiles.toBranch(branch);
}
Expand Down

0 comments on commit 62cb7b5

Please sign in to comment.