Spark: Validate table columns don't conflict with metadata columns (a…
aokolnychyi authored Nov 5, 2021
1 parent d85a2b2 commit 8ef5ed1
Showing 3 changed files with 49 additions and 0 deletions.
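
For context: Iceberg reserves a handful of column names, such as _spec_id, _file, _pos, and _partition, for metadata columns that Spark can project next to regular data columns. This commit makes Spark reject scans over tables whose data columns reuse one of those reserved names. The sketch below is editorial illustration, not part of the commit; it uses only the MetadataColumns constants that appear in this diff plus ROW_POSITION, and assumes the Iceberg core library on the classpath.

import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class MetadataColumnConflictSketch {
  public static void main(String[] args) {
    // Reserved metadata column names (not an exhaustive list).
    System.out.println(MetadataColumns.SPEC_ID.name());      // _spec_id
    System.out.println(MetadataColumns.FILE_PATH.name());    // _file
    System.out.println(MetadataColumns.ROW_POSITION.name()); // _pos

    // A data column that reuses the reserved "_file" name is exactly what the
    // new validation flags: the name is a metadata column name AND it resolves
    // to a field in the table schema.
    Schema tableSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "_file", Types.StringType.get()));

    boolean conflicts = MetadataColumns.isMetadataColumn("_file")
        && tableSchema.findField("_file") != null;
    System.out.println(conflicts); // true
  }
}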
@@ -23,8 +23,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
@@ -299,4 +302,17 @@ public static long estimateSize(StructType tableSchema, long totalRecords) {
}
return result;
}

public static void validateMetadataColumnReferences(Schema tableSchema, Schema readSchema) {
List<String> conflictingColumnNames = readSchema.columns().stream()
.map(Types.NestedField::name)
.filter(name -> MetadataColumns.isMetadataColumn(name) && tableSchema.findField(name) != null)
.collect(Collectors.toList());

ValidationException.check(
conflictingColumnNames.isEmpty(),
"Table column names conflict with names reserved for Iceberg metadata columns: %s.\n" +
"Please, use ALTER TABLE statements to rename the conflicting table columns.",
conflictingColumnNames);
}
}
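
A minimal usage sketch of the new helper (editorial, not part of the commit): it builds a table schema and a projected read schema that share the reserved _spec_id name and lets validateMetadataColumnReferences raise the ValidationException. It assumes the helper lives in org.apache.iceberg.spark.SparkSchemaUtil, as the call site in SparkBatchScan below suggests, and that the iceberg-spark module is on the classpath; field ids and names are made up.

import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;

public class ValidateMetadataColumnReferencesSketch {
  public static void main(String[] args) {
    // Table schema with a data column that reuses the reserved "_spec_id" name.
    Schema tableSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "_spec_id", Types.IntegerType.get()));

    // Read schema that projects the conflicting column, as a SELECT * would.
    Schema readSchema = new Schema(
        Types.NestedField.optional(2, "_spec_id", Types.IntegerType.get()));

    try {
      SparkSchemaUtil.validateMetadataColumnReferences(tableSchema, readSchema);
    } catch (ValidationException e) {
      // The message lists the conflicting names and suggests ALTER TABLE renames.
      System.out.println(e.getMessage());
    }
  }
}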
@@ -80,6 +80,9 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {

SparkBatchScan(SparkSession spark, Table table, SparkReadConf readConf, boolean caseSensitive,
Schema expectedSchema, List<Expression> filters, CaseInsensitiveStringMap options) {

SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);

this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.readConf = readConf;
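
Because the validation call sits at the top of the SparkBatchScan constructor, the check fires as soon as Spark plans a read whose projected schema includes a conflicting column, before any data files are opened. A rough sketch of the user-visible effect (editorial; the local catalog, table name, and the pre-existing conflicting _file data column are all hypothetical, and it assumes the ValidationException surfaces unwrapped as it does in the test below):

import org.apache.iceberg.exceptions.ValidationException;
import org.apache.spark.sql.SparkSession;

public class ConflictSurfacesAtPlanningSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("metadata-column-conflict")
        .getOrCreate();

    try {
      // SELECT * projects the conflicting "_file" data column, so constructing
      // the batch scan now fails validation during query planning.
      spark.sql("SELECT * FROM local.db.tbl").collect();
    } catch (ValidationException e) {
      System.out.println(e.getMessage());
    } finally {
      spark.stop();
    }
  }
}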
@@ -24,6 +24,7 @@
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
@@ -169,6 +170,35 @@ public void testPartitionMetadataColumnWithUnknownTransforms() {
() -> sql("SELECT _partition FROM %s", TABLE_NAME));
}

@Test
public void testConflictingColumns() {
table.updateSchema()
.addColumn(MetadataColumns.SPEC_ID.name(), Types.IntegerType.get())
.addColumn(MetadataColumns.FILE_PATH.name(), Types.StringType.get())
.commit();

sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1', -1, 'path/to/file')", TABLE_NAME);

assertEquals("Rows must match",
ImmutableList.of(row(1L, "a1")),
sql("SELECT id, category FROM %s", TABLE_NAME));

AssertHelpers.assertThrows("Should fail to query conflicting columns",
ValidationException.class, "column names conflict",
() -> sql("SELECT * FROM %s", TABLE_NAME));

table.refresh();

table.updateSchema()
.renameColumn(MetadataColumns.SPEC_ID.name(), "_renamed" + MetadataColumns.SPEC_ID.name())
.renameColumn(MetadataColumns.FILE_PATH.name(), "_renamed" + MetadataColumns.FILE_PATH.name())
.commit();

assertEquals("Rows must match",
ImmutableList.of(row(0, null, -1)),
sql("SELECT _spec_id, _partition, _renamed_spec_id FROM %s", TABLE_NAME));
}

private void createAndInitTable() throws IOException {
this.table = TestTables.create(temp.newFolder(), TABLE_NAME, SCHEMA, PartitionSpec.unpartitioned());

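
The error message points at ALTER TABLE as the remedy, and the test above exercises the same rename path through the Java API. For completeness, a hedged sketch of the SQL-level workaround through a Spark session; the catalog, table, and replacement column names are hypothetical, and an Iceberg catalog named local is assumed to be configured:

import org.apache.spark.sql.SparkSession;

public class RenameConflictingColumnsSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("rename-conflicting-columns")
        .getOrCreate();

    // Free up the reserved names so metadata columns can be projected again.
    spark.sql("ALTER TABLE local.db.tbl RENAME COLUMN `_spec_id` TO renamed_spec_id");
    spark.sql("ALTER TABLE local.db.tbl RENAME COLUMN `_file` TO renamed_file_path");

    // _spec_id and _file now resolve to Iceberg metadata columns, not data columns.
    spark.sql("SELECT _spec_id, _file, id FROM local.db.tbl").show();

    spark.stop();
  }
}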