From 37142709edd2b1b5da5a0db2036eacd9f608c8b5 Mon Sep 17 00:00:00 2001 From: pvary Date: Mon, 16 Jan 2023 20:06:12 +0100 Subject: [PATCH] Flink: Backport: Improve unit tests for sink (#6603) --- .../apache/iceberg/flink/SimpleDataUtil.java | 10 ++++++---- .../iceberg/flink/sink/TestFlinkManifest.java | 9 ++++----- .../flink/sink/TestIcebergFilesCommitter.java | 19 ++++++++++--------- .../flink/sink/TestIcebergStreamWriter.java | 10 ++++------ .../iceberg/flink/sink/TestTaskWriters.java | 10 ++++------ 5 files changed, 28 insertions(+), 30 deletions(-) diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index 970feea2ae55..e29676350855 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -156,13 +156,14 @@ public static DataFile writeFile( public static DeleteFile writeEqDeleteFile( Table table, FileFormat format, - String tablePath, String filename, FileAppenderFactory appenderFactory, List deletes) throws IOException { EncryptedOutputFile outputFile = - table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration())); + table + .encryption() + .encrypt(fromPath(new Path(table.location(), filename), new Configuration())); EqualityDeleteWriter eqWriter = appenderFactory.newEqDeleteWriter(outputFile, format, null); @@ -175,13 +176,14 @@ public static DeleteFile writeEqDeleteFile( public static DeleteFile writePosDeleteFile( Table table, FileFormat format, - String tablePath, String filename, FileAppenderFactory appenderFactory, List> positions) throws IOException { EncryptedOutputFile outputFile = - table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration())); + table + .encryption() + .encrypt(fromPath(new Path(table.location(), filename), new Configuration())); PositionDeleteWriter posWriter = appenderFactory.newPosDeleteWriter(outputFile, format, null); diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java index 5528e71b3db1..36801420106e 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java @@ -57,7 +57,6 @@ public class TestFlinkManifest { @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); - private String tablePath; private Table table; private FileAppenderFactory appenderFactory; private final AtomicInteger fileCount = new AtomicInteger(0); @@ -67,7 +66,7 @@ public void before() throws IOException { File folder = tempFolder.newFolder(); String warehouse = folder.getAbsolutePath(); - tablePath = warehouse.concat("/test"); + String tablePath = warehouse.concat("/test"); Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir()); // Construct the iceberg table. @@ -260,20 +259,20 @@ private DataFile writeDataFile(String filename, List rows) throws IOExc table.schema(), table.spec(), CONF, - tablePath, + table.location(), FileFormat.PARQUET.addExtension(filename), rows); } private DeleteFile writeEqDeleteFile(String filename, List deletes) throws IOException { return SimpleDataUtil.writeEqDeleteFile( - table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes); + table, FileFormat.PARQUET, filename, appenderFactory, deletes); } private DeleteFile writePosDeleteFile(String filename, List> positions) throws IOException { return SimpleDataUtil.writePosDeleteFile( - table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions); + table, FileFormat.PARQUET, filename, appenderFactory, positions); } private List generateDataFiles(int fileNum) throws IOException { diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index c4f93f0ec231..66baaeb0e998 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -73,7 +73,6 @@ public class TestIcebergFilesCommitter extends TableTestBase { private static final Configuration CONF = new Configuration(); - private String tablePath; private File flinkManifestFolder; private final FileFormat format; @@ -104,8 +103,6 @@ public void setupTable() throws IOException { this.metadataDir = new File(tableDir, "metadata"); Assert.assertTrue(tableDir.delete()); - tablePath = tableDir.getAbsolutePath(); - // Construct the iceberg table. table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned()); @@ -881,8 +878,7 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { private DeleteFile writeEqDeleteFile( FileAppenderFactory appenderFactory, String filename, List deletes) throws IOException { - return SimpleDataUtil.writeEqDeleteFile( - table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes); + return SimpleDataUtil.writeEqDeleteFile(table, format, filename, appenderFactory, deletes); } private DeleteFile writePosDeleteFile( @@ -890,8 +886,7 @@ private DeleteFile writePosDeleteFile( String filename, List> positions) throws IOException { - return SimpleDataUtil.writePosDeleteFile( - table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions); + return SimpleDataUtil.writePosDeleteFile(table, format, filename, appenderFactory, positions); } private FileAppenderFactory createDeletableAppenderFactory() { @@ -943,7 +938,13 @@ private List assertFlinkManifests(int expectedCount) throws IOException { private DataFile writeDataFile(String filename, List rows) throws IOException { return SimpleDataUtil.writeFile( - table, table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows); + table, + table.schema(), + table.spec(), + CONF, + table.location(), + format.addExtension(filename), + rows); } private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long expectedId) { @@ -961,7 +962,7 @@ private void assertSnapshotSize(int expectedSnapshotSize) { private OneInputStreamOperatorTestHarness createStreamSink(JobID jobID) throws Exception { - TestOperatorFactory factory = TestOperatorFactory.of(tablePath); + TestOperatorFactory factory = TestOperatorFactory.of(table.location()); return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID)); } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index bd959bfb31c4..06942b8ebefb 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -68,7 +68,6 @@ public class TestIcebergStreamWriter { @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); - private String tablePath; private Table table; private final FileFormat format; @@ -94,11 +93,10 @@ public TestIcebergStreamWriter(String format, boolean partitioned) { @Before public void before() throws IOException { File folder = tempFolder.newFolder(); - tablePath = folder.getAbsolutePath(); // Construct the iceberg table. Map props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()); - table = SimpleDataUtil.createTable(tablePath, props, partitioned); + table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned); } @Test @@ -136,7 +134,7 @@ public void testWritingTable() throws Exception { // Assert the table records. SimpleDataUtil.assertTableRecords( - tablePath, + table, Lists.newArrayList( SimpleDataUtil.createRecord(1, "hello"), SimpleDataUtil.createRecord(2, "world"), @@ -192,7 +190,7 @@ public void testTableWithoutSnapshot() throws Exception { } private Set scanDataFiles() throws IOException { - Path dataDir = new Path(tablePath, "data"); + Path dataDir = new Path(table.location(), "data"); FileSystem fs = FileSystem.get(new Configuration()); if (!fs.exists(dataDir)) { return ImmutableSet.of(); @@ -302,7 +300,7 @@ public void testTableWithTargetFileSize() throws Exception { } // Assert the table records. - SimpleDataUtil.assertTableRecords(tablePath, records); + SimpleDataUtil.assertTableRecords(table, records); } @Test diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java index c56a348e7445..e428d5a19ab1 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java @@ -69,7 +69,6 @@ public static Object[][] parameters() { private final FileFormat format; private final boolean partitioned; - private String path; private Table table; public TestTaskWriters(String format, boolean partitioned) { @@ -80,11 +79,10 @@ public TestTaskWriters(String format, boolean partitioned) { @Before public void before() throws IOException { File folder = tempFolder.newFolder(); - path = folder.getAbsolutePath(); // Construct the iceberg table with the specified file format. Map props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()); - table = SimpleDataUtil.createTable(path, props, partitioned); + table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned); } @Test @@ -170,7 +168,7 @@ public void testCompleteFiles() throws IOException { // Assert the data rows. SimpleDataUtil.assertTableRecords( - path, + table, Lists.newArrayList( SimpleDataUtil.createRecord(1, "a"), SimpleDataUtil.createRecord(2, "b"), @@ -205,7 +203,7 @@ public void testRollingWithTargetFileSize() throws IOException { appendFiles.commit(); // Assert the data rows. - SimpleDataUtil.assertTableRecords(path, records); + SimpleDataUtil.assertTableRecords(table, records); } } @@ -226,7 +224,7 @@ public void testRandomData() throws IOException { appendFiles.commit(); // Assert the data rows. - SimpleDataUtil.assertTableRows(path, Lists.newArrayList(rows)); + SimpleDataUtil.assertTableRows(table, Lists.newArrayList(rows)); } }