Skip to content

Commit

Permalink
[FLINK-34050][state] Introduce parameter protection for DeleteFilesIn…
Browse files Browse the repository at this point in the history
…Range
  • Loading branch information
ljz2051 authored and StefanRRichter committed Feb 27, 2024
1 parent 534ea31 commit b048a9c
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@
<td>MemorySize</td>
<td>The maximum size of RocksDB's file used for information logging. If the log files becomes larger than this, a new file will be created. If 0, all logs will be written to one log file. The default maximum file size is '25MB'. </td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.rescaling.use-delete-files-in-range</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.restore-overlap-fraction-threshold</h5></td>
<td style="word-wrap: break-word;">0.0</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static org.apache.flink.configuration.description.TextElement.text;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
Expand Down Expand Up @@ -184,6 +185,12 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
*/
private final TernaryBoolean incrementalRestoreAsyncCompactAfterRescale;

/**
* Whether to leverage deleteFilesInRange API to clean up useless rocksdb files during
* rescaling.
*/
private final TernaryBoolean rescalingUseDeleteFilesInRange;

/** Factory for Write Buffer Manager and Block Cache. */
private RocksDBMemoryFactory rocksDBMemoryFactory;
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -218,6 +225,7 @@ public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing
this.priorityQueueConfig = new RocksDBPriorityQueueConfig();
this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
this.incrementalRestoreAsyncCompactAfterRescale = TernaryBoolean.UNDEFINED;
this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED;
}

