Skip to content

Commit

Permalink
[FLINK-31238] Deactivate parts of the code until new FRocksDB release…
Browse files Browse the repository at this point in the history
… is available.

Then this commit should be reverted.
  • Loading branch information
StefanRRichter committed Feb 13, 2024
1 parent 5495a96 commit 268a308
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 60 deletions.
4 changes: 2 additions & 2 deletions flink-state-backends/flink-statebackend-rocksdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ under the License.
</dependency>

<dependency>
<groupId>io.github.fredia</groupId>
<groupId>com.ververica</groupId>
<artifactId>frocksdbjni</artifactId>
<version>8.6.7-ververica-test-1.0</version>
<version>6.20.3-ververica-2.0</version>
</dependency>

<!-- test dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,11 @@
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;

import org.apache.flink.shaded.guava31.com.google.common.primitives.UnsignedBytes;

import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.ExportImportFilesMetaData;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
Expand All @@ -39,16 +33,11 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Utils for RocksDB Incremental Checkpoint. */
public class RocksDBIncrementalCheckpointUtils {
Expand Down Expand Up @@ -218,6 +207,7 @@ public static Optional<RunnableWithException> createRangeCompactionTaskIfNeeded(

return Optional.of(
() -> {
/*
try (CompactRangeOptions compactionOptions =
new CompactRangeOptions()
.setExclusiveManualCompaction(true)
Expand Down Expand Up @@ -251,6 +241,7 @@ public static Optional<RunnableWithException> createRangeCompactionTaskIfNeeded(
}
}
}
*/
});
}

Expand Down Expand Up @@ -321,6 +312,7 @@ private static KeyRange getDBKeyRange(RocksDB db) {
* @param resultOutput output parameter for the metadata of the export.
* @throws RocksDBException on problems inside RocksDB.
*/
/*
public static void exportColumnFamilies(
RocksDB db,
List<ColumnFamilyHandle> columnFamilyHandles,
Expand Down Expand Up @@ -357,6 +349,7 @@ public static void exportColumnFamilies(
}
}
}
*/

/** check whether the bytes is before prefixBytes in the character order. */
public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull byte[] prefixBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ExportImportFilesMetaData;
import org.rocksdb.ImportColumnFamilyOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
Expand All @@ -44,7 +42,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -120,17 +117,13 @@ public static void registerKvStateInformation(
*
* <p>Creates the column family for the state. Sets TTL compaction filter if {@code
* ttlCompactFiltersManager} is not {@code null}.
*
* @param importFilesMetaData if not empty, we import the files specified in the metadata to the
* column family.
*/
public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
RegisteredStateMetaInfoBase metaInfoBase,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
@Nullable Long writeBufferManagerCapacity,
List<ExportImportFilesMetaData> importFilesMetaData) {
@Nullable Long writeBufferManagerCapacity) {

ColumnFamilyDescriptor columnFamilyDescriptor =
createColumnFamilyDescriptor(
Expand All @@ -141,8 +134,7 @@ public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(

final ColumnFamilyHandle columnFamilyHandle;
try {
columnFamilyHandle =
createColumnFamily(columnFamilyDescriptor, db, importFilesMetaData);
columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db);
} catch (Exception ex) {
IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex);
Expand All @@ -151,21 +143,6 @@ public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase);
}

public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
RegisteredStateMetaInfoBase metaInfoBase,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
@Nullable Long writeBufferManagerCapacity) {
return createStateInfo(
metaInfoBase,
db,
columnFamilyOptionsFactory,
ttlCompactFiltersManager,
writeBufferManagerCapacity,
Collections.emptyList());
}

/**
* Creates a column descriptor for a state column family.
*
Expand Down Expand Up @@ -253,20 +230,8 @@ public static ColumnFamilyOptions createColumnFamilyOptions(
}

private static ColumnFamilyHandle createColumnFamily(
ColumnFamilyDescriptor columnDescriptor,
RocksDB db,
List<ExportImportFilesMetaData> importFilesMetaData)
throws RocksDBException {

if (importFilesMetaData.isEmpty()) {
return db.createColumnFamily(columnDescriptor);
} else {
try (ImportColumnFamilyOptions importColumnFamilyOptions =
new ImportColumnFamilyOptions().setMoveFiles(true)) {
return db.createColumnFamilyWithImport(
columnDescriptor, importColumnFamilyOptions, importFilesMetaData);
}
}
ColumnFamilyDescriptor columnDescriptor, RocksDB db) throws RocksDBException {
return db.createColumnFamily(columnDescriptor);
}

public static void addColumnFamilyOptionsToCloseLater(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ExportImportFilesMetaData;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -193,6 +191,7 @@ RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
* @param stateMetaInfo info about the state to create.
* @param cfMetaDataList the data to import.
*/
/*
void registerStateColumnFamilyHandleWithImport(
RegisteredStateMetaInfoBase stateMetaInfo,
List<ExportImportFilesMetaData> cfMetaDataList) {
Expand All @@ -213,6 +212,7 @@ void registerStateColumnFamilyHandleWithImport(
columnFamilyHandles.add(stateInfo.columnFamilyHandle);
}
*/

/**
* This recreates the new working directory of the recovered RocksDB instance and links/copies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ExportImportFilesMetaData;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
Expand All @@ -71,11 +70,9 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -86,7 +83,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException;

Expand Down Expand Up @@ -167,8 +163,10 @@ public RocksDBIncrementalRestoreOperation(
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.keySerializerProvider = keySerializerProvider;
this.userCodeClassLoader = userCodeClassLoader;
this.useIngestDbRestoreMode = useIngestDbRestoreMode;
this.asyncCompactAfterRescale = asyncCompactAfterRescale;
// this.useIngestDbRestoreMode = useIngestDbRestoreMode;
// this.asyncCompactAfterRescale = asyncCompactAfterRescale;
this.useIngestDbRestoreMode = false;
this.asyncCompactAfterRescale = false;
}

/** Root method that branches for different implementations of {@link KeyedStateHandle}. */
Expand Down Expand Up @@ -408,6 +406,7 @@ private void mergeStateHandlesWithClipAndIngest(
byte[] stopKeyGroupPrefixBytes)
throws Exception {

/*
final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath();
final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs");
Files.createDirectories(exportCfBasePath);
Expand Down Expand Up @@ -447,6 +446,7 @@ private void mergeStateHandlesWithClipAndIngest(
// Cleanup export base directory
cleanUpPathQuietly(exportCfBasePath);
}
*/
}

/**
Expand All @@ -462,6 +462,7 @@ private void mergeStateHandlesWithClipAndIngest(
* @return the total key-groups range of the exported data.
* @throws Exception on any export error.
*/
/*
private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange(
Path exportCfBasePath,
List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
Expand Down Expand Up @@ -542,6 +543,7 @@ private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange(
? new KeyGroupRange(minExportKeyGroup, maxExportKeyGroup)
: KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
}
*/

/**
* Helper method that merges the data from multiple state handles into the restoring base DB by
Expand Down Expand Up @@ -594,6 +596,7 @@ private void mergeStateHandlesWithCopyFromTemporaryInstance(
* @param exportKeyGroupRange the total key-groups range of the exported data.
* @throws Exception on import error.
*/
/*
private void initBaseDBFromColumnFamilyImports(
Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
exportedColumnFamilyMetaData,
Expand Down Expand Up @@ -622,6 +625,7 @@ private void initBaseDBFromColumnFamilyImports(
keyGroupRange,
operatorIdentifier);
}
*/

/**
* Restores the checkpointing status and state for this backend. This can only be done if the
Expand Down

0 comments on commit 268a308

Please sign in to comment.