diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
index 321050dceb74..c3f00fc352a3 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
@@ -23,8 +23,11 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
@@ -299,4 +302,17 @@ public static long estimateSize(StructType tableSchema, long totalRecords) {
     }
     return result;
   }
+
+  public static void validateMetadataColumnReferences(Schema tableSchema, Schema readSchema) {
+    List<String> conflictingColumnNames = readSchema.columns().stream()
+        .map(Types.NestedField::name)
+        .filter(name -> MetadataColumns.isMetadataColumn(name) && tableSchema.findField(name) != null)
+        .collect(Collectors.toList());
+
+    ValidationException.check(
+        conflictingColumnNames.isEmpty(),
+        "Table column names conflict with names reserved for Iceberg metadata columns: %s.\n" +
+            "Please, use ALTER TABLE statements to rename the conflicting table columns.",
+        conflictingColumnNames);
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index bcd66de6204f..bb7ac9448bc2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -80,6 +80,9 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
 
   SparkBatchScan(SparkSession spark, Table table, SparkReadConf readConf, boolean caseSensitive,
                  Schema expectedSchema, List<Expression> filters, CaseInsensitiveStringMap options) {
+
+    SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
+
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.readConf = readConf;
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index 848545c7ab24..2c591c1292db 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -24,6 +24,7 @@
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
@@ -169,6 +170,35 @@ public void testPartitionMetadataColumnWithUnknownTransforms() {
         () -> sql("SELECT _partition FROM %s", TABLE_NAME));
   }
 
+  @Test
+  public void testConflictingColumns() {
+    table.updateSchema()
+        .addColumn(MetadataColumns.SPEC_ID.name(), Types.IntegerType.get())
+        .addColumn(MetadataColumns.FILE_PATH.name(), Types.StringType.get())
+        .commit();
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1', -1, 'path/to/file')", TABLE_NAME);
+
+    assertEquals("Rows must match",
+        ImmutableList.of(row(1L, "a1")),
+        sql("SELECT id, category FROM %s", TABLE_NAME));
+
+    AssertHelpers.assertThrows("Should fail to query conflicting columns",
+        ValidationException.class, "column names conflict",
+        () -> sql("SELECT * FROM %s", TABLE_NAME));
+
+    table.refresh();
+
+    table.updateSchema()
+        .renameColumn(MetadataColumns.SPEC_ID.name(), "_renamed" + MetadataColumns.SPEC_ID.name())
+        .renameColumn(MetadataColumns.FILE_PATH.name(), "_renamed" + MetadataColumns.FILE_PATH.name())
+        .commit();
+
+    assertEquals("Rows must match",
+        ImmutableList.of(row(0, null, -1)),
+        sql("SELECT _spec_id, _partition, _renamed_spec_id FROM %s", TABLE_NAME));
+  }
+
   private void createAndInitTable() throws IOException {
     this.table = TestTables.create(temp.newFolder(), TABLE_NAME, SCHEMA, PartitionSpec.unpartitioned());