Skip to content

Commit

Permalink
Spark: Add RemoveReachableFiles action (apache#2415)
Browse files Browse the repository at this point in the history
  • Loading branch information
karuppayya authored Jun 9, 2021
1 parent 282b6f9 commit efaad97
Show file tree
Hide file tree
Showing 8 changed files with 746 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,11 @@ default RewriteDataFiles rewriteDataFiles(Table table) {
default ExpireSnapshots expireSnapshots(Table table) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement expireSnapshots");
}

/**
* Instantiates an action to remove all the files reachable from given metadata location.
*/
default RemoveReachableFiles removeReachableFiles(String metadataLocation) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement removeReachableFiles");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.iceberg.io.FileIO;

/**
* An action that removes all files referenced by a table metadata file.
* <p>
* This action will irreversibly delete all reachable files such as data files, manifests,
* manifest lists and should be used to clean up the underlying storage once a table is dropped
* and no longer needed.
* <p>
* Implementations may use a query engine to distribute parts of work.
*/
public interface RemoveReachableFiles extends Action<RemoveReachableFiles, RemoveReachableFiles.Result> {

/**
* Passes an alternative delete implementation that will be used for files.
*
* @param removeFunc a function that will be called to delete files.
* The function accepts path to file as an argument.
* @return this for method chaining
*/
RemoveReachableFiles deleteWith(Consumer<String> removeFunc);

/**
* Passes an alternative executor service that will be used for files removal.
* <p>
* If this method is not called, files will be deleted in the current thread.
*
* @param executorService the service to use
* @return this for method chaining
*/
RemoveReachableFiles executeDeleteWith(ExecutorService executorService);

/**
* Set the {@link FileIO} to be used for files removal
*
* @param io FileIO to use for files removal
* @return this for method chaining
*/
RemoveReachableFiles io(FileIO io);

/**
* The action result that contains a summary of the execution.
*/
interface Result {

/**
* Returns the number of data files removed.
*/
long removedDataFilesCount();

/**
* Returns the number of manifests removed.
*/
long removedManifestsCount();

/**
* Returns the number of manifest lists removed.
*/
long removedManifestListsCount();

/**
* Returns the number of metadata json, version hint files removed.
*/
long otherRemovedFilesCount();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

public class BaseRemoveFilesActionResult implements RemoveReachableFiles.Result {

private final long deletedDataFilesCount;
private final long deletedManifestsCount;
private final long deletedManifestListsCount;
private final long deletedOtherFilesCount;

public BaseRemoveFilesActionResult(long deletedDataFilesCount,
long deletedManifestsCount,
long deletedManifestListsCount,
long otherDeletedFilesCount) {
this.deletedDataFilesCount = deletedDataFilesCount;
this.deletedManifestsCount = deletedManifestsCount;
this.deletedManifestListsCount = deletedManifestListsCount;
this.deletedOtherFilesCount = otherDeletedFilesCount;
}

@Override
public long removedDataFilesCount() {
return deletedDataFilesCount;
}

@Override
public long removedManifestsCount() {
return deletedManifestsCount;
}

@Override
public long removedManifestListsCount() {
return deletedManifestListsCount;
}

@Override
public long otherRemovedFilesCount() {
return deletedOtherFilesCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.actions;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.actions.BaseRemoveFilesActionResult;
import org.apache.iceberg.actions.RemoveReachableFiles;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An implementation of {@link RemoveReachableFiles} that uses metadata tables in Spark
* to determine which files should be deleted.
*/
@SuppressWarnings("UnnecessaryAnonymousClass")
public class BaseRemoveReachableFilesSparkAction
extends BaseSparkAction<RemoveReachableFiles, RemoveReachableFiles.Result> implements RemoveReachableFiles {
private static final Logger LOG = LoggerFactory.getLogger(BaseRemoveReachableFilesSparkAction.class);

private static final String DATA_FILE = "Data File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";
private static final String OTHERS = "Others";

private static final String STREAM_RESULTS = "stream-results";

// Creates an executor service that runs each task in the thread that invokes execute/submit.
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;

private final String metadataLocation;
private final Consumer<String> defaultDelete = new Consumer<String>() {
@Override
public void accept(String file) {
io.deleteFile(file);
}
};

private Consumer<String> removeFunc = defaultDelete;
private ExecutorService removeExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());

public BaseRemoveReachableFilesSparkAction(SparkSession spark, String metadataLocation) {
super(spark);
this.metadataLocation = metadataLocation;
}

@Override
protected RemoveReachableFiles self() {
return this;
}

@Override
public RemoveReachableFiles io(FileIO fileIO) {
this.io = fileIO;
return this;
}

@Override
public RemoveReachableFiles deleteWith(Consumer<String> removeFn) {
this.removeFunc = removeFn;
return this;

}

@Override
public RemoveReachableFiles executeDeleteWith(ExecutorService executorService) {
this.removeExecutorService = executorService;
return this;
}

@Override
public Result execute() {
Preconditions.checkArgument(io != null, "File IO cannot be null");
String msg = String.format("Removing files reachable from %s", metadataLocation);
JobGroupInfo info = newJobGroupInfo("REMOVE-FILES", msg);
return withJobGroupInfo(info, this::doExecute);
}

private Result doExecute() {
boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, false);
TableMetadata metadata = TableMetadataParser.read(io, metadataLocation);
Dataset<Row> validFileDF = buildValidFileDF(metadata).distinct();
if (streamResults) {
return deleteFiles(validFileDF.toLocalIterator());
} else {
return deleteFiles(validFileDF.collectAsList().iterator());
}
}

private Dataset<Row> projectFilePathWithType(Dataset<Row> ds, String type) {
return ds.select(functions.col("file_path"), functions.lit(type).as("file_type"));
}

private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, io);
return projectFilePathWithType(buildValidDataFileDF(staticTable), DATA_FILE)
.union(projectFilePathWithType(buildManifestFileDF(staticTable), MANIFEST))
.union(projectFilePathWithType(buildManifestListDF(staticTable), MANIFEST_LIST))
.union(projectFilePathWithType(buildOtherMetadataFileDF(staticTable), OTHERS));
}

@Override
protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
List<String> otherMetadataFiles = Lists.newArrayList();
otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, true));
otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
return spark().createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
}