/**
Expand Down Expand Up @@ -323,6 +331,12 @@ private EmbeddedRocksDBStateBackend(
? TernaryBoolean.fromBoxedBoolean(config.get(USE_INGEST_DB_RESTORE_MODE))
: TernaryBoolean.fromBoolean(original.getUseIngestDbRestoreMode());

rescalingUseDeleteFilesInRange =
original.rescalingUseDeleteFilesInRange == TernaryBoolean.UNDEFINED
? TernaryBoolean.fromBoxedBoolean(
config.get(USE_DELETE_FILES_IN_RANGE_DURING_RESCALING))
: original.rescalingUseDeleteFilesInRange;

this.rocksDBMemoryFactory = original.rocksDBMemoryFactory;
}

Expand Down Expand Up @@ -496,7 +510,8 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
.setOverlapFractionThreshold(getOverlapFractionThreshold())
.setIncrementalRestoreAsyncCompactAfterRescale(
getIncrementalRestoreAsyncCompactAfterRescale())
.setUseIngestDbRestoreMode(getUseIngestDbRestoreMode());
.setUseIngestDbRestoreMode(getUseIngestDbRestoreMode())
.setRescalingUseDeleteFilesInRange(isRescalingUseDeleteFilesInRange());
return builder.build();
}

Expand Down Expand Up @@ -844,6 +859,11 @@ boolean getUseIngestDbRestoreMode() {
return useIngestDbRestoreMode.getOrDefault(USE_INGEST_DB_RESTORE_MODE.defaultValue());
}

boolean isRescalingUseDeleteFilesInRange() {
return rescalingUseDeleteFilesInRange.getOrDefault(
USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue());
}

// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,13 @@ public class RocksDBConfigurableOptions implements Serializable {
.withDescription(
"If true, an async compaction of RocksDB is started after every restore after which we detect keys (including tombstones) in the database that are outside the key-groups range of the backend.");

public static final ConfigOption<Boolean> USE_DELETE_FILES_IN_RANGE_DURING_RESCALING =
key("state.backend.rocksdb.rescaling.use-delete-files-in-range")
.booleanType()
.defaultValue(Boolean.FALSE)
.withDescription(
"If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly.");

static final ConfigOption<?>[] CANDIDATE_CONFIGS =
new ConfigOption<?>[] {
// configurable DBOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;

import org.apache.flink.shaded.guava31.com.google.common.primitives.UnsignedBytes;
Expand All @@ -33,7 +34,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -111,57 +112,75 @@ public int compareTo(@Nullable Score other) {
* @param targetKeyGroupRange the target key group range.
* @param currentKeyGroupRange the key group range of the db instance.
* @param keyGroupPrefixBytes Number of bytes required to prefix the key groups.
* @param useDeleteFilesInRange Whether to use deleteFilesInRange to clean up redundant files.
*/
public static void clipDBWithKeyGroupRange(
@Nonnull RocksDB db,
@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@Nonnull KeyGroupRange targetKeyGroupRange,
@Nonnull KeyGroupRange currentKeyGroupRange,
@Nonnegative int keyGroupPrefixBytes)
@Nonnegative int keyGroupPrefixBytes,
boolean useDeleteFilesInRange)
throws RocksDBException {

final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
List<byte[]> deletedRanges = new ArrayList<>(4);

if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
CompositeKeySerializationUtils.serializeKeyGroup(
currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
CompositeKeySerializationUtils.serializeKeyGroup(
targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
deletedRanges.add(beginKeyGroupBytes);
deletedRanges.add(endKeyGroupBytes);
}

if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
CompositeKeySerializationUtils.serializeKeyGroup(
targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
CompositeKeySerializationUtils.serializeKeyGroup(
currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
deletedRanges.add(beginKeyGroupBytes);
deletedRanges.add(endKeyGroupBytes);
}

deleteRangeData(db, columnFamilyHandles, deletedRanges, useDeleteFilesInRange);
}

/**
* Delete the record falls into [beginKeyBytes, endKeyBytes) of the db.
* Delete the record that falls into the given deleteRanges of the db.
*
* @param db the target need to be clipped.
* @param columnFamilyHandles the column family need to be clipped.
* @param beginKeyBytes the begin key bytes
* @param endKeyBytes the end key bytes
* @param deleteRanges - pairs of deleted ranges (from1, to1, from2, to2, ...). For each pair
* [from, to), the startKey ('from') is inclusive, the endKey ('to') is exclusive.
* @param useDeleteFilesInRange Whether to use deleteFilesInRange to clean up redundant files.
*/
private static void deleteRange(
private static void deleteRangeData(
RocksDB db,
List<ColumnFamilyHandle> columnFamilyHandles,
byte[] beginKeyBytes,
byte[] endKeyBytes)
List<byte[]> deleteRanges,
boolean useDeleteFilesInRange)
throws RocksDBException {
List<byte[]> deletedRange = Arrays.asList(beginKeyBytes, endKeyBytes);

Preconditions.checkArgument(deleteRanges.size() % 2 == 0);
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
// Using RocksDB's deleteRange will take advantage of delete
// tombstones, which mark the range as deleted.
//
// https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/include/rocksdb/db.h#L363-L377
db.deleteRange(columnFamilyHandle, beginKeyBytes, endKeyBytes);
db.deleteFilesInRanges(columnFamilyHandle, deletedRange, false);
// First delete the files in ranges
if (useDeleteFilesInRange) {
db.deleteFilesInRanges(columnFamilyHandle, deleteRanges, false);
}

// Then put range limiting tombstones in place.
for (int i = 0; i < deleteRanges.size() / 2; i++) {
// Using RocksDB's deleteRange will take advantage of delete
// tombstones, which mark the range as deleted.
//
// https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/include/rocksdb/db.h#L363-L377
db.deleteRange(
columnFamilyHandle, deleteRanges.get(i * 2), deleteRanges.get(i * 2 + 1));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@

import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
import static org.apache.flink.util.Preconditions.checkArgument;

Expand Down Expand Up @@ -130,6 +131,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
private RocksDB injectedTestDB; // for testing
private boolean incrementalRestoreAsyncCompactAfterRescale =
INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue();
private boolean rescalingUseDeleteFilesInRange =
USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue();

private double overlapFractionThreshold = RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue();
private boolean useIngestDbRestoreMode = USE_INGEST_DB_RESTORE_MODE.defaultValue();
private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing
Expand Down Expand Up @@ -288,6 +292,12 @@ RocksDBKeyedStateBackendBuilder<K> setUseIngestDbRestoreMode(boolean useIngestDb
return this;
}

RocksDBKeyedStateBackendBuilder<K> setRescalingUseDeleteFilesInRange(
boolean rescalingUseDeleteFilesInRange) {
this.rescalingUseDeleteFilesInRange = rescalingUseDeleteFilesInRange;
return this;
}

public static File getInstanceRocksDBPath(File instanceBasePath) {
return new File(instanceBasePath, DB_INSTANCE_DIR_STRING);
}
Expand Down Expand Up @@ -508,7 +518,8 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation(
optionsContainer.getWriteBufferManagerCapacity(),
overlapFractionThreshold,
useIngestDbRestoreMode,
incrementalRestoreAsyncCompactAfterRescale);
incrementalRestoreAsyncCompactAfterRescale,
rescalingUseDeleteFilesInRange);
} else if (priorityQueueConfig.getPriorityQueueStateType()
== EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) {
return new RocksDBHeapTimersFullRestoreOperation<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper

private final boolean asyncCompactAfterRescale;

private final boolean useDeleteFilesInRange;

public RocksDBIncrementalRestoreOperation(
String operatorIdentifier,
KeyGroupRange keyGroupRange,
Expand All @@ -148,7 +150,8 @@ public RocksDBIncrementalRestoreOperation(
Long writeBufferManagerCapacity,
double overlapFractionThreshold,
boolean useIngestDbRestoreMode,
boolean asyncCompactAfterRescale) {
boolean asyncCompactAfterRescale,
boolean useDeleteFilesInRange) {
this.rocksHandle =
new RocksDBHandle(
kvStateInformation,
Expand Down Expand Up @@ -178,6 +181,7 @@ public RocksDBIncrementalRestoreOperation(
// this.asyncCompactAfterRescale = asyncCompactAfterRescale;
this.useIngestDbRestoreMode = false;
this.asyncCompactAfterRescale = false;
this.useDeleteFilesInRange = useDeleteFilesInRange;
}

/**
Expand Down Expand Up @@ -371,7 +375,8 @@ private void initBaseDBFromSingleStateHandle(IncrementalLocalKeyedStateHandle st
this.rocksHandle.getColumnFamilyHandles(),
keyGroupRange,
stateHandleKeyGroupRange,
keyGroupPrefixBytes);
keyGroupPrefixBytes,
useDeleteFilesInRange);
} catch (RocksDBException e) {
String errMsg = "Failed to clip DB after initialization.";
logger.error(errMsg, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public void testClipDBWithKeyGroupRange() throws Exception {

testClipDBWithKeyGroupRangeHelper(new KeyGroupRange(0, 1), new KeyGroupRange(2, 4), 1);

testClipDBWithKeyGroupRangeHelper(new KeyGroupRange(4, 5), new KeyGroupRange(2, 7), 1);

testClipDBWithKeyGroupRangeHelper(
new KeyGroupRange(Byte.MAX_VALUE - 15, Byte.MAX_VALUE),
new KeyGroupRange(Byte.MAX_VALUE - 10, Byte.MAX_VALUE),
Expand Down Expand Up @@ -179,7 +181,8 @@ private void testClipDBWithKeyGroupRangeHelper(
Collections.singletonList(columnFamilyHandle),
targetGroupRange,
currentGroupRange,
keyGroupPrefixBytes);
keyGroupPrefixBytes,
true);

for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
for (int j = 0; j < 100; ++j) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ private void rescaleAndRestoreBackends(
.setEnableIncrementalCheckpointing(true)
.setUseIngestDbRestoreMode(useIngest)
.setIncrementalRestoreAsyncCompactAfterRescale(asyncCompactAfterRescale)
.setRescalingUseDeleteFilesInRange(true)
.build();

long instanceTime = System.currentTimeMillis() - tInstance;
Expand Down

0 comments on commit b048a9c

Please sign in to comment.