[SPARK-49125][SQL] Allow duplicated column names in CSV writing
### What changes were proposed in this pull request?

In file source writing, we disallow duplicated column names in the input query for all formats, because most formats don't handle duplicate columns well. However, this blanket check is too strict: some formats do allow duplicated column names, and CSV is one of them.

This PR extends the `FileFormat` API with a flag indicating whether the format allows duplicated column names in the input query, and performs the duplicate-name check only for formats that don't.
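
To make the mechanism concrete, here is a minimal, self-contained Scala sketch of the idea. The `Mini*` names and `validateWriteColumns` are illustrative stand-ins, not Spark's actual `FileFormat` trait or `SchemaUtils` helper:

```scala
// Minimal stand-ins for the real FileFormat trait and the write-side
// duplicate-name check (names are illustrative, not Spark's API).
object DuplicateColumnCheckSketch {

  trait MiniFileFormat {
    /** New hook: formats that tolerate duplicated column names override this. */
    def allowDuplicatedColumnNames: Boolean = false
  }

  /**
   * CSV writes columns positionally and its header row is optional,
   * so duplicated names are harmless on the write path.
   */
  class MiniCsvFormat extends MiniFileFormat {
    override def allowDuplicatedColumnNames: Boolean = true
  }

  /** Run the duplicate-name check only when the format forbids duplicates. */
  def validateWriteColumns(
      format: MiniFileFormat,
      columnNames: Seq[String],
      caseSensitive: Boolean): Unit = {
    if (!format.allowDuplicatedColumnNames) {
      val normalized = if (caseSensitive) columnNames else columnNames.map(_.toLowerCase)
      // Elements left after removing one copy of each distinct name are duplicates.
      val dups = normalized.diff(normalized.distinct).distinct
      require(dups.isEmpty, s"Found duplicate column(s): ${dups.mkString(", ")}")
    }
  }
}
```

With these stand-ins, `validateWriteColumns(new MiniCsvFormat, Seq("a", "a"), caseSensitive = false)` passes, while a format keeping the default `false` still fails the check. The real change wires this hook through both write paths, the v1 `InsertIntoHadoopFsRelationCommand` and the v2 `FileWrite`, as the diff below shows.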

### Why are the changes needed?

It enables more use cases for the CSV data source.

### Does this PR introduce _any_ user-facing change?

Yes: users can now write to CSV with duplicated column names.
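
For example, a sketch in spark-shell style (assumes an active `SparkSession` named `spark`; the output path is arbitrary):

```scala
import spark.implicits._

// Two columns that share the name "a"; this write previously failed the
// duplicate-column check with an AnalysisException and now succeeds.
val df = Seq(("s1", "s2")).toDF("a", "a")
df.write.csv("/tmp/spark-csv-dup")

// Without a header, reading back assigns positional names.
spark.read.csv("/tmp/spark-csv-dup").columns  // Array(_c0, _c1)
```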

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47633 from cloud-fan/csv.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan committed Aug 8, 2024
1 parent 712b946 commit d0d98c1
Showing 7 changed files with 35 additions and 8 deletions.
```diff
@@ -188,6 +188,11 @@ trait FileFormat {
    */
   def supportFieldName(name: String): Boolean = true
 
+  /**
+   * Returns whether this format allows duplicated column names in the input query during writing.
+   */
+  def allowDuplicatedColumnNames: Boolean = false
+
   /**
    * All fields the file format's _metadata struct defines.
    *
```
```diff
@@ -79,10 +79,11 @@ case class InsertIntoHadoopFsRelationCommand(
       staticPartitions.size)
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
-    // Most formats don't do well with duplicate columns, so lets not allow that
-    SchemaUtils.checkColumnNameDuplication(
-      outputColumnNames,
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+    if (!fileFormat.allowDuplicatedColumnNames) {
+      SchemaUtils.checkColumnNameDuplication(
+        outputColumnNames,
+        sparkSession.sessionState.conf.caseSensitiveAnalysis)
+    }
 
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
     val fs = outputPath.getFileSystem(hadoopConf)
```
```diff
@@ -157,4 +157,5 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     case _ => false
   }
 
+  override def allowDuplicatedColumnNames: Boolean = true
 }
```
```diff
@@ -44,6 +44,7 @@ trait FileWrite extends Write {
   def paths: Seq[String]
   def formatName: String
   def supportsDataType: DataType => Boolean
+  def allowDuplicatedColumnNames: Boolean = false
   def info: LogicalWriteInfo
 
   private val schema = info.schema()
@@ -91,9 +92,10 @@ trait FileWrite {
       throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
         s"got: ${paths.mkString(", ")}")
     }
-    val pathName = paths.head
-    SchemaUtils.checkColumnNameDuplication(
-      schema.fields.map(_.name).toImmutableArraySeq, caseSensitiveAnalysis)
+    if (!allowDuplicatedColumnNames) {
+      SchemaUtils.checkColumnNameDuplication(
+        schema.fields.map(_.name).toImmutableArraySeq, caseSensitiveAnalysis)
+    }
     DataSource.validateSchema(schema, sqlConf)
 
     // TODO: [SPARK-36340] Unify check schema filed of DataSource V2 Insert.
```
```diff
@@ -32,6 +32,9 @@ case class CSVWrite(
     formatName: String,
     supportsDataType: DataType => Boolean,
     info: LogicalWriteInfo) extends FileWrite {
+
+  override def allowDuplicatedColumnNames: Boolean = true
+
   override def prepareWrite(
       sqlConf: SQLConf,
       job: Job,
```
```diff
@@ -1420,6 +1420,22 @@ abstract class CSVSuite
     }
   }
 
+  test("SPARK-49125: write CSV files with duplicated field names") {
+    withTempPath { path =>
+      sql("SELECT 's1' a, 's2' a").write.csv(path.getCanonicalPath)
+      val df = spark.read.csv(path.getCanonicalPath)
+      assert(df.columns === Array("_c0", "_c1"))
+      checkAnswer(df, Row("s1", "s2"))
+    }
+
+    withTempPath { path =>
+      sql(s"INSERT OVERWRITE DIRECTORY '${path.getCanonicalPath}' USING csv SELECT 's1' a, 's2' a")
+      val df = spark.read.csv(path.getCanonicalPath)
+      assert(df.columns === Array("_c0", "_c1"))
+      checkAnswer(df, Row("s1", "s2"))
+    }
+  }
+
   test("load null when the schema is larger than parsed tokens ") {
     withTempPath { path =>
       Seq("1").toDF().write.text(path.getAbsolutePath)
```
```diff
@@ -1120,7 +1120,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         withTempDir { src =>
           // Check CSV format
-          checkWriteDataColumnDuplication("csv", c0, c1, src)
           checkReadUserSpecifiedDataColumnDuplication(
             Seq((1, 1)).toDF("c0", "c1"), "csv", c0, c1, src)
           // If `inferSchema` is true, a CSV format is duplicate-safe (See SPARK-16896)
```
