Skip to content

Commit

Permalink
AWS: Support checksum validation with S3 eTags (apache#3813)
Browse files Browse the repository at this point in the history
* [S3FileIO] Add capability to perform checksum validations using S3 eTags.

* fix checkstyle error

* Update to move checksum checks to s3 server side

* Enable s3 checksum checks in aws integration tests

* Catch protocol error and log helpful error message

* Use digest bytes instead of MessageDigest and update tests

* Fix checkstyle failure

* Use DigestOutputStream

* Remove redundant spaces

* rename etag to checksum in leftover places

* address review comments

* Remove unused import

* Config name change

* minor updates
  • Loading branch information
SinghAsDev authored Jan 25, 2022
1 parent 1dcb74b commit 7fd7ccb
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public static void beforeClass() {
prefix = UUID.randomUUID().toString();
properties = new AwsProperties();
properties.setS3FileIoMultiPartSize(AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN);
properties.setS3ChecksumEnabled(true);
io = new S3FileIO(() -> s3, properties);
}

Expand Down
19 changes: 19 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ public class AwsProperties implements Serializable {
*/
public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";

/**
* Enables eTag checks for S3 PUT and MULTIPART upload requests.
*/
public static final String S3_CHECKSUM_ENABLED = "s3.checksum-enabled";
public static final boolean CLIENT_ENABLE_ETAG_CHECK_DEFAULT = false;

private String s3FileIoSseType;
private String s3FileIoSseKey;
private String s3FileIoSseMd5;
Expand All @@ -223,6 +229,8 @@ public class AwsProperties implements Serializable {

private String dynamoDbTableName;

private boolean isS3ChecksumEnabled;

public AwsProperties() {
this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
this.s3FileIoSseKey = null;
Expand Down Expand Up @@ -284,6 +292,9 @@ public AwsProperties(Map<String, String> properties) {

this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME,
DYNAMODB_TABLE_NAME_DEFAULT);

this.isS3ChecksumEnabled = PropertyUtil.propertyAsBoolean(properties, S3_CHECKSUM_ENABLED,
CLIENT_ENABLE_ETAG_CHECK_DEFAULT);
}

public String s3FileIoSseType() {
Expand Down Expand Up @@ -373,4 +384,12 @@ public String dynamoDbTableName() {
public void setDynamoDbTableName(String name) {
this.dynamoDbTableName = name;
}

/**
 * Returns whether S3 checksum (eTag) validation is enabled for PUT and
 * multipart upload requests.
 *
 * @return true if checksum validation is enabled
 */
public boolean isS3ChecksumEnabled() {
  return isS3ChecksumEnabled;
}

/**
 * Enables or disables S3 checksum (eTag) validation for PUT and multipart
 * upload requests.
 *
 * @param enabled whether checksum validation should be performed
 */
public void setS3ChecksumEnabled(boolean enabled) {
  this.isS3ChecksumEnabled = enabled;
}
}
86 changes: 77 additions & 9 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -62,9 +65,11 @@
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.utils.BinaryUtils;

class S3OutputStream extends PositionOutputStream {
private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class);
private static final String digestAlgorithm = "MD5";

private static volatile ExecutorService executorService;

Expand All @@ -74,13 +79,16 @@ class S3OutputStream extends PositionOutputStream {
private final AwsProperties awsProperties;

private CountingOutputStream stream;
private final List<File> stagingFiles = Lists.newArrayList();
private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
private final File stagingDirectory;
private File currentStagingFile;
private String multipartUploadId;
private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
private final int multiPartSize;
private final int multiPartThresholdSize;
private final boolean isChecksumEnabled;
private final MessageDigest completeMessageDigest;
private MessageDigest currentPartMessageDigest;

private long pos = 0;
private boolean closed = false;
Expand Down Expand Up @@ -110,6 +118,12 @@ class S3OutputStream extends PositionOutputStream {
multiPartSize = awsProperties.s3FileIoMultiPartSize();
multiPartThresholdSize = (int) (multiPartSize * awsProperties.s3FileIOMultipartThresholdFactor());
stagingDirectory = new File(awsProperties.s3fileIoStagingDirectory());
isChecksumEnabled = awsProperties.isS3ChecksumEnabled();
try {
completeMessageDigest = isChecksumEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Failed to create message digest needed for s3 checksum checks", e);
}

newStream();
}
Expand Down Expand Up @@ -178,9 +192,30 @@ private void newStream() throws IOException {
createStagingDirectoryIfNotExists();
currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
currentStagingFile.deleteOnExit();
stagingFiles.add(currentStagingFile);
try {
currentPartMessageDigest = isChecksumEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Failed to create message digest needed for s3 checksum checks.", e);
}

stagingFiles.add(new FileAndDigest(currentStagingFile, currentPartMessageDigest));

if (isChecksumEnabled) {
DigestOutputStream digestOutputStream;

stream = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(currentStagingFile)));
// if switched over to multipart threshold already, no need to update complete message digest
if (multipartUploadId != null) {
digestOutputStream = new DigestOutputStream(new BufferedOutputStream(
new FileOutputStream(currentStagingFile)), currentPartMessageDigest);
} else {
digestOutputStream = new DigestOutputStream(new DigestOutputStream(new BufferedOutputStream(
new FileOutputStream(currentStagingFile)), currentPartMessageDigest), completeMessageDigest);
}

stream = new CountingOutputStream(digestOutputStream);
} else {
stream = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(currentStagingFile)));
}
}

@Override
Expand Down Expand Up @@ -210,6 +245,7 @@ private void initializeMultiPartUpload() {
multipartUploadId = s3.createMultipartUpload(requestBuilder.build()).uploadId();
}

@SuppressWarnings("checkstyle:LocalVariableName")
private void uploadParts() {
// exit if multipart has not been initiated
if (multipartUploadId == null) {
Expand All @@ -218,17 +254,22 @@ private void uploadParts() {

stagingFiles.stream()
// do not upload the file currently being written
.filter(f -> closed || !f.equals(currentStagingFile))
.filter(f -> closed || !f.file().equals(currentStagingFile))
// do not upload any files that have already been processed
.filter(Predicates.not(multiPartMap::containsKey))
.forEach(f -> {
.filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
.forEach(fileAndDigest -> {
File f = fileAndDigest.file();
UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
.bucket(location.bucket())
.key(location.key())
.uploadId(multipartUploadId)
.partNumber(stagingFiles.indexOf(f) + 1)
.partNumber(stagingFiles.indexOf(fileAndDigest) + 1)
.contentLength(f.length());

if (fileAndDigest.hasDigest()) {
requestBuilder.contentMD5(BinaryUtils.toBase64(fileAndDigest.digest()));
}

S3RequestUtil.configureEncryption(awsProperties, requestBuilder);

UploadPartRequest uploadRequest = requestBuilder.build();
Expand Down Expand Up @@ -293,16 +334,17 @@ private void abortUpload() {
}

// Best-effort deletion of all local staging temp files; failures are logged
// as warnings and suppressed so cleanup never masks the primary error path.
private void cleanUpStagingFiles() {
// NOTE(review): the next two lines appear to be the removed (old) and added
// (new) versions of the same statement from a diff rendering that lost its
// +/- markers — only the second, FileAndDigest-mapping form should remain.
Tasks.foreach(stagingFiles)
Tasks.foreach(stagingFiles.stream().map(FileAndDigest::file))
.suppressFailureWhenFinished()
.onFailure((file, thrown) -> LOG.warn("Failed to delete staging file: {}", file, thrown))
.run(File::delete);
}

private void completeUploads() {
if (multipartUploadId == null) {
long contentLength = stagingFiles.stream().mapToLong(File::length).sum();
long contentLength = stagingFiles.stream().map(FileAndDigest::file).mapToLong(File::length).sum();
InputStream contentStream = new BufferedInputStream(stagingFiles.stream()
.map(FileAndDigest::file)
.map(S3OutputStream::uncheckedInputStream)
.reduce(SequenceInputStream::new)
.orElseGet(() -> new ByteArrayInputStream(new byte[0])));
Expand All @@ -311,6 +353,10 @@ private void completeUploads() {
.bucket(location.bucket())
.key(location.key());

if (isChecksumEnabled) {
requestBuilder.contentMD5(BinaryUtils.toBase64(completeMessageDigest.digest()));
}

S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
S3RequestUtil.configurePermission(awsProperties, requestBuilder);

Expand Down Expand Up @@ -360,4 +406,26 @@ protected void finalize() throws Throwable {
LOG.warn("Unclosed output stream created by:\n\t{}", trace);
}
}

/**
 * Pairs a local staging file with the MD5 {@link MessageDigest} that was
 * accumulated while the file was written. The digest is {@code null} when
 * checksum validation is disabled, so callers must consult
 * {@link #hasDigest()} before calling {@link #digest()}.
 */
private static class FileAndDigest {
  private final File file;
  // null when s3.checksum-enabled is false
  private final MessageDigest digest;

  FileAndDigest(File file, MessageDigest digest) {
    this.file = file;
    this.digest = digest;
  }

  /** Returns the staging file. */
  File file() {
    return file;
  }

  /**
   * Returns the MD5 digest bytes for the file's content. Throws
   * NullPointerException when checksums are disabled; note also that
   * MessageDigest.digest() resets the digest, so this is single-use.
   */
  byte[] digest() {
    return digest.digest();
  }

  // package-private for consistency with file() and digest(); the enclosing
  // class is private, so wider visibility had no effect anyway
  boolean hasDigest() {
    return digest != null;
  }
}
}
Loading

0 comments on commit 7fd7ccb

Please sign in to comment.