Skip to content

Commit

Permalink
Data, Flink, Spark: Migrate TestAppenderFactory and subclasses to JUn…
Browse files Browse the repository at this point in the history
…it5 (apache#9862)
  • Loading branch information
nk1506 authored Mar 11, 2024
1 parent 5ce5c78 commit 8b8907e
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 272 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,7 @@

public class TestGenericAppenderFactory extends TestAppenderFactory<Record> {

private final GenericRecord gRecord;

public TestGenericAppenderFactory(String fileFormat, boolean partitioned) {
super(fileFormat, partitioned);
this.gRecord = GenericRecord.create(SCHEMA);
}
private final GenericRecord gRecord = GenericRecord.create(SCHEMA);

@Override
protected FileAppenderFactory<Record> createAppenderFactory(
Expand Down
111 changes: 59 additions & 52 deletions data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@
*/
package org.apache.iceberg.io;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.TestBase;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
Expand All @@ -46,45 +53,39 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public abstract class TestAppenderFactory<T> extends TableTestBase {
private static final int FORMAT_V2 = 2;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

private final FileFormat format;
private final boolean partitioned;
@ExtendWith(ParameterizedTestExtension.class)
public abstract class TestAppenderFactory<T> extends TestBase {
private static final int FORMAT_V2 = 2;

private PartitionKey partition = null;
private OutputFileFactory fileFactory = null;

@Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
public static Object[] parameters() {
return new Object[][] {
new Object[] {"avro", false},
new Object[] {"avro", true},
new Object[] {"orc", false},
new Object[] {"orc", true},
new Object[] {"parquet", false},
new Object[] {"parquet", true}
};
}

public TestAppenderFactory(String fileFormat, boolean partitioned) {
super(FORMAT_V2);
this.format = FileFormat.fromString(fileFormat);
this.partitioned = partitioned;
@Parameter(index = 1)
protected FileFormat format;

@Parameter(index = 2)
private boolean partitioned;

@Parameters(name = "formatVersion = {0}, FileFormat={1}, partitioned={2}")
protected static List<Object> parameters() {
return Arrays.asList(
new Object[] {FORMAT_V2, FileFormat.AVRO, false},
new Object[] {FORMAT_V2, FileFormat.AVRO, true},
new Object[] {FORMAT_V2, FileFormat.ORC, false},
new Object[] {FORMAT_V2, FileFormat.ORC, true},
new Object[] {FORMAT_V2, FileFormat.PARQUET, false},
new Object[] {FORMAT_V2, FileFormat.PARQUET, true});
}

@Override
@Before
@BeforeEach
public void setupTable() throws Exception {
this.tableDir = temp.newFolder();
Assert.assertTrue(tableDir.delete()); // created by table create
this.tableDir = Files.createTempDirectory(temp, "junit").toFile();
assertThat(tableDir.delete()).isTrue(); // created by table create

this.metadataDir = new File(tableDir, "metadata");

Expand Down Expand Up @@ -157,7 +158,7 @@ private DataFile prepareDataFile(List<T> rowSet, FileAppenderFactory<T> appender
return writer.toDataFile();
}

@Test
@TestTemplate
public void testDataWriter() throws IOException {
FileAppenderFactory<T> appenderFactory = createAppenderFactory(null, null, null);

Expand All @@ -166,11 +167,12 @@ public void testDataWriter() throws IOException {

table.newRowDelta().addRows(dataFile).commit();

Assert.assertEquals(
"Should have the expected records.", expectedRowSet(rowSet), actualRowSet("*"));
assertThat(actualRowSet("*"))
.as("Should have the expected records.")
.isEqualTo(expectedRowSet(rowSet));
}

@Test
@TestTemplate
public void testEqDeleteWriter() throws IOException {
List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().findField("id").fieldId());
Schema eqDeleteRowSchema = table.schema().select("id");
Expand Down Expand Up @@ -198,18 +200,20 @@ public void testEqDeleteWriter() throws IOException {
GenericRecord gRecord = GenericRecord.create(eqDeleteRowSchema);
Set<Record> expectedDeletes =
Sets.newHashSet(gRecord.copy("id", 1), gRecord.copy("id", 3), gRecord.copy("id", 5));
Assert.assertEquals(
expectedDeletes,
Sets.newHashSet(createReader(eqDeleteRowSchema, out.encryptingOutputFile().toInputFile())));
assertThat(
Sets.newHashSet(
createReader(eqDeleteRowSchema, out.encryptingOutputFile().toInputFile())))
.isEqualTo(expectedDeletes);

table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit();

List<T> expected = Lists.newArrayList(createRow(2, "bbb"), createRow(4, "ddd"));
Assert.assertEquals(
"Should have the expected records", expectedRowSet(expected), actualRowSet("*"));
assertThat(actualRowSet("*"))
.as("Should have the expected records")
.isEqualTo(expectedRowSet(expected));
}

@Test
@TestTemplate
public void testPosDeleteWriter() throws IOException {
// Initialize FileAppenderFactory without pos-delete row schema.
FileAppenderFactory<T> appenderFactory = createAppenderFactory(null, null, null);
Expand Down Expand Up @@ -241,9 +245,9 @@ public void testPosDeleteWriter() throws IOException {
gRecord.copy("file_path", dataFile.path(), "pos", 0L),
gRecord.copy("file_path", dataFile.path(), "pos", 2L),
gRecord.copy("file_path", dataFile.path(), "pos", 4L));
Assert.assertEquals(
expectedDeletes,
Sets.newHashSet(createReader(pathPosSchema, out.encryptingOutputFile().toInputFile())));
assertThat(
Sets.newHashSet(createReader(pathPosSchema, out.encryptingOutputFile().toInputFile())))
.isEqualTo(expectedDeletes);

table
.newRowDelta()
Expand All @@ -254,11 +258,12 @@ public void testPosDeleteWriter() throws IOException {
.commit();

List<T> expected = Lists.newArrayList(createRow(2, "bbb"), createRow(4, "ddd"));
Assert.assertEquals(
"Should have the expected records", expectedRowSet(expected), actualRowSet("*"));
assertThat(actualRowSet("*"))
.as("Should have the expected records")
.isEqualTo(expectedRowSet(expected));
}

@Test
@TestTemplate
public void testPosDeleteWriterWithRowSchema() throws IOException {
FileAppenderFactory<T> appenderFactory = createAppenderFactory(null, null, table.schema());

Expand Down Expand Up @@ -308,9 +313,10 @@ public void testPosDeleteWriterWithRowSchema() throws IOException {
4L,
"row",
rowRecord.copy("id", 5, "data", "eee")));
Assert.assertEquals(
expectedDeletes,
Sets.newHashSet(createReader(pathPosRowSchema, out.encryptingOutputFile().toInputFile())));
assertThat(
Sets.newHashSet(
createReader(pathPosRowSchema, out.encryptingOutputFile().toInputFile())))
.isEqualTo(expectedDeletes);

table
.newRowDelta()
Expand All @@ -321,8 +327,9 @@ public void testPosDeleteWriterWithRowSchema() throws IOException {
.commit();

List<T> expected = Lists.newArrayList(createRow(2, "bbb"), createRow(4, "ddd"));
Assert.assertEquals(
"Should have the expected records", expectedRowSet(expected), actualRowSet("*"));
assertThat(actualRowSet("*"))
.as("Should have the expected records")
.isEqualTo(expectedRowSet(expected));
}

private CloseableIterable<Record> createReader(Schema schema, InputFile inputFile) {
Expand Down
74 changes: 39 additions & 35 deletions data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.io;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -27,46 +29,47 @@
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.TestBase;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestBaseTaskWriter extends TableTestBase {
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ParameterizedTestExtension.class)
public class TestBaseTaskWriter extends TestBase {
private static final int FORMAT_V2 = 2;

private final FileFormat format;
private final GenericRecord gRecord = GenericRecord.create(SCHEMA);

private OutputFileFactory fileFactory = null;
private FileAppenderFactory<Record> appenderFactory = null;

@Parameterized.Parameters(name = "FileFormat = {0}")
public static Object[][] parameters() {
return new Object[][] {{"avro"}, {"orc"}, {"parquet"}};
}
@Parameter(index = 1)
protected FileFormat format;

public TestBaseTaskWriter(String fileFormat) {
super(FORMAT_V2);
this.format = FileFormat.fromString(fileFormat);
@Parameters(name = "formatVersion = {0}, FileFormat = {1}")
protected static List<Object> parameters() {
return Arrays.asList(
new Object[] {FORMAT_V2, FileFormat.AVRO},
new Object[] {FORMAT_V2, FileFormat.ORC},
new Object[] {FORMAT_V2, FileFormat.PARQUET});
}

@Override
@Before
@BeforeEach
public void setupTable() throws IOException {
this.tableDir = temp.newFolder();
Assert.assertTrue(tableDir.delete()); // created by table create
this.tableDir = Files.createTempDirectory(temp, "junit").toFile();
assertThat(tableDir.delete()).isTrue(); // created by table create

this.metadataDir = new File(tableDir, "metadata");

Expand All @@ -90,23 +93,23 @@ private Record createRecord(Integer id, String data) {
return gRecord.copy("id", id, "data", data);
}

@Test
@TestTemplate
public void testWriteZeroRecord() throws IOException {
try (TestTaskWriter writer = createTaskWriter(128 * 1024 * 1024)) {
writer.close();

WriteResult result = writer.complete();
Assert.assertEquals(0, result.dataFiles().length);
Assert.assertEquals(0, result.deleteFiles().length);
assertThat(result.dataFiles()).hasSize(0);
assertThat(result.deleteFiles()).hasSize(0);

writer.close();
result = writer.complete();
Assert.assertEquals(0, result.dataFiles().length);
Assert.assertEquals(0, result.deleteFiles().length);
assertThat(result.dataFiles()).hasSize(0);
assertThat(result.deleteFiles()).hasSize(0);
}
}

@Test
@TestTemplate
public void testAbort() throws IOException {
List<Record> records = Lists.newArrayList();
for (int i = 0; i < 2000; i++) {
Expand All @@ -128,18 +131,18 @@ public void testAbort() throws IOException {
Files.list(Paths.get(tableDir.getPath(), "data"))
.filter(p -> !p.toString().endsWith(".crc"))
.collect(Collectors.toList());
Assert.assertEquals("Should have 4 files but the files are: " + files, 4, files.size());
assertThat(files).as("Should have 4 files but the files are: " + files).hasSize(4);

// Abort to clean all delete files and data files.
taskWriter.abort();
}

for (Path path : files) {
Assert.assertFalse(Files.exists(path));
assertThat(path).doesNotExist();
}
}

@Test
@TestTemplate
public void testRollIfExceedTargetFileSize() throws IOException {
List<Record> records = Lists.newArrayListWithCapacity(8000);
for (int i = 0; i < 2000; i++) {
Expand All @@ -156,8 +159,8 @@ public void testRollIfExceedTargetFileSize() throws IOException {
}

result = taskWriter.complete();
Assert.assertEquals(8, result.dataFiles().length);
Assert.assertEquals(0, result.deleteFiles().length);
assertThat(result.dataFiles()).hasSize(8);
assertThat(result.deleteFiles()).hasSize(0);
}

RowDelta rowDelta = table.newRowDelta();
Expand All @@ -178,17 +181,18 @@ public void testRollIfExceedTargetFileSize() throws IOException {
}

result = taskWriter.complete();
Assert.assertEquals(8, result.dataFiles().length);
Assert.assertEquals(8, result.deleteFiles().length);
assertThat(result.dataFiles()).hasSize(8);
assertThat(result.deleteFiles()).hasSize(8);
}

rowDelta = table.newRowDelta();
Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
rowDelta.commit();

Assert.assertEquals(
"Should have expected records", expectedRowSet(expected), actualRowSet("*"));
assertThat(actualRowSet("*"))
.as("Should have expected records")
.isEqualTo(expectedRowSet(expected));
}

private StructLikeSet expectedRowSet(Iterable<Record> records) {
Expand Down
Loading

0 comments on commit 8b8907e

Please sign in to comment.