Skip to content

Commit

Permalink
Core: Avoid useless metadata retries (apache#5696)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue authored Sep 5, 2022
1 parent 3804d74 commit b736fbc
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.InputStream;
import java.util.Arrays;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.RangeReadable;
Expand All @@ -37,6 +38,7 @@
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;

class S3InputStream extends SeekableInputStream implements RangeReadable {
private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
Expand Down Expand Up @@ -184,7 +186,12 @@ private void openStream() throws IOException {
S3RequestUtil.configureEncryption(awsProperties, requestBuilder);

closeStream();
stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());

try {
stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
} catch (NoSuchKeyException e) {
throw new NotFoundException(e, "Location does not exist: %s", location);
}
}

private void closeStream() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws.s3;

import java.util.Map;
import org.apache.iceberg.aws.AwsClientFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;

class StaticClientFactory implements AwsClientFactory {
static S3Client client;

@Override
public S3Client s3() {
return client;
}

@Override
public GlueClient glue() {
return null;
}

@Override
public KmsClient kms() {
return null;
}

@Override
public DynamoDbClient dynamo() {
return null;
}

@Override
public void initialize(Map<String, String> properties) {}
}
58 changes: 58 additions & 0 deletions aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,30 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileIOParser;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableSupplier;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -95,6 +105,7 @@ public void before() {
.createBucket(
CreateBucketRequest.builder().bucket(batchDeletionBucketPrefix + i).build());
}
StaticClientFactory.client = s3mock;
}

@Test
Expand Down Expand Up @@ -250,6 +261,53 @@ public void testPrefixDelete() {
});
}

@Test
public void testReadMissingLocation() {
String location = "s3://bucket/path/to/data.parquet";
InputFile in = s3FileIO.newInputFile(location);
AssertHelpers.assertThrows(
"Should fail with NotFoundException",
NotFoundException.class,
"Location does not exist",
() -> in.newStream().read());
}

@Test
public void testMissingTableMetadata() {
Map<String, String> conf = Maps.newHashMap();
conf.put(
CatalogProperties.URI,
"jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""));
conf.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
conf.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
conf.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://bucket/warehouse");
conf.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
conf.put(AwsProperties.CLIENT_FACTORY, StaticClientFactory.class.getName());

try (JdbcCatalog catalog = new JdbcCatalog()) {
catalog.initialize("test_jdbc_catalog", conf);

Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
TableIdentifier ident = TableIdentifier.of("table_name");
BaseTable table = (BaseTable) catalog.createTable(ident, schema);

// delete the current metadata
s3FileIO.deleteFile(table.operations().current().metadataFileLocation());

long start = System.currentTimeMillis();
// to test NotFoundException, load the table again. refreshing the existing table doesn't
// require reading metadata
AssertHelpers.assertThrows(
"Should fail to refresh",
NotFoundException.class,
"Location does not exist",
() -> catalog.loadTable(ident));
long duration = System.currentTimeMillis() - start;

Assert.assertTrue("Should take less than 10 seconds", duration < 10_000);
}
}

@Test
public void testFileIOJsonSerialization() {
Object conf;
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ project(':iceberg-aws') {
exclude group: 'junit'
}
testImplementation "com.esotericsoftware:kryo"
testImplementation "org.xerial:sqlite-jdbc"
}

sourceSets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
Expand Down Expand Up @@ -195,6 +196,7 @@ protected void refreshFromMetadataLocation(
.retry(numRetries)
.exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
.throwFailureWhenFinished()
.stopRetryOn(NotFoundException.class) // overridden if shouldRetry is non-null
.shouldRetryTest(shouldRetry)
.run(metadataLocation -> newMetadata.set(metadataLoader.apply(metadataLocation)));

Expand Down

0 comments on commit b736fbc

Please sign in to comment.