[SPARK-42668][SS] Catch exception while trying to close compressed stream in HDFSStateStoreProvider abort

### What changes were proposed in this pull request?
We have seen cases where a task exits as cancelled/failed, which triggers the abort in the task completion listener for HDFSStateStoreProvider. As part of this, we cancel the backing raw stream and close the compressed stream. However, stores such as Azure Blob Storage can throw exceptions that are not caught in the current path, leading to job failures. This change fixes the issue by catching all non-fatal exceptions thrown by cancel/close.
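For context, the relevant Scala idiom is `scala.util.control.NonFatal`, which matches exceptions such as `IOException` or `NullPointerException` while still letting fatal VM errors (e.g. `OutOfMemoryError`) propagate. The sketch below is a minimal illustration of that pattern, not the actual Spark code: `Cancellable`, `bestEffortAbort`, and the `println` logging are hypothetical stand-ins for the provider's raw stream, its abort helper, and Spark's logger.

```scala
import scala.util.control.NonFatal

object AbortCleanupSketch {
  // Hypothetical stand-in for the cancellable raw output stream used by the provider.
  trait Cancellable { def cancel(): Unit }

  // Best-effort cleanup: cancel the raw stream and close the compressed stream,
  // logging (rather than rethrowing) any non-fatal failure so the abort path
  // cannot fail the task.
  def bestEffortAbort(raw: Cancellable, compressed: java.io.Closeable): Unit = {
    try {
      if (raw != null) raw.cancel()
      if (compressed != null) compressed.close()
    } catch {
      // NonFatal matches exceptions like IOException or NullPointerException,
      // but deliberately does not match fatal errors such as OutOfMemoryError.
      case NonFatal(ex) =>
        println(s"Ignoring failure while aborting state store delta file: $ex")
    }
  }
}
```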

### Why are the changes needed?
The changes are needed to avoid job failures caused by exceptions thrown by output stream handlers during abort in HDFSStateStoreProvider.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Modified a test to simulate an NPE in the abort path and verified that the task, and thereby the job, fails before this change. After the change, the test passes.
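As a rough, framework-free illustration of that check (building on the hypothetical `AbortCleanupSketch` above, not the actual StateStore test suite), one can simulate a raw stream whose `cancel()` throws an NPE and confirm that the best-effort abort returns normally:

```scala
object AbortNpeCheck {
  def main(args: Array[String]): Unit = {
    // A raw stream whose cancel() throws, mimicking a store failure in the abort path.
    val failingRaw = new AbortCleanupSketch.Cancellable {
      def cancel(): Unit = throw new NullPointerException("simulated NPE on cancel")
    }
    val noopCompressed = new java.io.Closeable { def close(): Unit = () }

    // Without a NonFatal catch the NPE would escape and fail the task (and job);
    // with it, the failure is logged and the call returns normally.
    AbortCleanupSketch.bestEffortAbort(failingRaw, noopCompressed)
    println("abort completed without throwing")
  }
}
```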

Closes apache#40273 from anishshri-db/task/SPARK-42668.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
anishshri-db authored and HeartSaVioR committed Mar 6, 2023
1 parent db17155 · commit 252e7a3
Showing 1 changed file with 10 additions and 4 deletions.
```diff
@@ -544,11 +544,17 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       if (rawStream != null) rawStream.cancel()
       IOUtils.closeQuietly(compressedStream)
     } catch {
-      // Closing the compressedStream causes the stream to write/flush flush data into the
-      // rawStream. Since the rawStream is already closed, there may be errors.
-      // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps
-      // IOException into FSError.
       case e: FSError if e.getCause.isInstanceOf[IOException] =>
+        // Closing the compressedStream causes the stream to write/flush flush data into the
+        // rawStream. Since the rawStream is already closed, there may be errors.
+        // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps
+        // IOException into FSError.
+
+      // SPARK-42668 - Catch and log any other exception thrown while trying to cancel
+      // raw stream or close compressed stream.
+      case NonFatal(ex) =>
+        logInfo(s"Failed to cancel delta file for provider=$stateStoreId " +
+          s"with exception=$ex")
     }
   }
```
