AWS: Move S3 writeTags to AwsProperties (apache#4350)
rajarshisarkar authored Mar 17, 2022
1 parent 5b9cd7a commit fc92d07
Showing 7 changed files with 38 additions and 90 deletions.
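
After this change, S3 object tags are parsed once in AwsProperties from catalog properties prefixed with "s3.write.tags." and exposed through getS3WriteTags(), instead of being parsed separately inside S3FileIO. A minimal usage sketch, not part of this commit (class name and the printing loop are illustrative; the property keys and values mirror the updated TestS3OutputStream fixture):

import java.util.Map;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class WriteTagsSketch {
  public static void main(String[] args) {
    // Each "s3.write.tags.<key>" entry becomes one S3 object tag.
    Map<String, String> properties = ImmutableMap.of(
        "s3.write.tags.abc", "123",
        "s3.write.tags.def", "789");

    AwsProperties awsProperties = new AwsProperties(properties);

    // S3OutputStream now reads this set and attaches it to the upload requests.
    awsProperties.getS3WriteTags()
        .forEach(tag -> System.out.println(tag.key() + "=" + tag.value()));
  }
}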
22 changes: 20 additions & 2 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -21,11 +21,15 @@

import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.Tag;

public class AwsProperties implements Serializable {

@@ -259,14 +263,14 @@ public class AwsProperties implements Serializable {
private double s3FileIoMultipartThresholdFactor;
private String s3fileIoStagingDirectory;
private ObjectCannedACL s3FileIoAcl;
private boolean isS3ChecksumEnabled;
private final Set<Tag> s3WriteTags;

private String glueCatalogId;
private boolean glueCatalogSkipArchive;

private String dynamoDbTableName;

private boolean isS3ChecksumEnabled;

public AwsProperties() {
this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
this.s3FileIoSseKey = null;
@@ -279,6 +283,7 @@ public AwsProperties() {
this.s3FileIoDeleteBatchSize = S3FILEIO_DELETE_BATCH_SIZE_DEFAULT;
this.s3fileIoStagingDirectory = System.getProperty("java.io.tmpdir");
this.isS3ChecksumEnabled = S3_CHECKSUM_ENABLED_DEFAULT;
this.s3WriteTags = Sets.newHashSet();

this.glueCatalogId = null;
this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
@@ -337,6 +342,8 @@ public AwsProperties(Map<String, String> properties) {
s3FileIoDeleteBatchSize <= S3FILEIO_DELETE_BATCH_SIZE_MAX,
String.format("Deletion batch size must be between 1 and %s", S3FILEIO_DELETE_BATCH_SIZE_MAX));

this.s3WriteTags = toTags(properties, S3_WRITE_TAGS_PREFIX);

this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME,
DYNAMODB_TABLE_NAME_DEFAULT);
}
@@ -444,4 +451,15 @@ public boolean isS3ChecksumEnabled() {
public void setS3ChecksumEnabled(boolean eTagCheckEnabled) {
this.isS3ChecksumEnabled = eTagCheckEnabled;
}

public Set<Tag> getS3WriteTags() {
return s3WriteTags;
}

private Set<Tag> toTags(Map<String, String> properties, String prefix) {
return PropertyUtil.propertiesWithPrefix(properties, prefix)
.entrySet().stream()
.map(e -> Tag.builder().key(e.getKey()).value(e.getValue()).build())
.collect(Collectors.toSet());
}
}
18 changes: 0 additions & 18 deletions aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java
@@ -19,40 +19,26 @@

package org.apache.iceberg.aws.s3;

import java.util.Set;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.Tag;

abstract class BaseS3File {
private final S3Client client;
private final S3URI uri;
private final AwsProperties awsProperties;
private HeadObjectResponse metadata;
private final MetricsContext metrics;
private final Set<Tag> writeTags;

BaseS3File(S3Client client, S3URI uri, AwsProperties awsProperties, MetricsContext metrics) {
this.client = client;
this.uri = uri;
this.awsProperties = awsProperties;
this.metrics = metrics;
this.writeTags = Sets.newHashSet();
}

BaseS3File(S3Client client, S3URI uri, AwsProperties awsProperties, MetricsContext metrics,
Set<Tag> writeTags) {
this.client = client;
this.uri = uri;
this.awsProperties = awsProperties;
this.metrics = metrics;
this.writeTags = writeTags;
}

public String location() {
@@ -75,10 +61,6 @@ protected MetricsContext metrics() {
return metrics;
}

public Set<Tag> writeTags() {
return writeTags;
}

/**
* Note: this may be stale if file was deleted since metadata is cached for size/existence checks.
*
13 changes: 1 addition & 12 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -39,7 +39,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +48,6 @@
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.Tag;

/**
* FileIO implementation backed by S3.
@@ -67,7 +65,6 @@ public class S3FileIO implements FileIO, SupportsBulkOperations {
private transient S3Client client;
private MetricsContext metrics = MetricsContext.nullMetrics();
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
private Set<Tag> writeTags = Sets.newHashSet();

/**
* No-arg constructor to load the FileIO dynamically.
@@ -106,7 +103,7 @@ public InputFile newInputFile(String path) {

@Override
public OutputFile newOutputFile(String path) {
return S3OutputFile.fromLocation(path, client(), awsProperties, metrics, writeTags);
return S3OutputFile.fromLocation(path, client(), awsProperties, metrics);
}

@Override
@@ -190,7 +187,6 @@ private S3Client client() {
@Override
public void initialize(Map<String, String> properties) {
this.awsProperties = new AwsProperties(properties);
this.writeTags = toTags(properties);

// Do not override s3 client if it was provided
if (s3 == null) {
@@ -218,11 +214,4 @@ public void close() {
}
}
}

private Set<Tag> toTags(Map<String, String> properties) {
return PropertyUtil.propertiesWithPrefix(properties, AwsProperties.S3_WRITE_TAGS_PREFIX)
.entrySet().stream()
.map(e -> Tag.builder().key(e.getKey()).value(e.getValue()).build())
.collect(Collectors.toSet());
}
}
16 changes: 4 additions & 12 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
@@ -21,30 +21,22 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Tag;

public class S3OutputFile extends BaseS3File implements OutputFile {
public static S3OutputFile fromLocation(String location, S3Client client, AwsProperties awsProperties,
MetricsContext metrics) {
return new S3OutputFile(client, new S3URI(location), awsProperties, metrics, Sets.newHashSet());
return new S3OutputFile(client, new S3URI(location), awsProperties, metrics);
}

public static S3OutputFile fromLocation(String location, S3Client client, AwsProperties awsProperties,
MetricsContext metrics, Set<Tag> writeTags) {
return new S3OutputFile(client, new S3URI(location), awsProperties, metrics, writeTags);
}

S3OutputFile(S3Client client, S3URI uri, AwsProperties awsProperties, MetricsContext metrics, Set<Tag> writeTags) {
super(client, uri, awsProperties, metrics, writeTags);
S3OutputFile(S3Client client, S3URI uri, AwsProperties awsProperties, MetricsContext metrics) {
super(client, uri, awsProperties, metrics);
}

/**
@@ -65,7 +57,7 @@ public PositionOutputStream create() {
@Override
public PositionOutputStream createOrOverwrite() {
try {
return new S3OutputStream(client(), uri(), awsProperties(), metrics(), writeTags());
return new S3OutputStream(client(), uri(), awsProperties(), metrics());
} catch (IOException e) {
throw new UncheckedIOException("Failed to create output stream for location: " + uri(), e);
}
aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -105,7 +105,7 @@ class S3OutputStream extends PositionOutputStream {
private boolean closed = false;

@SuppressWarnings("StaticAssignmentInConstructor")
S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties, MetricsContext metrics, Set<Tag> writeTags)
S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties, MetricsContext metrics)
throws IOException {
if (executorService == null) {
synchronized (S3OutputStream.class) {
@@ -124,7 +124,7 @@ class S3OutputStream extends PositionOutputStream {
this.s3 = s3;
this.location = location;
this.awsProperties = awsProperties;
this.writeTags = writeTags;
this.writeTags = awsProperties.getS3WriteTags();

this.createStack = Thread.currentThread().getStackTrace();

@@ -260,7 +260,7 @@ private void initializeMultiPartUpload() {
CreateMultipartUploadRequest.Builder requestBuilder = CreateMultipartUploadRequest.builder()
.bucket(location.bucket())
.key(location.key());
if (!writeTags.isEmpty()) {
if (writeTags != null && !writeTags.isEmpty()) {
requestBuilder.tagging(Tagging.builder().tagSet(writeTags).build());
}

@@ -378,7 +378,7 @@ private void completeUploads() {
.bucket(location.bucket())
.key(location.key());

if (!writeTags.isEmpty()) {
if (writeTags != null && !writeTags.isEmpty()) {
requestBuilder.tagging(Tagging.builder().tagSet(writeTags).build());
}

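For reference, a self-contained sketch of the tagging pattern used in the two hunks above (helper class, method name, and parameters are illustrative; the SDK builder calls mirror the diff):

import java.util.Set;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;

class WriteTagsRequestSketch {
  // Mirrors initializeMultiPartUpload(): tags come from AwsProperties#getS3WriteTags()
  // and are attached only when the set is non-empty.
  static CreateMultipartUploadRequest withWriteTags(Set<Tag> writeTags, String bucket, String key) {
    CreateMultipartUploadRequest.Builder requestBuilder = CreateMultipartUploadRequest.builder()
        .bucket(bucket)
        .key(key);
    if (writeTags != null && !writeTags.isEmpty()) {
      requestBuilder.tagging(Tagging.builder().tagSet(writeTags).build());
    }
    return requestBuilder.build();
  }
}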
28 changes: 0 additions & 28 deletions aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -34,7 +34,6 @@
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.SerializableSupplier;
import org.junit.Assert;
@@ -50,7 +49,6 @@
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.Tag;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -192,30 +190,4 @@ public void testSerializeClient() {

assertEquals("s3", post.get().serviceName());
}

@Test
public void testWriteTags() throws IOException {
String location = "s3://bucket/path/to/file.txt";
byte[] expected = new byte[1024 * 1024];
random.nextBytes(expected);

InputFile in = s3FileIO.newInputFile(location);
assertFalse(in.exists());

OutputFile out = s3FileIO.newOutputFile(location);
try (OutputStream os = out.createOrOverwrite()) {
IOUtils.write(expected, os);
}

assertTrue(in.exists());

// Assert for writeTags
assertTrue(((S3InputFile) in).writeTags().isEmpty());
assertEquals(((S3OutputFile) out).writeTags().size(), 1);
assertEquals(((S3OutputFile) out).writeTags(), ImmutableSet.of(
Tag.builder().key("tagKey1").value("TagValue1").build()));

s3FileIO.deleteFile(in);
assertFalse(s3FileIO.newInputFile(location).exists());
}
}
aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
@@ -36,7 +36,6 @@
import java.util.stream.Stream;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@@ -88,13 +87,12 @@ public class TestS3OutputStream {
private final Random random = new Random(1);
private final Path tmpDir = Files.createTempDirectory("s3fileio-test-");
private final String newTmpDirectory = "/tmp/newStagingDirectory";
private final Set<Tag> tags = ImmutableSet.of(
Tag.builder().key("abc").value("123").build(),
Tag.builder().key("def").value("789").build());

private final AwsProperties properties = new AwsProperties(ImmutableMap.of(
AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024),
AwsProperties.S3FILEIO_STAGING_DIRECTORY, tmpDir.toString()));
AwsProperties.S3FILEIO_STAGING_DIRECTORY, tmpDir.toString(),
"s3.write.tags.abc", "123",
"s3.write.tags.def", "789"));

public TestS3OutputStream() throws IOException {
}
@@ -122,8 +120,7 @@ public void testWrite() {
public void testAbortAfterFailedPartUpload() {
doThrow(new RuntimeException()).when(s3mock).uploadPart((UploadPartRequest) any(), (RequestBody) any());

try (S3OutputStream stream = new S3OutputStream(
s3mock, randomURI(), properties, nullMetrics(), tags)) {
try (S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties, nullMetrics())) {
stream.write(randomData(10 * 1024 * 1024));
} catch (Exception e) {
verify(s3mock, atLeastOnce()).abortMultipartUpload((AbortMultipartUploadRequest) any());
@@ -134,8 +131,7 @@ s3mock, randomURI(), properties, nullMetrics(), tags)) {
public void testAbortMultipart() {
doThrow(new RuntimeException()).when(s3mock).completeMultipartUpload((CompleteMultipartUploadRequest) any());

try (S3OutputStream stream = new S3OutputStream(
s3mock, randomURI(), properties, nullMetrics(), tags)) {
try (S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties, nullMetrics())) {
stream.write(randomData(10 * 1024 * 1024));
} catch (Exception e) {
verify(s3mock).abortMultipartUpload((AbortMultipartUploadRequest) any());
@@ -144,7 +140,7 @@

@Test
public void testMultipleClose() throws IOException {
S3OutputStream stream = new S3OutputStream(s3, randomURI(), properties, nullMetrics(), tags);
S3OutputStream stream = new S3OutputStream(s3, randomURI(), properties, nullMetrics());
stream.close();
stream.close();
}
@@ -153,8 +149,7 @@ public void testMultipleClose() throws IOException {
public void testStagingDirectoryCreation() throws IOException {
AwsProperties newStagingDirectoryAwsProperties = new AwsProperties(ImmutableMap.of(
AwsProperties.S3FILEIO_STAGING_DIRECTORY, newTmpDirectory));
S3OutputStream stream = new S3OutputStream(
s3, randomURI(), newStagingDirectoryAwsProperties, nullMetrics(), tags);
S3OutputStream stream = new S3OutputStream(s3, randomURI(), newStagingDirectoryAwsProperties, nullMetrics());
stream.close();
}

@@ -239,7 +234,7 @@ private void checkTags(ArgumentCaptor<PutObjectRequest> putObjectRequestArgument
if (properties.isS3ChecksumEnabled()) {
List<PutObjectRequest> putObjectRequests = putObjectRequestArgumentCaptor.getAllValues();
String tagging = putObjectRequests.get(0).tagging();
assertEquals(getTags(tags), tagging);
assertEquals(getTags(properties.getS3WriteTags()), tagging);
}
}

@@ -261,7 +256,7 @@ private String getDigest(byte[] data, int offset, int length) {
}

private void writeAndVerify(S3Client client, S3URI uri, byte [] data, boolean arrayWrite) {
try (S3OutputStream stream = new S3OutputStream(client, uri, properties, nullMetrics(), tags)) {
try (S3OutputStream stream = new S3OutputStream(client, uri, properties, nullMetrics())) {
if (arrayWrite) {
stream.write(data);
assertEquals(data.length, stream.getPos());