[SPARK-25595] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled

## What changes were proposed in this pull request?

With flag `IGNORE_CORRUPT_FILES` enabled, schema inference should ignore corrupt Avro files, which is consistent with the Parquet and ORC data sources.
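
As a rough usage sketch (the directory path is hypothetical and this assumes the Avro data source is on the classpath; `spark.sql.files.ignoreCorruptFiles` is the SQL config key behind `IGNORE_CORRUPT_FILES`):

```scala
// Hypothetical directory containing both valid and corrupt .avro files.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Schema inference skips files it cannot open and uses the first readable Avro file;
// with the flag disabled, a corrupt file fails the read with a SparkException.
val df = spark.read.format("avro").load("/path/to/avro-dir")
df.printSchema()
```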

## How was this patch tested?

Unit test

Closes apache#22611 from gengliangwang/ignoreCorruptAvro.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
gengliangwang authored and HyukjinKwon committed Oct 3, 2018
1 parent d6be46e commit 928d073
Showing 2 changed files with 93 additions and 28 deletions.
AvroFileFormat.scala
@@ -32,14 +32,14 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job

-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}

private[avro] class AvroFileFormat extends FileFormat
with DataSourceRegister with Logging with Serializable {
@@ -59,36 +59,13 @@ private[avro] class AvroFileFormat extends FileFormat
    val conf = spark.sessionState.newHadoopConf()
    val parsedOptions = new AvroOptions(options, conf)

-    // Schema evolution is not supported yet. Here we only pick a single random sample file to
-    // figure out the schema of the whole dataset.
-    val sampleFile =
-      if (parsedOptions.ignoreExtension) {
-        files.headOption.getOrElse {
-          throw new FileNotFoundException("Files for schema inferring have been not found.")
-        }
-      } else {
-        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
-          throw new FileNotFoundException(
-            "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
-        }
-      }
-
    // User can specify an optional avro json schema.
    val avroSchema = parsedOptions.schema
      .map(new Schema.Parser().parse)
      .getOrElse {
-        val in = new FsInput(sampleFile.getPath, conf)
-        try {
-          val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
-          try {
-            reader.getSchema
-          } finally {
-            reader.close()
-          }
-        } finally {
-          in.close()
-        }
-      }
+        inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
+          spark.sessionState.conf.ignoreCorruptFiles)
+      }

    SchemaConverters.toSqlType(avroSchema).dataType match {
      case t: StructType => Some(t)
@@ -100,6 +77,51 @@ private[avro] class AvroFileFormat extends FileFormat
    }
  }

+  private def inferAvroSchemaFromFiles(
+      files: Seq[FileStatus],
+      conf: Configuration,
+      ignoreExtension: Boolean,
+      ignoreCorruptFiles: Boolean): Schema = {
+    // Schema evolution is not supported yet. Here we only pick first random readable sample file to
+    // figure out the schema of the whole dataset.
+    val avroReader = files.iterator.map { f =>
+      val path = f.getPath
+      if (!ignoreExtension && !path.getName.endsWith(".avro")) {
+        None
+      } else {
+        Utils.tryWithResource {
+          new FsInput(path, conf)
+        } { in =>
+          try {
+            Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
+          } catch {
+            case e: IOException =>
+              if (ignoreCorruptFiles) {
+                logWarning(s"Skipped the footer in the corrupted file: $path", e)
+                None
+              } else {
+                throw new SparkException(s"Could not read file: $path", e)
+              }
+          }
+        }
+      }
+    }.collectFirst {
+      case Some(reader) => reader
+    }
+
+    avroReader match {
+      case Some(reader) =>
+        try {
+          reader.getSchema
+        } finally {
+          reader.close()
+        }
+      case None =>
+        throw new FileNotFoundException(
+          "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
+    }
+  }
+
  override def shortName(): String = "avro"

  override def isSplitable(
AvroSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils

class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
import testImplicits._
@@ -342,6 +343,48 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
    }
  }

+  private def createDummyCorruptFile(dir: File): Unit = {
+    Utils.tryWithResource {
+      FileUtils.forceMkdir(dir)
+      val corruptFile = new File(dir, "corrupt.avro")
+      new BufferedWriter(new FileWriter(corruptFile))
+    } { writer =>
+      writer.write("corrupt")
+    }
+  }
+
+  test("Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES enabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[FileNotFoundException] {
+          spark.read.format("avro").load(dir.getAbsolutePath).schema
+        }.getMessage
+        assert(message.contains("No Avro files found."))
+
+        val srcFile = new File("src/test/resources/episodes.avro")
+        val destFile = new File(dir, "episodes.avro")
+        FileUtils.copyFile(srcFile, destFile)
+
+        val result = spark.read.format("avro").load(srcFile.getAbsolutePath).collect()
+        checkAnswer(spark.read.format("avro").load(dir.getAbsolutePath), result)
+      }
+    }
+  }
+
+  test("Throws IOException on reading corrupt Avro file if flag IGNORE_CORRUPT_FILES disabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[org.apache.spark.SparkException] {
+          spark.read.format("avro").load(dir.getAbsolutePath)
+        }.getMessage
+
+        assert(message.contains("Could not read file"))
+      }
+    }
+  }
+
  test("Date field type") {
    withTempPath { dir =>
      val schema = StructType(Seq(
