Spark 3.5: Fix flaky test due to temp directory not empty during delete (apache#11470)
manuzhang authored Nov 5, 2024
1 parent 592b3b1 commit 20e0e3d
Showing 1 changed file with 23 additions and 44 deletions.
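
This commit removes the test's hand-rolled temp-directory management. Previously each test created a table folder under temp via createTableFolder(), and testFaultToleranceOnWrite deleted it at the end with a FileUtils.deleteDirectory loop that only tolerated NoSuchFileException; when the directory was not empty at delete time, the delete failed and the test was flaky. The table location is now injected with JUnit 5's @TempDir, so the framework creates a fresh directory for each test and removes it afterwards, and the manual cleanup (along with the FileUtils and NoSuchFileException imports) goes away. Below is a minimal, self-contained sketch of the @TempDir pattern, with hypothetical class and test names that are not part of this commit:

// Hypothetical example; only the @TempDir usage mirrors the change in this commit.
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.nio.file.Files;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class TempDirPatternTest {

  // Injected fresh for every test method; JUnit Jupiter deletes the directory
  // and everything in it once the test finishes, so no manual cleanup is needed.
  @TempDir private File location;

  @Test
  void writesIntoManagedTempDir() throws Exception {
    File dataDir = new File(location, "data");
    assertTrue(dataDir.mkdirs());
    Files.writeString(new File(dataDir, "part-0.txt").toPath(), "hello");
    // No FileUtils.deleteDirectory(location) retry loop at the end of the test.
  }
}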
@@ -28,15 +28,13 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.nio.file.NoSuchFileException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.avro.generic.GenericData.Record;
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Parameter;
@@ -76,6 +74,7 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 @ExtendWith(ParameterizedTestExtension.class)
 public class TestDataFrameWrites extends ParameterizedAvroDataTest {
@@ -88,6 +87,8 @@ public static Collection<String> parameters() {
 
   @Parameter private String format;
 
+  @TempDir private File location;
+
   private static SparkSession spark = null;
   private static JavaSparkContext sc = null;
 
@@ -140,47 +141,37 @@ public static void stopSpark() {
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    File location = createTableFolder();
-    Table table = createTable(schema, location);
-    writeAndValidateWithLocations(table, location, new File(location, "data"));
+    Table table = createTable(schema);
+    writeAndValidateWithLocations(table, new File(location, "data"));
   }
 
   @TestTemplate
   public void testWriteWithCustomDataLocation() throws IOException {
-    File location = createTableFolder();
     File tablePropertyDataLocation = temp.resolve("test-table-property-data-dir").toFile();
-    Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), location);
+    Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()));
     table
         .updateProperties()
         .set(TableProperties.WRITE_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath())
         .commit();
-    writeAndValidateWithLocations(table, location, tablePropertyDataLocation);
-  }
-
-  private File createTableFolder() throws IOException {
-    File parent = temp.resolve("parquet").toFile();
-    File location = new File(parent, "test");
-    assertThat(location.mkdirs()).as("Mkdir should succeed").isTrue();
-    return location;
+    writeAndValidateWithLocations(table, tablePropertyDataLocation);
   }
 
-  private Table createTable(Schema schema, File location) {
+  private Table createTable(Schema schema) {
     HadoopTables tables = new HadoopTables(CONF);
     return tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
   }
 
-  private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir)
-      throws IOException {
+  private void writeAndValidateWithLocations(Table table, File expectedDataDir) throws IOException {
     Schema tableSchema = table.schema(); // use the table schema because ids are reassigned
 
     table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
 
     Iterable<Record> expected = RandomData.generate(tableSchema, 100, 0L);
-    writeData(expected, tableSchema, location.toString());
+    writeData(expected, tableSchema);
 
     table.refresh();
 
-    List<Row> actual = readTable(location.toString());
+    List<Row> actual = readTable();
 
     Iterator<Record> expectedIter = expected.iterator();
     Iterator<Row> actualIter = actual.iterator();
@@ -204,21 +195,20 @@ private void writeAndValidateWithLocations(Table table, File location, File expe
             .startsWith(expectedDataDir.getAbsolutePath()));
   }
 
-  private List<Row> readTable(String location) {
-    Dataset<Row> result = spark.read().format("iceberg").load(location);
+  private List<Row> readTable() {
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
 
     return result.collectAsList();
   }
 
-  private void writeData(Iterable<Record> records, Schema schema, String location)
-      throws IOException {
+  private void writeData(Iterable<Record> records, Schema schema) throws IOException {
    Dataset<Row> df = createDataset(records, schema);
    DataFrameWriter<?> writer = df.write().format("iceberg").mode("append");
-    writer.save(location);
+    writer.save(location.toString());
   }
 
-  private void writeDataWithFailOnPartition(
-      Iterable<Record> records, Schema schema, String location) throws IOException, SparkException {
+  private void writeDataWithFailOnPartition(Iterable<Record> records, Schema schema)
+      throws IOException, SparkException {
     final int numPartitions = 10;
     final int partitionToFail = new Random().nextInt(numPartitions);
     MapPartitionsFunction<Row, Row> failOnFirstPartitionFunc =
@@ -241,7 +231,7 @@ private void writeDataWithFailOnPartition(
     // Setting "check-nullability" option to "false" doesn't help as it fails at Spark analyzer.
     Dataset<Row> convertedDf = df.sqlContext().createDataFrame(df.rdd(), convert(schema));
     DataFrameWriter<?> writer = convertedDf.write().format("iceberg").mode("append");
-    writer.save(location);
+    writer.save(location.toString());
   }
 
   private Dataset<Row> createDataset(Iterable<Record> records, Schema schema) throws IOException {
@@ -287,7 +277,6 @@ public void testNullableWithWriteOption() throws IOException {
         .as("Spark 3 rejects writing nulls to a required column")
         .startsWith("2");
 
-    File location = temp.resolve("parquet").resolve("test").toFile();
     String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location);
     String targetPath = String.format("%s/nullable_poc/targetFolder/", location);
 
Expand Down Expand Up @@ -341,7 +330,6 @@ public void testNullableWithSparkSqlOption() throws IOException {
.as("Spark 3 rejects writing nulls to a required column")
.startsWith("2");

File location = temp.resolve("parquet").resolve("test").toFile();
String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location);
String targetPath = String.format("%s/nullable_poc/targetFolder/", location);

@@ -397,37 +385,28 @@ public void testNullableWithSparkSqlOption() throws IOException {
 
   @TestTemplate
   public void testFaultToleranceOnWrite() throws IOException {
-    File location = createTableFolder();
     Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
-    Table table = createTable(schema, location);
+    Table table = createTable(schema);
 
     Iterable<Record> records = RandomData.generate(schema, 100, 0L);
-    writeData(records, schema, location.toString());
+    writeData(records, schema);
 
     table.refresh();
 
     Snapshot snapshotBeforeFailingWrite = table.currentSnapshot();
-    List<Row> resultBeforeFailingWrite = readTable(location.toString());
+    List<Row> resultBeforeFailingWrite = readTable();
 
     Iterable<Record> records2 = RandomData.generate(schema, 100, 0L);
 
-    assertThatThrownBy(() -> writeDataWithFailOnPartition(records2, schema, location.toString()))
+    assertThatThrownBy(() -> writeDataWithFailOnPartition(records2, schema))
         .isInstanceOf(SparkException.class);
 
     table.refresh();
 
     Snapshot snapshotAfterFailingWrite = table.currentSnapshot();
-    List<Row> resultAfterFailingWrite = readTable(location.toString());
+    List<Row> resultAfterFailingWrite = readTable();
 
     assertThat(snapshotBeforeFailingWrite).isEqualTo(snapshotAfterFailingWrite);
     assertThat(resultBeforeFailingWrite).isEqualTo(resultAfterFailingWrite);
-
-    while (location.exists()) {
-      try {
-        FileUtils.deleteDirectory(location);
-      } catch (NoSuchFileException e) {
-        // ignore NoSuchFileException when a file is already deleted
-      }
-    }
   }
 }
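
For reference, the removed cleanup loop is reproduced below with editorial comments on why it could fail; the code itself is unchanged from the deleted lines, and the specific exception named in the comments is an assumption based on the commit title rather than something stated in the diff.

// Removed by this commit. FileUtils.deleteDirectory removes the directory's entries and then
// the directory itself; if a file is still present (or written concurrently) when the directory
// is removed, the delete fails because the directory is not empty -- plausibly surfacing as a
// DirectoryNotEmptyException (assumption from the commit title). Only NoSuchFileException was
// tolerated here, so any other IOException escaped the loop and failed the test intermittently.
while (location.exists()) {
  try {
    FileUtils.deleteDirectory(location);
  } catch (NoSuchFileException e) {
    // ignore NoSuchFileException when a file is already deleted
  }
}

With @TempDir, the JUnit extension performs this cleanup after the test finishes, outside the test body.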
