[SPARK-11328][SQL] Improve error message when hitting this issue
The issue is that the output committer is not idempotent, so retry attempts
fail because the output file already exists. It is not safe to clean up the
file, as this output committer is by design not retryable. Currently, the job
fails with a confusing file-exists error. This patch is a stopgap that tells
the user to look at the top of the error log for the real failure message.
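
For intuition, here is a minimal sketch (hypothetical paths and a deliberately
simplified flow, not code from this patch) of why a committer that writes
directly to the final location collides with itself on retry:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DirectWriteRetryCollision {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val finalPath = new Path("/tmp/job-output/part-00000") // hypothetical final output file

    // Attempt 1: a direct committer writes straight to the final path...
    val out = fs.create(finalPath, /* overwrite = */ false)
    out.write("partial data".getBytes("UTF-8"))
    out.close()
    // ...and then the task dies for some unrelated reason (the "first error").

    // Attempt 2 (the retry): create() with overwrite = false now throws
    // org.apache.hadoop.fs.FileAlreadyExistsException, the confusing error
    // that reaches the user instead of the real failure.
    fs.create(finalPath, false)
  }
}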

This is difficult to test locally because Spark is hardcoded not to retry tasks
in local mode. Manually verified by raising the number of retry attempts.
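
As an aside, one way to allow task retries in a local run (an assumption about
how the manual verification might have been done, not taken from the patch) is
the local[N, maxFailures] master string, since plain local[N] permits only one
attempt per task:

import org.apache.spark.{SparkConf, SparkContext}

// local[4, 4]: 4 worker threads and up to 4 attempts per task, so a failed
// task is actually retried and the FileAlreadyExistsException path is hit.
val conf = new SparkConf()
  .setMaster("local[4, 4]")
  .setAppName("retry-repro") // hypothetical app name
val sc = new SparkContext(conf)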

Author: Nong Li <[email protected]>

Closes apache#10080 from nongli/spark-11328.
nongli authored and yhuai committed Dec 1, 2015
1 parent ef6790f commit 47a0abc
Showing 2 changed files with 22 additions and 3 deletions.
@@ -124,6 +124,24 @@ private[sql] abstract class BaseWriterContainer(
     }
   }
 
+  protected def newOutputWriter(path: String): OutputWriter = {
+    try {
+      outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext)
+    } catch {
+      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
+        if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
+          // SPARK-11328: DirectParquetOutputCommitter is not idempotent, meaning on retry
+          // attempts, the task will fail because the output file is created from a prior attempt.
+          // This often means the most visible error to the user is misleading. Augment the error
+          // to tell the user to look for the actual error.
+          throw new SparkException("The output file already exists but this could be due to a " +
+            "failure from an earlier attempt. Look through the earlier logs or stage page for " +
+            "the first error.\n File exists error: " + e)
+        }
+        throw e
+    }
+  }
+
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
     val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)

@@ -234,7 +252,7 @@ private[sql] class DefaultWriterContainer(
     executorSideSetup(taskContext)
     val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
     configuration.set("spark.sql.sources.output.path", outputPath)
-    val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
+    val writer = newOutputWriter(getWorkPath)
     writer.initConverter(dataSchema)
 
     var writerClosed = false
@@ -403,7 +421,7 @@ private[sql] class DynamicPartitionWriterContainer(
       val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
       configuration.set(
         "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
-      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+      val newWriter = super.newOutputWriter(path.toString)
       newWriter.initConverter(dataSchema)
       newWriter
     }
@@ -41,7 +41,8 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO
  * no safe way to undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
  * left empty).
  */
-private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+private[datasources] class DirectParquetOutputCommitter(
+    outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
   val LOG = Log.getLog(classOf[ParquetOutputCommitter])
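To make the scaladoc above concrete, here is a condensed sketch (not the full
Spark class) of the shape such a direct committer takes: tasks write straight
to the final location, so there is nothing to promote on commit and nothing
safe to roll back on abort:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
import org.apache.parquet.hadoop.ParquetOutputCommitter

class DirectCommitterSketch(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  // The "direct" part: tasks are handed the final output directory as their
  // work path, so files land in place immediately.
  override def getWorkPath: Path = outputPath

  // No per-task temporary output exists, so task setup and commit are no-ops.
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}

  // Nothing can be undone: data is already at its final location, which is
  // exactly why a retried attempt collides with the earlier attempt's file.
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortJob(context: JobContext, state: JobStatus.State): Unit = {}
}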
