Skip to content

Commit

Permalink
AWS: Fix PUT retry failures by opening new data file streams (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jfz authored Jul 22, 2022
1 parent 65b803f commit 742b3ec
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
Expand Down Expand Up @@ -368,7 +370,7 @@ private void cleanUpStagingFiles() {
private void completeUploads() {
if (multipartUploadId == null) {
long contentLength = stagingFiles.stream().map(FileAndDigest::file).mapToLong(File::length).sum();
InputStream contentStream = new BufferedInputStream(stagingFiles.stream()
ContentStreamProvider contentProvider = () -> new BufferedInputStream(stagingFiles.stream()
.map(FileAndDigest::file)
.map(S3OutputStream::uncheckedInputStream)
.reduce(SequenceInputStream::new)
Expand All @@ -389,7 +391,9 @@ private void completeUploads() {
S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
S3RequestUtil.configurePermission(awsProperties, requestBuilder);

s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
s3.putObject(
requestBuilder.build(),
RequestBody.fromContentProvider(contentProvider, contentLength, Mimetype.MIMETYPE_OCTET_STREAM));
} else {
uploadParts();
completeMultiPartUpload();
Expand Down

0 comments on commit 742b3ec

Please sign in to comment.