/**
* Deletes files passed to it.
*
* @param deleted an Iterator of Spark Rows of the structure (path: String, type: String)
* @return Statistics on which files were deleted
*/
private BaseRemoveFilesActionResult deleteFiles(Iterator<Row> deleted) {
AtomicLong dataFileCount = new AtomicLong(0L);
AtomicLong manifestCount = new AtomicLong(0L);
AtomicLong manifestListCount = new AtomicLong(0L);
AtomicLong otherFilesCount = new AtomicLong(0L);

Tasks.foreach(deleted)
.retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
.executeWith(removeExecutorService)
.onFailure((fileInfo, exc) -> {
String file = fileInfo.getString(0);
String type = fileInfo.getString(1);
LOG.warn("Delete failed for {}: {}", type, file, exc);
})
.run(fileInfo -> {
String file = fileInfo.getString(0);
String type = fileInfo.getString(1);
removeFunc.accept(file);
switch (type) {
case DATA_FILE:
dataFileCount.incrementAndGet();
LOG.trace("Deleted Data File: {}", file);
break;
case MANIFEST:
manifestCount.incrementAndGet();
LOG.debug("Deleted Manifest: {}", file);
break;
case MANIFEST_LIST:
manifestListCount.incrementAndGet();
LOG.debug("Deleted Manifest List: {}", file);
break;
case OTHERS:
otherFilesCount.incrementAndGet();
LOG.debug("Others: {}", file);
break;
}
});

long filesCount = dataFileCount.get() + manifestCount.get() + manifestListCount.get() + otherFilesCount.get();
LOG.info("Total files removed: {}", filesCount);
return new BaseRemoveFilesActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get(),
otherFilesCount.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.actions.RemoveOrphanFiles;
import org.apache.iceberg.actions.RemoveReachableFiles;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.spark.sql.SparkSession;

Expand Down Expand Up @@ -52,4 +53,9 @@ public RewriteManifests rewriteManifests(Table table) {
public ExpireSnapshots expireSnapshots(Table table) {
return new BaseExpireSnapshotsSparkAction(spark, table);
}

@Override
public RemoveReachableFiles removeReachableFiles(String metadataLocation) {
return new BaseRemoveReachableFilesSparkAction(spark, metadataLocation);
}
}
Loading

0 comments on commit efaad97

Please sign in to comment.