Skip to content

Commit

Permalink
Core: Fix exception handling in BaseTaskWriter (apache#5683)
Browse files Browse the repository at this point in the history
* Core: Fix exception handling in BaseTaskWriter.

* Fix state check.
  • Loading branch information
rdblue authored Sep 1, 2022
1 parent b2c4ec2 commit 9f946a1
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 51 deletions.
13 changes: 0 additions & 13 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ class S3OutputStream extends PositionOutputStream {

private long pos = 0;
private boolean closed = false;
private Throwable closeFailureException;

@SuppressWarnings("StaticAssignmentInConstructor")
S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties, MetricsContext metrics)
Expand Down Expand Up @@ -258,15 +257,6 @@ private void newStream() throws IOException {

@Override
public void close() throws IOException {

// A failed s3 close removes state that is required for a successful close.
// Any future close on this stream should fail.
if (closeFailureException != null) {
throw new IOException(
"Attempted to close an S3 output stream that failed to close earlier",
closeFailureException);
}

if (closed) {
return;
}
Expand All @@ -277,9 +267,6 @@ public void close() throws IOException {
try {
stream.close();
completeUploads();
} catch (Exception e) {
closeFailureException = e;
throw e;
} finally {
cleanUpStagingFiles();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void testWriteWithChecksumEnabled() {
}

@Test
public void testCloseFailureShouldPersistOnFutureClose() throws IOException {
public void testDoubleClose() throws IOException {
IllegalStateException mockException =
new IllegalStateException("mock failure to completeUploads on close");
Mockito.doThrow(mockException)
Expand All @@ -195,9 +195,7 @@ public void testCloseFailureShouldPersistOnFutureClose() throws IOException {
.isInstanceOf(mockException.getClass())
.hasMessageContaining(mockException.getMessage());

Assertions.assertThatThrownBy(stream::close)
.isInstanceOf(IOException.class)
.hasCause(mockException);
Assertions.assertThatNoException().isThrownBy(stream::close);
}

private void writeTest() {
Expand Down
97 changes: 64 additions & 33 deletions core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
private final OutputFileFactory fileFactory;
private final FileIO io;
private final long targetFileSize;
private Throwable failure;

protected BaseTaskWriter(
PartitionSpec spec,
Expand All @@ -72,6 +73,12 @@ protected PartitionSpec spec() {
return spec;
}

protected void setFailure(Throwable throwable) {
if (failure == null) {
this.failure = throwable;
}
}

@Override
public void abort() throws IOException {
close();
Expand All @@ -88,6 +95,8 @@ public void abort() throws IOException {
public WriteResult complete() throws IOException {
close();

Preconditions.checkState(failure == null, "Cannot return results from failed writer", failure);

return WriteResult.builder()
.addDataFiles(completedDataFiles)
.addDeleteFiles(completedDeleteFiles)
Expand Down Expand Up @@ -180,28 +189,43 @@ public void deleteKey(T key) throws IOException {

@Override
public void close() throws IOException {
// Close data writer and add completed data files.
if (dataWriter != null) {
dataWriter.close();
dataWriter = null;
}
try {
// Close data writer and add completed data files.
if (dataWriter != null) {
try {
dataWriter.close();
} finally {
dataWriter = null;
}
}

// Close eq-delete writer and add completed equality-delete files.
if (eqDeleteWriter != null) {
eqDeleteWriter.close();
eqDeleteWriter = null;
}
// Close eq-delete writer and add completed equality-delete files.
if (eqDeleteWriter != null) {
try {
eqDeleteWriter.close();
} finally {
eqDeleteWriter = null;
}
}

if (insertedRowMap != null) {
insertedRowMap.clear();
insertedRowMap = null;
}
if (insertedRowMap != null) {
insertedRowMap.clear();
insertedRowMap = null;
}

// Add the completed pos-delete files.
if (posDeleteWriter != null) {
completedDeleteFiles.addAll(posDeleteWriter.complete());
referencedDataFiles.addAll(posDeleteWriter.referencedDataFiles());
posDeleteWriter = null;
// Add the completed pos-delete files.
if (posDeleteWriter != null) {
try {
// complete will call close
completedDeleteFiles.addAll(posDeleteWriter.complete());
referencedDataFiles.addAll(posDeleteWriter.referencedDataFiles());
} finally {
posDeleteWriter = null;
}
}
} catch (IOException | RuntimeException e) {
setFailure(e);
throw e;
}
}
}
Expand Down Expand Up @@ -286,22 +310,29 @@ private boolean shouldRollToNewFile() {

private void closeCurrent() throws IOException {
if (currentWriter != null) {
currentWriter.close();

if (currentRows == 0L) {
try {
io.deleteFile(currentFile.encryptingOutputFile());
} catch (UncheckedIOException e) {
// the file may not have been created, and it isn't worth failing the job to clean up,
// skip deleting
try {
currentWriter.close();

if (currentRows == 0L) {
try {
io.deleteFile(currentFile.encryptingOutputFile());
} catch (UncheckedIOException e) {
// the file may not have been created, and it isn't worth failing the job to clean up,
// skip deleting
}
} else {
complete(currentWriter);
}
} else {
complete(currentWriter);
}

this.currentFile = null;
this.currentWriter = null;
this.currentRows = 0;
} catch (IOException | RuntimeException e) {
setFailure(e);
throw e;

} finally {
this.currentFile = null;
this.currentWriter = null;
this.currentRows = 0;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr

private int records = 0;
private boolean closed = false;
private Throwable failure;

SortedPosDeleteWriter(
FileAppenderFactory<T> appenderFactory,
Expand All @@ -74,6 +75,12 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
this(appenderFactory, fileFactory, format, partition, DEFAULT_RECORDS_NUM_THRESHOLD);
}

protected void setFailure(Throwable throwable) {
if (failure == null) {
this.failure = throwable;
}
}

@Override
public long length() {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -109,6 +116,8 @@ public void delete(CharSequence path, long pos, T row) {
public List<DeleteFile> complete() throws IOException {
close();

Preconditions.checkState(failure == null, "Cannot return results from failed writer", failure);

return completedFiles;
}

Expand All @@ -119,8 +128,8 @@ public CharSequenceSet referencedDataFiles() {
@Override
public void close() throws IOException {
if (!closed) {
flushDeletes();
this.closed = true;
flushDeletes();
}
}

Expand Down Expand Up @@ -161,6 +170,7 @@ private void flushDeletes() {
positions.forEach(posRow -> closeableWriter.delete(path, posRow.pos(), posRow.row()));
}
} catch (IOException e) {
setFailure(e);
throw new UncheckedIOException(
"Failed to write the sorted path/pos pairs to pos-delete file: "
+ outputFile.encryptingOutputFile().location(),
Expand Down

0 comments on commit 9f946a1

Please sign in to comment.