Skip to content

Commit

Permalink
Spark 3.2, 3.3: Fix nullability propagation for MergeRows node (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
singhpk234 authored Sep 27, 2022
1 parent ecf004b commit 3bc2f97
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand {
}.toMap

attrs.zipWithIndex.map { case (attr, index) =>
attr.withNullability(nullabilityMap(index))
AttributeReference(attr.name, attr.dataType, nullabilityMap(index), attr.metadata)()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.util.Map;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Test;

public class TestCopyOnWriteMerge extends TestMerge {

Expand All @@ -38,4 +40,37 @@ public TestCopyOnWriteMerge(
protected Map<String, String> extraTableProperties() {
return ImmutableMap.of(TableProperties.MERGE_MODE, "copy-on-write");
}

@Test
public void testMergeWithTableWithNonNullableColumn() {
createAndInitTable(
"id INT NOT NULL, dep STRING",
"{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" + "{ \"id\": 6, \"dep\": \"emp-id-6\" }");

createOrReplaceView(
"source",
"id INT NOT NULL, dep STRING",
"{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");

sql(
"MERGE INTO %s AS t USING source AS s "
+ "ON t.id == s.id "
+ "WHEN MATCHED AND t.id = 1 THEN "
+ " UPDATE SET * "
+ "WHEN MATCHED AND t.id = 6 THEN "
+ " DELETE "
+ "WHEN NOT MATCHED AND s.id = 2 THEN "
+ " INSERT *",
tableName);

ImmutableList<Object[]> expectedRows =
ImmutableList.of(
row(1, "emp-id-1"), // updated
row(2, "emp-id-2") // new
);
assertEquals(
"Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand {
}.toMap

attrs.zipWithIndex.map { case (attr, index) =>
attr.withNullability(nullabilityMap(index))
AttributeReference(attr.name, attr.dataType, nullabilityMap(index), attr.metadata)()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.util.Map;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Test;

public class TestCopyOnWriteMerge extends TestMerge {

Expand All @@ -38,4 +40,37 @@ public TestCopyOnWriteMerge(
protected Map<String, String> extraTableProperties() {
return ImmutableMap.of(TableProperties.MERGE_MODE, "copy-on-write");
}

@Test
public void testMergeWithTableWithNonNullableColumn() {
createAndInitTable(
"id INT NOT NULL, dep STRING",
"{ \"id\": 1, \"dep\": \"emp-id-one\" }\n" + "{ \"id\": 6, \"dep\": \"emp-id-6\" }");

createOrReplaceView(
"source",
"id INT NOT NULL, dep STRING",
"{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
+ "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");

sql(
"MERGE INTO %s AS t USING source AS s "
+ "ON t.id == s.id "
+ "WHEN MATCHED AND t.id = 1 THEN "
+ " UPDATE SET * "
+ "WHEN MATCHED AND t.id = 6 THEN "
+ " DELETE "
+ "WHEN NOT MATCHED AND s.id = 2 THEN "
+ " INSERT *",
tableName);

ImmutableList<Object[]> expectedRows =
ImmutableList.of(
row(1, "emp-id-1"), // updated
row(2, "emp-id-2") // new
);
assertEquals(
"Should have expected rows", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
}
}

0 comments on commit 3bc2f97

Please sign in to comment.