Spark: Support parallelism in RemoveOrphanFiles (apache#3872)
Co-authored-by: Steve Zhang <[email protected]>
dramaticlly and Steve Zhang authored Jan 14, 2022
1 parent 1a6a60d commit 7249d67
Showing 15 changed files with 653 additions and 3 deletions.
@@ -19,6 +19,7 @@

package org.apache.iceberg.actions;

import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

/**
@@ -67,6 +68,18 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles deleteWith(Consumer<String> deleteFunc);

/**
* Passes an alternative executor service that will be used for removing orphaned files.
* <p>
* If this method is not called, orphaned manifests and data files will still be deleted in
* the current thread.
*
* @param executorService the service to use
* @return this for method chaining
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);

/**
* The action result that contains a summary of the execution.
*/
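For context, a minimal usage sketch of the new hook. The helper class, pool size, and three-day cutoff are illustrative assumptions, not part of this commit:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.SparkActions;

// Hypothetical helper, not part of this commit: delete orphan files with 8 worker threads.
class OrphanFileCleanup {
  static DeleteOrphanFiles.Result clean(Table table) {
    ExecutorService pool = Executors.newFixedThreadPool(8);   // pool size is an arbitrary example
    try {
      return SparkActions.get()                               // assumes an active SparkSession
          .deleteOrphanFiles(table)
          .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
          .executeDeleteWith(pool)                            // hook introduced by this commit
          .execute();
    } finally {
      pool.shutdown();
    }
  }
}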
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -86,6 +87,8 @@ public class BaseDeleteOrphanFilesSparkAction
}
}, DataTypes.StringType);

private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;

private final SerializableConfiguration hadoopConf;
private final int partitionDiscoveryParallelism;
private final Table table;
@@ -99,6 +102,8 @@ public void accept(String file) {
}
};

private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;

public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
super(spark);

@@ -117,6 +122,12 @@ protected DeleteOrphanFiles self() {
return this;
}

@Override
public BaseDeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
this.deleteExecutorService = executorService;
return this;
}

@Override
public BaseDeleteOrphanFilesSparkAction location(String newLocation) {
this.location = newLocation;
@@ -167,6 +178,7 @@ private DeleteOrphanFiles.Result doExecute() {

Tasks.foreach(orphanFiles)
.noRetry()
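// a null executor service (the default) keeps deletes in the calling thread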
.executeWith(deleteExecutorService)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
.run(deleteFunc::accept);
@@ -24,6 +24,11 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
@@ -46,6 +51,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
@@ -239,6 +245,76 @@ public void testAllValidFilesAreKept() throws IOException, InterruptedException
}
}

@Test
public void orphanedFileRemovedWithParallelTasks() throws InterruptedException, IOException {
Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);

List<ThreeColumnRecord> records1 = Lists.newArrayList(
new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")
);
Dataset<Row> df1 = spark.createDataFrame(records1, ThreeColumnRecord.class).coalesce(1);

// original append
df1.select("c1", "c2", "c3")
.write()
.format("iceberg")
.mode("append")
.save(tableLocation);

List<ThreeColumnRecord> records2 = Lists.newArrayList(
new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA")
);
Dataset<Row> df2 = spark.createDataFrame(records2, ThreeColumnRecord.class).coalesce(1);

// dynamic partition overwrite
df2.select("c1", "c2", "c3")
.write()
.format("iceberg")
.mode("overwrite")
.save(tableLocation);

// second append
df2.select("c1", "c2", "c3")
.write()
.format("iceberg")
.mode("append")
.save(tableLocation);

df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA");
df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/invalid/invalid");

// sleep for 1 second to ensure files will be old enough
Thread.sleep(1000);

Set<String> deletedFiles = Sets.newHashSet();
Set<String> deleteThreads = ConcurrentHashMap.newKeySet();
AtomicInteger deleteThreadsIndex = new AtomicInteger(0);

ExecutorService executorService = Executors.newFixedThreadPool(4, runnable -> {
Thread thread = new Thread(runnable);
thread.setName("remove-orphan-" + deleteThreadsIndex.getAndIncrement());
thread.setDaemon(true);
return thread;
});

DeleteOrphanFiles.Result result = SparkActions.get().deleteOrphanFiles(table)
.executeDeleteWith(executorService)
.olderThan(System.currentTimeMillis())
.deleteWith(file -> {
deleteThreads.add(Thread.currentThread().getName());
deletedFiles.add(file);
})
.execute();

// verify that the deletes ran in the threads created by the provided ExecutorService's ThreadFactory
Assert.assertEquals(
Sets.newHashSet("remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3"),
deleteThreads);

Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
}

@Test
public void testWapFilesAreKept() throws InterruptedException {
Map<String, String> props = Maps.newHashMap();
@@ -232,4 +232,75 @@ public void testInvalidRemoveOrphanFilesCases() {
IllegalArgumentException.class, "Cannot handle an empty identifier",
() -> sql("CALL %s.system.remove_orphan_files('')", catalogName));
}

@Test
public void testConcurrentRemoveOrphanFiles() throws IOException {
if (catalogName.equals("testhadoop")) {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
} else {
// give a fresh location to Hive tables as Spark will not clean up the table location
// correctly while dropping tables through spark_catalog
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg LOCATION '%s'",
tableName, temp.newFolder());
}

sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);

Table table = validationCatalog.loadTable(tableIdent);

String metadataLocation = table.location() + "/metadata";
String dataLocation = table.location() + "/data";

// produce orphan files in the data location using parquet
sql("CREATE TABLE p (id bigint) USING parquet LOCATION '%s'", dataLocation);
sql("INSERT INTO TABLE p VALUES (1)");
sql("INSERT INTO TABLE p VALUES (10)");
sql("INSERT INTO TABLE p VALUES (100)");
sql("INSERT INTO TABLE p VALUES (1000)");

// wait to ensure files are old enough
waitUntilAfter(System.currentTimeMillis());

Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));

// check for orphans in the table location
List<Object[]> output = sql(
"CALL %s.system.remove_orphan_files(" +
"table => '%s'," +
"max_concurrent_deletes => %s," +
"older_than => TIMESTAMP '%s')",
catalogName, tableIdent, 4, currentTimestamp);
Assert.assertEquals("Should be 4 orphan files in the data folder", 4, output.size());

// the previous call should have deleted all orphan files
List<Object[]> output3 = sql(
"CALL %s.system.remove_orphan_files(" +
"table => '%s'," +
"max_concurrent_deletes => %s," +
"older_than => TIMESTAMP '%s')",
catalogName, tableIdent, 4, currentTimestamp);
Assert.assertEquals("Should be no more orphan files in the data folder", 0, output3.size());

assertEquals(
"Should have expected rows",
ImmutableList.of(row(1L, "a"), row(2L, "b")),
sql("SELECT * FROM %s ORDER BY id", tableName));
}

@Test
public void testConcurrentRemoveOrphanFilesWithInvalidInput() {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);

AssertHelpers.assertThrows("Should throw an error when max_concurrent_deletes = 0",
IllegalArgumentException.class, "max_concurrent_deletes should have value > 0",
() -> sql("CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)",
catalogName, tableIdent, 0));

AssertHelpers.assertThrows("Should throw an error when max_concurrent_deletes < 0 ",
IllegalArgumentException.class, "max_concurrent_deletes should have value > 0",
() -> sql(
"CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)",
catalogName, tableIdent, -1));
}
}
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -86,6 +87,8 @@ public class BaseDeleteOrphanFilesSparkAction
}
}, DataTypes.StringType);

private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;

private final SerializableConfiguration hadoopConf;
private final int partitionDiscoveryParallelism;
private final Table table;
@@ -99,6 +102,8 @@ public void accept(String file) {
}
};

private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;

public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
super(spark);

@@ -117,6 +122,12 @@ protected DeleteOrphanFiles self() {
return this;
}

@Override
public BaseDeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
this.deleteExecutorService = executorService;
return this;
}

@Override
public BaseDeleteOrphanFilesSparkAction location(String newLocation) {
this.location = newLocation;
@@ -167,6 +178,7 @@ private DeleteOrphanFiles.Result doExecute() {

Tasks.foreach(orphanFiles)
.noRetry()
.executeWith(deleteExecutorService)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
.run(deleteFunc::accept);
@@ -19,10 +19,16 @@

package org.apache.iceberg.spark.procedures;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.DateTimeUtil;
@@ -47,7 +53,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("older_than", DataTypes.TimestampType),
ProcedureParameter.optional("location", DataTypes.StringType),
ProcedureParameter.optional("dry_run", DataTypes.BooleanType)
ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType)
};

private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -83,6 +90,10 @@ public InternalRow[] call(InternalRow args) {
Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
String location = args.isNullAt(2) ? null : args.getString(2);
boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);

Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
"max_concurrent_deletes should have value > 0, value: %s", maxConcurrentDeletes);

return withIcebergTable(tableIdent, table -> {
DeleteOrphanFiles action = actions().deleteOrphanFiles(table);
@@ -103,6 +114,10 @@ public InternalRow[] call(InternalRow args) {
action.deleteWith(file -> { });
}

if (maxConcurrentDeletes != null && maxConcurrentDeletes > 0) {
action.executeDeleteWith(removeService(maxConcurrentDeletes));
}

DeleteOrphanFiles.Result result = action.execute();

return toOutputRows(result);
@@ -136,6 +151,15 @@ private void validateInterval(long olderThanMillis) {
}
}

private ExecutorService removeService(int concurrentDeletes) {
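// builds a fixed-size pool whose threads are named "remove-orphans-<n>";
// getExitingExecutorService wraps it so the pool does not block JVM shutdown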
return MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(
concurrentDeletes,
new ThreadFactoryBuilder()
.setNameFormat("remove-orphans-%d")
.build()));
}

@Override
public String description() {
return "RemoveOrphanFilesProcedure";
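Finally, a hedged sketch of calling the procedure with the new argument from application code. The catalog and table identifiers are placeholders, and the CALL syntax assumes the Iceberg SQL extensions are enabled:

import org.apache.spark.sql.SparkSession;

// Illustrative only: run remove_orphan_files with up to 4 concurrent delete threads.
class RemoveOrphanFilesExample {
  static void run(SparkSession spark) {
    spark.sql(
        "CALL my_catalog.system.remove_orphan_files(" +
            "table => 'db.tbl', " +
            "older_than => TIMESTAMP '2022-01-01 00:00:00', " +
            "max_concurrent_deletes => 4)")
        .show();
  }
}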