Core: Support FileIO prefix operations (apache#5096)
danielcweeks authored Jun 22, 2022
1 parent a5efb53 commit ac8733d
Showing 7 changed files with 351 additions and 2 deletions.
44 changes: 44 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/FileInfo.java
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

public class FileInfo {
private final String location;
private final long size;
private final long createdAtMillis;

public FileInfo(String location, long size, long createdAtMillis) {
this.location = location;
this.size = size;
this.createdAtMillis = createdAtMillis;
}

public String location() {
return location;
}

public long size() {
return size;
}

public long createdAtMillis() {
return createdAtMillis;
}
}
51 changes: 51 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/SupportsPrefixOperations.java
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

/**
* This interface is intended as an extension for FileIO implementations
* to provide additional prefix-based operations that may be useful for
* maintenance and other supporting operations.
*/
public interface SupportsPrefixOperations {

/**
* Return an iterable of all files under a prefix.
* <p>
* Hierarchical file systems (e.g. HDFS) may impose additional restrictions,
* such as requiring the prefix to fully match a directory, whereas key/value
* object stores may allow arbitrary prefixes.
*
* @param prefix prefix to list
* @return iterable of file information
*/
Iterable<FileInfo> listPrefix(String prefix);

/**
* Delete all files under a prefix.
* <p>
* Hierarchical file systems (e.g. HDFS) may impose additional restrictions,
* such as requiring the prefix to fully match a directory, whereas key/value
* object stores may allow arbitrary prefixes.
*
* @param prefix prefix to delete
*/
void deletePrefix(String prefix);
}
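
For orientation, here is a minimal caller-side sketch (not part of this commit) showing how the extension interface is meant to be discovered and used; the cleanUp helper and the prefix value are illustrative assumptions:

import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;

public class PrefixOperationsExample {
  // Lists, prints, and then deletes everything under a prefix,
  // but only if the FileIO implementation opts into prefix operations.
  static void cleanUp(FileIO io, String prefix) {
    if (io instanceof SupportsPrefixOperations) {
      SupportsPrefixOperations prefixIO = (SupportsPrefixOperations) io;
      for (FileInfo file : prefixIO.listPrefix(prefix)) {
        System.out.printf("%s (%d bytes, created at %d)%n",
            file.location(), file.size(), file.createdAtMillis());
      }
      prefixIO.deletePrefix(prefix);
    }
  }
}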
33 changes: 32 additions & 1 deletion aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -33,15 +33,18 @@
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.CredentialSupplier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableSupplier;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
@@ -54,6 +57,7 @@
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
@@ -67,7 +71,7 @@
* URIs with schemes s3a, s3n, https are also treated as s3 file paths.
* Using this FileIO with other schemes will result in {@link org.apache.iceberg.exceptions.ValidationException}.
*/
public class S3FileIO implements FileIO, SupportsBulkOperations, CredentialSupplier {
public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixOperations, CredentialSupplier {
private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
private static final String DEFAULT_METRICS_IMPL = "org.apache.iceberg.hadoop.HadoopMetricsContext";
private static volatile ExecutorService executorService;
@@ -241,6 +245,33 @@ private List<String> deleteObjectsInBucket(String bucket, Collection<String> obj
return Lists.newArrayList();
}

@Override
public Iterable<FileInfo> listPrefix(String prefix) {
S3URI s3uri = new S3URI(prefix, awsProperties.s3BucketToAccessPointMapping());
ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(s3uri.bucket()).prefix(s3uri.key()).build();

return () -> client().listObjectsV2Paginator(request).stream()
.flatMap(r -> r.contents().stream())
.map(o -> new FileInfo(
String.format("%s://%s/%s", s3uri.scheme(), s3uri.bucket(), o.key()),
o.size(), o.lastModified().toEpochMilli())).iterator();
}

/**
* This method makes a best-effort attempt to delete all objects under the
* given prefix.
* <p>
* Bulk delete operations are used and deletes are not reattempted if they
* fail, but any individual objects that are not deleted as part of the bulk
* operation are logged.
*
* @param prefix prefix to delete
*/
@Override
public void deletePrefix(String prefix) {
deleteFiles(() -> Streams.stream(listPrefix(prefix)).map(FileInfo::location).iterator());
}

private S3Client client() {
if (client == null) {
synchronized (this) {
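To show the new S3 behavior end to end, a hedged usage sketch follows; the bucket, prefixes, and client wiring are assumptions for illustration, using the existing constructor that accepts a client supplier:

import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.io.FileInfo;
import software.amazon.awssdk.services.s3.S3Client;

public class S3PrefixExample {
  public static void main(String[] args) {
    // Assumes default AWS credential/region resolution; bucket and prefixes are examples.
    S3FileIO io = new S3FileIO(S3Client::create);

    // listPrefix streams results lazily through the paginated ListObjectsV2 calls.
    for (FileInfo file : io.listPrefix("s3://my-bucket/warehouse/db/table/metadata")) {
      System.out.println(file.location() + " " + file.size());
    }

    // Best-effort bulk delete of every object under the prefix.
    io.deletePrefix("s3://my-bucket/warehouse/db/table/data");
  }
}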
11 changes: 11 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
@@ -42,6 +42,7 @@ class S3URI {
private static final String FRAGMENT_DELIM = "#";

private final String location;
private final String scheme;
private final String bucket;
private final String key;

@@ -73,6 +74,7 @@ class S3URI {
this.location = location;
String [] schemeSplit = location.split(SCHEME_DELIM, -1);
ValidationException.check(schemeSplit.length == 2, "Invalid S3 URI, cannot determine scheme: %s", location);
this.scheme = schemeSplit[0];

String [] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
ValidationException.check(authoritySplit.length == 2, "Invalid S3 URI, cannot determine bucket: %s", location);
@@ -108,6 +110,15 @@ public String location() {
return location;
}

/**
* Returns the original scheme provided in the location.
*
* @return uri scheme
*/
public String scheme() {
return scheme;
}

@Override
public String toString() {
return location;
42 changes: 42 additions & 0 deletions aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -35,13 +35,16 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableSupplier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
@@ -190,4 +193,43 @@ public void testSerializeClient() {

assertEquals("s3", post.get().serviceName());
}

@Test
public void testPrefixList() {
String prefix = "s3://bucket/path/to/list";

List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);

scaleSizes.parallelStream().forEach(scale -> {
String scalePrefix = String.format("%s/%s/", prefix, scale);

createRandomObjects(scalePrefix, scale);
assertEquals((long) scale, Streams.stream(s3FileIO.listPrefix(scalePrefix)).count());
});

long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum();
Assertions.assertEquals(totalFiles, Streams.stream(s3FileIO.listPrefix(prefix)).count());
}

@Test
public void testPrefixDelete() {
String prefix = "s3://bucket/path/to/delete";
List<Integer> scaleSizes = Lists.newArrayList(0, 5, 1000, 2500);

scaleSizes.parallelStream().forEach(scale -> {
String scalePrefix = String.format("%s/%s/", prefix, scale);

createRandomObjects(scalePrefix, scale);
s3FileIO.deletePrefix(scalePrefix);
assertEquals(0L, Streams.stream(s3FileIO.listPrefix(scalePrefix)).count());
});
}

private void createRandomObjects(String prefix, int count) {
S3URI s3URI = new S3URI(prefix);

random.ints(count).parallel().forEach(i ->
s3mock.putObject(builder -> builder.bucket(s3URI.bucket()).key(s3URI.key() + i).build(), RequestBody.empty())
);
}
}
68 changes: 67 additions & 1 deletion core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -20,17 +20,23 @@
package org.apache.iceberg.hadoop;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableSupplier;

public class HadoopFileIO implements FileIO, HadoopConfigurable {
public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixOperations {

private SerializableSupplier<Configuration> hadoopConf;

@@ -89,4 +95,64 @@ public Configuration getConf() {
public void serializeConfWith(Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
this.hadoopConf = confSerializer.apply(getConf());
}

@Override
public Iterable<FileInfo> listPrefix(String prefix) {
Path prefixToList = new Path(prefix);
FileSystem fs = Util.getFs(prefixToList, hadoopConf.get());

return () -> {
try {
return Streams.stream(new AdaptingIterator<>(fs.listFiles(prefixToList, true /* recursive */)))
.map(fileStatus -> new FileInfo(fileStatus.getPath().toString(), fileStatus.getLen(),
fileStatus.getModificationTime())).iterator();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}

@Override
public void deletePrefix(String prefix) {
Path prefixToDelete = new Path(prefix);
FileSystem fs = Util.getFs(prefixToDelete, hadoopConf.get());

try {
fs.delete(prefixToDelete, true /* recursive */);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/**
* This class is a simple adapter that allows Hadoop's
* RemoteIterator to be used as a standard Iterator.
*
* @param <E> element type
*/
private static class AdaptingIterator<E> implements Iterator<E>, RemoteIterator<E> {
private final RemoteIterator<E> delegate;

AdaptingIterator(RemoteIterator<E> delegate) {
this.delegate = delegate;
}

@Override
public boolean hasNext() {
try {
return delegate.hasNext();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public E next() {
try {
return delegate.next();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
}
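
And a short usage sketch for the Hadoop side; the paths and the bare Configuration are illustrative assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileInfo;

public class HadoopPrefixExample {
  public static void main(String[] args) {
    HadoopFileIO io = new HadoopFileIO(new Configuration());

    // Recursive listing via FileSystem#listFiles, with Hadoop's RemoteIterator
    // adapted to a plain Iterator as shown above.
    for (FileInfo file : io.listPrefix("hdfs://namenode/warehouse/db/table")) {
      System.out.println(file.location() + " " + file.size());
    }

    // deletePrefix maps to a recursive FileSystem#delete on the path.
    io.deletePrefix("hdfs://namenode/warehouse/db/table/stale");
  }
}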
