From c7e96a6183ceda27bcabcbd4db2931f72b7474ed Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Thu, 12 Oct 2023 00:06:22 +0200 Subject: [PATCH] Fail query when the symlink file contains non-existent paths --- .../hive/BackgroundHiveSplitLoader.java | 15 ++++++---- .../hive/TestParquetSymlinkInputFormat.java | 28 +++++++++++++++++++ 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java index f15040a8eb55..8b79ea2c23a6 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java @@ -525,12 +525,15 @@ Iterator buildManifestFileIterator(InternalHiveSplitFactory s checkPartitionLocationExists(trinoFileSystem, location); } fileStatusIterator.forEachRemaining(status -> fileStatuses.put(Location.of(status.getPath()).path(), status)); - - List locatedFileStatuses = paths.stream() - .map(path -> fileStatuses.get(path.path())) - .toList(); - - return createInternalHiveSplitIterator(splitFactory, splittable, Optional.empty(), locatedFileStatuses.stream()); + Stream fileStream = paths.stream() + .map(path -> { + TrinoFileStatus status = fileStatuses.get(path.path()); + if (status == null) { + throw new TrinoException(HIVE_FILE_NOT_FOUND, "Manifest file from the location [%s] contains non-existent path: %s".formatted(location, path)); + } + return status; + }); + return createInternalHiveSplitIterator(splitFactory, splittable, Optional.empty(), fileStream); } private ListenableFuture getTransactionalSplits(Location path, boolean splittable, Optional bucketConversion, InternalHiveSplitFactory splitFactory) diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestParquetSymlinkInputFormat.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestParquetSymlinkInputFormat.java index 3ce925d1e6b9..abd7bbf7f096 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestParquetSymlinkInputFormat.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestParquetSymlinkInputFormat.java @@ -30,6 +30,7 @@ import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestParquetSymlinkInputFormat { @@ -66,6 +67,33 @@ public void testSymlinkTable() hdfsClient.delete(dataDir); } + @Test(groups = STORAGE_FORMATS) + public void testSymlinkTableWithSymlinkFileContainingNonExistentPath() + throws Exception + { + String table = "test_parquet_invalid_symlink"; + onHive().executeQuery("DROP TABLE IF EXISTS " + table); + + onHive().executeQuery("" + + "CREATE TABLE " + table + + "(col int) " + + "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' " + + "STORED AS " + + "INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' " + + "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'"); + + String tableRoot = warehouseDirectory + '/' + table; + String dataDir = warehouseDirectory + "/data_test_parquet_invalid_symlink"; + + saveResourceOnHdfs("data.parquet", dataDir + "/data.parquet"); + hdfsClient.saveFile(tableRoot + "/symlink.txt", format("hdfs:%s/data.parquet\nhdfs:%s/missingfile.parquet", dataDir, dataDir)); + assertThatThrownBy(() -> onTrino().executeQuery("SELECT * FROM " + table)) + .hasMessageMatching(".*Manifest file from the location \\[.*data_test_parquet_invalid_symlink\\] contains non-existent path:.*missingfile.parquet"); + + onHive().executeQuery("DROP TABLE " + table); + hdfsClient.delete(dataDir); + } + @Test(groups = {AVRO, STORAGE_FORMATS}) public void testSymlinkTableWithMultipleParentDirectories() throws Exception