Skip to content

Commit

Permalink
Flink: Backport: Improve unit tests for sink (apache#6603)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvary authored Jan 16, 2023
1 parent 96a9125 commit 3714270
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,14 @@ public static DataFile writeFile(
public static DeleteFile writeEqDeleteFile(
Table table,
FileFormat format,
String tablePath,
String filename,
FileAppenderFactory<RowData> appenderFactory,
List<RowData> deletes)
throws IOException {
EncryptedOutputFile outputFile =
table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration()));
table
.encryption()
.encrypt(fromPath(new Path(table.location(), filename), new Configuration()));

EqualityDeleteWriter<RowData> eqWriter =
appenderFactory.newEqDeleteWriter(outputFile, format, null);
Expand All @@ -175,13 +176,14 @@ public static DeleteFile writeEqDeleteFile(
public static DeleteFile writePosDeleteFile(
Table table,
FileFormat format,
String tablePath,
String filename,
FileAppenderFactory<RowData> appenderFactory,
List<Pair<CharSequence, Long>> positions)
throws IOException {
EncryptedOutputFile outputFile =
table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration()));
table
.encryption()
.encrypt(fromPath(new Path(table.location(), filename), new Configuration()));

PositionDeleteWriter<RowData> posWriter =
appenderFactory.newPosDeleteWriter(outputFile, format, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class TestFlinkManifest {

@Rule public TemporaryFolder tempFolder = new TemporaryFolder();

private String tablePath;
private Table table;
private FileAppenderFactory<RowData> appenderFactory;
private final AtomicInteger fileCount = new AtomicInteger(0);
Expand All @@ -67,7 +66,7 @@ public void before() throws IOException {
File folder = tempFolder.newFolder();
String warehouse = folder.getAbsolutePath();

tablePath = warehouse.concat("/test");
String tablePath = warehouse.concat("/test");
Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir());

// Construct the iceberg table.
Expand Down Expand Up @@ -260,20 +259,20 @@ private DataFile writeDataFile(String filename, List<RowData> rows) throws IOExc
table.schema(),
table.spec(),
CONF,
tablePath,
table.location(),
FileFormat.PARQUET.addExtension(filename),
rows);
}

private DeleteFile writeEqDeleteFile(String filename, List<RowData> deletes) throws IOException {
return SimpleDataUtil.writeEqDeleteFile(
table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes);
table, FileFormat.PARQUET, filename, appenderFactory, deletes);
}

private DeleteFile writePosDeleteFile(String filename, List<Pair<CharSequence, Long>> positions)
throws IOException {
return SimpleDataUtil.writePosDeleteFile(
table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions);
table, FileFormat.PARQUET, filename, appenderFactory, positions);
}

private List<DataFile> generateDataFiles(int fileNum) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
public class TestIcebergFilesCommitter extends TableTestBase {
private static final Configuration CONF = new Configuration();

private String tablePath;
private File flinkManifestFolder;

private final FileFormat format;
Expand Down Expand Up @@ -104,8 +103,6 @@ public void setupTable() throws IOException {
this.metadataDir = new File(tableDir, "metadata");
Assert.assertTrue(tableDir.delete());

tablePath = tableDir.getAbsolutePath();

// Construct the iceberg table.
table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());

Expand Down Expand Up @@ -881,17 +878,15 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
private DeleteFile writeEqDeleteFile(
FileAppenderFactory<RowData> appenderFactory, String filename, List<RowData> deletes)
throws IOException {
return SimpleDataUtil.writeEqDeleteFile(
table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes);
return SimpleDataUtil.writeEqDeleteFile(table, format, filename, appenderFactory, deletes);
}

private DeleteFile writePosDeleteFile(
FileAppenderFactory<RowData> appenderFactory,
String filename,
List<Pair<CharSequence, Long>> positions)
throws IOException {
return SimpleDataUtil.writePosDeleteFile(
table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions);
return SimpleDataUtil.writePosDeleteFile(table, format, filename, appenderFactory, positions);
}

private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
Expand Down Expand Up @@ -943,7 +938,13 @@ private List<Path> assertFlinkManifests(int expectedCount) throws IOException {

private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
return SimpleDataUtil.writeFile(
table, table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
table,
table.schema(),
table.spec(),
CONF,
table.location(),
format.addExtension(filename),
rows);
}

private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long expectedId) {
Expand All @@ -961,7 +962,7 @@ private void assertSnapshotSize(int expectedSnapshotSize) {

private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
throws Exception {
TestOperatorFactory factory = TestOperatorFactory.of(tablePath);
TestOperatorFactory factory = TestOperatorFactory.of(table.location());
return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
public class TestIcebergStreamWriter {
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();

private String tablePath;
private Table table;

private final FileFormat format;
Expand All @@ -94,11 +93,10 @@ public TestIcebergStreamWriter(String format, boolean partitioned) {
@Before
public void before() throws IOException {
File folder = tempFolder.newFolder();
tablePath = folder.getAbsolutePath();

// Construct the iceberg table.
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
table = SimpleDataUtil.createTable(tablePath, props, partitioned);
table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned);
}

@Test
Expand Down Expand Up @@ -136,7 +134,7 @@ public void testWritingTable() throws Exception {

// Assert the table records.
SimpleDataUtil.assertTableRecords(
tablePath,
table,
Lists.newArrayList(
SimpleDataUtil.createRecord(1, "hello"),
SimpleDataUtil.createRecord(2, "world"),
Expand Down Expand Up @@ -192,7 +190,7 @@ public void testTableWithoutSnapshot() throws Exception {
}

private Set<String> scanDataFiles() throws IOException {
Path dataDir = new Path(tablePath, "data");
Path dataDir = new Path(table.location(), "data");
FileSystem fs = FileSystem.get(new Configuration());
if (!fs.exists(dataDir)) {
return ImmutableSet.of();
Expand Down Expand Up @@ -302,7 +300,7 @@ public void testTableWithTargetFileSize() throws Exception {
}

// Assert the table records.
SimpleDataUtil.assertTableRecords(tablePath, records);
SimpleDataUtil.assertTableRecords(table, records);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public static Object[][] parameters() {
private final FileFormat format;
private final boolean partitioned;

private String path;
private Table table;

public TestTaskWriters(String format, boolean partitioned) {
Expand All @@ -80,11 +79,10 @@ public TestTaskWriters(String format, boolean partitioned) {
@Before
public void before() throws IOException {
File folder = tempFolder.newFolder();
path = folder.getAbsolutePath();

// Construct the iceberg table with the specified file format.
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
table = SimpleDataUtil.createTable(path, props, partitioned);
table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned);
}

@Test
Expand Down Expand Up @@ -170,7 +168,7 @@ public void testCompleteFiles() throws IOException {

// Assert the data rows.
SimpleDataUtil.assertTableRecords(
path,
table,
Lists.newArrayList(
SimpleDataUtil.createRecord(1, "a"),
SimpleDataUtil.createRecord(2, "b"),
Expand Down Expand Up @@ -205,7 +203,7 @@ public void testRollingWithTargetFileSize() throws IOException {
appendFiles.commit();

// Assert the data rows.
SimpleDataUtil.assertTableRecords(path, records);
SimpleDataUtil.assertTableRecords(table, records);
}
}

Expand All @@ -226,7 +224,7 @@ public void testRandomData() throws IOException {
appendFiles.commit();

// Assert the data rows.
SimpleDataUtil.assertTableRows(path, Lists.newArrayList(rows));
SimpleDataUtil.assertTableRows(table, Lists.newArrayList(rows));
}
}

Expand Down

0 comments on commit 3714270

Please sign in to comment.