Skip to content

Commit

Permalink
Spark: RemoveReachableFiles action should fail if GC is disabled (apa…
Browse files Browse the repository at this point in the history
…che#2763)

Co-authored-by: Karuppayya Rajendran <[email protected]>
  • Loading branch information
karuppayya and Karuppayya Rajendran authored Jul 2, 2021
1 parent 9786fd1 commit c92092f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.actions.BaseRemoveFilesActionResult;
import org.apache.iceberg.actions.RemoveReachableFiles;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -46,6 +47,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.GC_ENABLED;
import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;

/**
* An implementation of {@link RemoveReachableFiles} that uses metadata tables in Spark
* to determine which files should be deleted.
Expand All @@ -65,7 +69,8 @@ public class BaseRemoveReachableFilesSparkAction
// Creates an executor service that runs each task in the thread that invokes execute/submit.
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;

private final String metadataLocation;
private final TableMetadata tableMetadata;

private final Consumer<String> defaultDelete = new Consumer<String>() {
@Override
public void accept(String file) {
Expand All @@ -79,7 +84,10 @@ public void accept(String file) {

public BaseRemoveReachableFilesSparkAction(SparkSession spark, String metadataLocation) {
super(spark);
this.metadataLocation = metadataLocation;
this.tableMetadata = TableMetadataParser.read(io, metadataLocation);
ValidationException.check(
PropertyUtil.propertyAsBoolean(tableMetadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
"Cannot remove files: GC is disabled (deleting files may corrupt other tables)");
}

@Override
Expand Down Expand Up @@ -109,15 +117,14 @@ public RemoveReachableFiles executeDeleteWith(ExecutorService executorService) {
@Override
public Result execute() {
Preconditions.checkArgument(io != null, "File IO cannot be null");
String msg = String.format("Removing files reachable from %s", metadataLocation);
String msg = String.format("Removing files reachable from %s", tableMetadata.metadataFileLocation());
JobGroupInfo info = newJobGroupInfo("REMOVE-FILES", msg);
return withJobGroupInfo(info, this::doExecute);
}

private Result doExecute() {
boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, false);
TableMetadata metadata = TableMetadataParser.read(io, metadataLocation);
Dataset<Row> validFileDF = buildValidFileDF(metadata).distinct();
Dataset<Row> validFileDF = buildValidFileDF(tableMetadata).distinct();
if (streamResults) {
return deleteFiles(validFileDF.toLocalIterator());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -322,6 +323,17 @@ public void testEmptyIOThrowsException() {
baseRemoveFilesSparkAction::execute);
}

@Test
public void testRemoveFilesActionWhenGarabageCollectionDisabled() {
table.updateProperties()
.set(TableProperties.GC_ENABLED, "false")
.commit();

AssertHelpers.assertThrows("Should complain about removing files when GC is disabled",
ValidationException.class, "Cannot remove files: GC is disabled (deleting files may corrupt other tables)",
() -> sparkActions().removeReachableFiles(metadataLocation(table)));
}

private String metadataLocation(Table tbl) {
return ((HasTableOperations) tbl).operations().current().metadataFileLocation();
}
Expand Down

0 comments on commit c92092f

Please sign in to comment.