Skip to content

Commit

Permalink
Support retaining last N snapshots (apache#497)
Browse files Browse the repository at this point in the history
  • Loading branch information
yathindranath authored and rdblue committed Dec 27, 2019
1 parent 79e88c6 commit 44a0a83
Show file tree
Hide file tree
Showing 3 changed files with 370 additions and 1 deletion.
15 changes: 15 additions & 0 deletions api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ public interface ExpireSnapshots extends PendingUpdate<List<Snapshot>> {
*/
ExpireSnapshots expireOlderThan(long timestampMillis);

/**
* Retains the most recent ancestors of the current snapshot.
* <p>
* If a snapshot would be expired becuase it is older than the expiration timestamp, but is one of
* the {@code numSnapshot} most recent ancestors of the current state, it will be retained. This
* will not cause snapshots explicitly identified by id from expiring.
* <p>
* This may keep more than {@code numSnapshot} ancestors if snapshots are added concurrently. This
* may keep less than {@numSnapshot} ancestors if the current table state does not have that many.
*
* @param numSnapshots the number of snapshots to retain
* @return this for method chaining
*/
ExpireSnapshots retainLast(int numSnapshots);

/**
* Passes an alternative delete implementation that will be used for manifests and data files.
* <p>
Expand Down
20 changes: 19 additions & 1 deletion core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.util.internal.ConcurrentSet;
Expand Down Expand Up @@ -59,6 +60,7 @@ public void accept(String file) {

private final TableOperations ops;
private final Set<Long> idsToRemove = Sets.newHashSet();
private final Set<Long> idsToRetain = Sets.newHashSet();
private TableMetadata base;
private Long expireOlderThan = null;
private Consumer<String> deleteFunc = defaultDelete;
Expand All @@ -82,6 +84,21 @@ public ExpireSnapshots expireOlderThan(long timestampMillis) {
return this;
}

@Override
public ExpireSnapshots retainLast(int numSnapshots) {
Preconditions.checkArgument(1 <= numSnapshots,
"Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots);
idsToRetain.clear();
List<Long> ancestorIds = SnapshotUtil.ancestorIds(base.currentSnapshot(), base::snapshot);
if (numSnapshots >= ancestorIds.size()) {
idsToRetain.addAll(ancestorIds);
} else {
idsToRetain.addAll(ancestorIds.subList(0, numSnapshots));
}

return this;
}

@Override
public ExpireSnapshots deleteWith(Consumer<String> newDeleteFunc) {
this.deleteFunc = newDeleteFunc;
Expand All @@ -102,7 +119,8 @@ private TableMetadata internalApply() {

return base.removeSnapshotsIf(snapshot ->
idsToRemove.contains(snapshot.snapshotId()) ||
(expireOlderThan != null && snapshot.timestampMillis() < expireOlderThan));
(expireOlderThan != null && snapshot.timestampMillis() < expireOlderThan &&
!idsToRetain.contains(snapshot.snapshotId())));
}

@Override
Expand Down
Loading

0 comments on commit 44a0a83

Please sign in to comment.