diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index fa8407729d90c..fb942f7efb967 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -25,8 +25,8 @@
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -340,9 +340,9 @@ public static void serializeKeyedStateHandle(
dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
}
serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
- } else if (stateHandle instanceof IncrementalKeyedStateHandle) {
- IncrementalKeyedStateHandle incrementalKeyedStateHandle =
- (IncrementalKeyedStateHandle) stateHandle;
+ } else if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) {
+ IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle =
+ (IncrementalRemoteKeyedStateHandle) stateHandle;
dos.writeByte(INCREMENTAL_KEY_GROUPS_HANDLE);
@@ -427,7 +427,7 @@ public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis)
uuid = UUID.nameUUIDFromBytes(backendId.getBytes(StandardCharsets.UTF_8));
}
- return new IncrementalKeyedStateHandle(
+ return new IncrementalRemoteKeyedStateHandle(
uuid,
keyGroupRange,
checkpointId,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 01d4ac0132ed7..d3ab3b27bfa90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,305 +18,24 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
+import java.util.Set;
import java.util.UUID;
/**
- * The handle to states of an incremental snapshot.
- *
- * The states contained in an incremental snapshot include:
- *
- * - Created shared state which includes shared files produced since the last
- * completed checkpoint. These files can be referenced by succeeding checkpoints if the
- * checkpoint succeeds to complete.
- * - Referenced shared state which includes the shared files materialized in previous
- * checkpoints. Until we this is registered to a {@link SharedStateRegistry}, all referenced
- * shared state handles are only placeholders, so that we do not send state handles twice
- * from which we know that they already exist on the checkpoint coordinator.
- * - Private state which includes all other files, typically mutable, that cannot be shared by
- * other checkpoints.
- * - Backend meta state which includes the information of existing states.
- *
- *
- * When this should become a completed checkpoint on the checkpoint coordinator, it must first be
- * registered with a {@link SharedStateRegistry}, so that all placeholder state handles to
- * previously existing state are replaced with the originals.
- *
- * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They
- * should not be called from production code. This means this class is also not suited to serve as
- * a key, e.g. in hash maps.
+ * Common interface to all incremental {@link KeyedStateHandle}.
*/
-public class IncrementalKeyedStateHandle implements KeyedStateHandle {
-
- private static final Logger LOG = LoggerFactory.getLogger(IncrementalKeyedStateHandle.class);
-
- private static final long serialVersionUID = -8328808513197388231L;
-
- /**
- * UUID to identify the backend which created this state handle. This is in creating the key for the
- * {@link SharedStateRegistry}.
- */
- private final UUID backendIdentifier;
-
- /**
- * The key-group range covered by this state handle.
- */
- private final KeyGroupRange keyGroupRange;
-
- /**
- * The checkpoint Id.
- */
- private final long checkpointId;
-
- /**
- * Shared state in the incremental checkpoint.
- */
- private final Map sharedState;
-
- /**
- * Private state in the incremental checkpoint.
- */
- private final Map privateState;
-
- /**
- * Primary meta data state of the incremental checkpoint.
- */
- private final StreamStateHandle metaStateHandle;
-
- /**
- * Once the shared states are registered, it is the {@link SharedStateRegistry}'s
- * responsibility to cleanup those shared states.
- * But in the cases where the state handle is discarded before performing the registration,
- * the handle should delete all the shared states created by it.
- *
- * This variable is not null iff the handles was registered.
- */
- private transient SharedStateRegistry sharedStateRegistry;
-
- public IncrementalKeyedStateHandle(
- UUID backendIdentifier,
- KeyGroupRange keyGroupRange,
- long checkpointId,
- Map sharedState,
- Map privateState,
- StreamStateHandle metaStateHandle) {
-
- this.backendIdentifier = Preconditions.checkNotNull(backendIdentifier);
- this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
- this.checkpointId = checkpointId;
- this.sharedState = Preconditions.checkNotNull(sharedState);
- this.privateState = Preconditions.checkNotNull(privateState);
- this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle);
- this.sharedStateRegistry = null;
- }
-
- @Override
- public KeyGroupRange getKeyGroupRange() {
- return keyGroupRange;
- }
-
- public long getCheckpointId() {
- return checkpointId;
- }
-
- public Map getSharedState() {
- return sharedState;
- }
-
- public Map getPrivateState() {
- return privateState;
- }
-
- public StreamStateHandle getMetaStateHandle() {
- return metaStateHandle;
- }
-
- public UUID getBackendIdentifier() {
- return backendIdentifier;
- }
-
- public SharedStateRegistry getSharedStateRegistry() {
- return sharedStateRegistry;
- }
-
- @Override
- public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
- return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ?
- null : this;
- }
+public interface IncrementalKeyedStateHandle extends KeyedStateHandle {
- @Override
- public void discardState() throws Exception {
+ /** Returns the ID of the checkpoint for which the handle was created. */
+ long getCheckpointId();
- SharedStateRegistry registry = this.sharedStateRegistry;
- final boolean isRegistered = (registry != null);
+ /** Returns the identifier of the state backend from which this handle was created.*/
+ @Nonnull
+ UUID getBackendIdentifier();
- LOG.trace("Discarding IncrementalKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.",
- isRegistered,
- checkpointId,
- backendIdentifier);
-
- try {
- metaStateHandle.discardState();
- } catch (Exception e) {
- LOG.warn("Could not properly discard meta data.", e);
- }
-
- try {
- StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
- } catch (Exception e) {
- LOG.warn("Could not properly discard misc file states.", e);
- }
-
- // If this was not registered, we can delete the shared state. We can simply apply this
- // to all handles, because all handles that have not been created for the first time for this
- // are only placeholders at this point (disposing them is a NOP).
- if (isRegistered) {
- // If this was registered, we only unregister all our referenced shared states
- // from the registry.
- for (StateHandleID stateHandleID : sharedState.keySet()) {
- registry.unregisterReference(
- createSharedStateRegistryKeyFromFileName(stateHandleID));
- }
- } else {
- // Otherwise, we assume to own those handles and dispose them directly.
- try {
- StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
- } catch (Exception e) {
- LOG.warn("Could not properly discard new sst file states.", e);
- }
- }
- }
-
- @Override
- public long getStateSize() {
- long size = StateUtil.getStateSize(metaStateHandle);
-
- for (StreamStateHandle sharedStateHandle : sharedState.values()) {
- size += sharedStateHandle.getStateSize();
- }
-
- for (StreamStateHandle privateStateHandle : privateState.values()) {
- size += privateStateHandle.getStateSize();
- }
-
- return size;
- }
-
- @Override
- public void registerSharedStates(SharedStateRegistry stateRegistry) {
-
- // This is a quick check to avoid that we register twice with the same registry. However, the code allows to
- // register again with a different registry. The implication is that ownership is transferred to this new
- // registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new
- // SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that
- // an old registry object from a previous run is due to be GCed and will never be used for registration again.
- Preconditions.checkState(
- sharedStateRegistry != stateRegistry,
- "The state handle has already registered its shared states to the given registry.");
-
- sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
-
- LOG.trace("Registering IncrementalKeyedStateHandle for checkpoint {} from backend with id {}.",
- checkpointId,
- backendIdentifier);
-
- for (Map.Entry sharedStateHandle : sharedState.entrySet()) {
- SharedStateRegistryKey registryKey =
- createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
-
- SharedStateRegistry.Result result =
- stateRegistry.registerReference(registryKey, sharedStateHandle.getValue());
-
- // This step consolidates our shared handles with the registry, which does two things:
- //
- // 1) Replace placeholder state handle with already registered, actual state handles.
- //
- // 2) Deduplicate re-uploads of incremental state due to missing confirmations about
- // completed checkpoints.
- //
- // This prevents the following problem:
- // A previous checkpoint n has already registered the state. This can happen if a
- // following checkpoint (n + x) wants to reference the same state before the backend got
- // notified that checkpoint n completed. In this case, the shared registry did
- // deduplication and returns the previous reference.
- sharedStateHandle.setValue(result.getReference());
- }
- }
-
- /**
- * Create a unique key to register one of our shared state handles.
- */
- @VisibleForTesting
- public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
- return new SharedStateRegistryKey(String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
- }
-
- /**
- * This method is should only be called in tests! This should never serve as key in a hash map.
- */
- @VisibleForTesting
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- IncrementalKeyedStateHandle that = (IncrementalKeyedStateHandle) o;
-
- if (getCheckpointId() != that.getCheckpointId()) {
- return false;
- }
- if (!getBackendIdentifier().equals(that.getBackendIdentifier())) {
- return false;
- }
- if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
- return false;
- }
- if (!getSharedState().equals(that.getSharedState())) {
- return false;
- }
- if (!getPrivateState().equals(that.getPrivateState())) {
- return false;
- }
- return getMetaStateHandle().equals(that.getMetaStateHandle());
- }
-
- /**
- * This method should only be called in tests! This should never serve as key in a hash map.
- */
- @VisibleForTesting
- @Override
- public int hashCode() {
- int result = getBackendIdentifier().hashCode();
- result = 31 * result + getKeyGroupRange().hashCode();
- result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32));
- result = 31 * result + getSharedState().hashCode();
- result = 31 * result + getPrivateState().hashCode();
- result = 31 * result + getMetaStateHandle().hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "IncrementalKeyedStateHandle{" +
- "backendIdentifier=" + backendIdentifier +
- ", keyGroupRange=" + keyGroupRange +
- ", checkpointId=" + checkpointId +
- ", sharedState=" + sharedState +
- ", privateState=" + privateState +
- ", metaStateHandle=" + metaStateHandle +
- ", registered=" + (sharedStateRegistry != null) +
- '}';
- }
+ /** Returns a set of ids of all registered shared states in the backend at the time this was created. */
+ @Nonnull
+ Set getSharedStateHandleIDs();
}
-
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
index f80a8ce914d79..16ee2bbb9ec86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
@@ -27,11 +27,11 @@
import java.util.UUID;
/**
- * State handle for local copies of {@link IncrementalKeyedStateHandle}. Consists of a {@link DirectoryStateHandle} that
+ * State handle for local copies of {@link IncrementalRemoteKeyedStateHandle}. Consists of a {@link DirectoryStateHandle} that
* represents the directory of the native RocksDB snapshot, the key groups, and a stream state handle for Flink's state
* meta data file.
*/
-public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle {
+public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle implements IncrementalKeyedStateHandle {
private static final long serialVersionUID = 1L;
@@ -71,15 +71,18 @@ public StreamStateHandle getMetaDataState() {
return metaDataState;
}
+ @Override
public long getCheckpointId() {
return checkpointId;
}
+ @Override
@Nonnull
public UUID getBackendIdentifier() {
return backendIdentifier;
}
+ @Override
@Nonnull
public Set getSharedStateHandleIDs() {
return sharedStateHandleIDs;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
new file mode 100644
index 0000000000000..77fd49161d20b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * The handle to states of an incremental snapshot.
+ *
+ * The states contained in an incremental snapshot include:
+ *
+ * - Created shared state which includes shared files produced since the last
+ * completed checkpoint. These files can be referenced by succeeding checkpoints if the
+ * checkpoint succeeds to complete.
+ * - Referenced shared state which includes the shared files materialized in previous
+ * checkpoints. Until we this is registered to a {@link SharedStateRegistry}, all referenced
+ * shared state handles are only placeholders, so that we do not send state handles twice
+ * from which we know that they already exist on the checkpoint coordinator.
+ * - Private state which includes all other files, typically mutable, that cannot be shared by
+ * other checkpoints.
+ * - Backend meta state which includes the information of existing states.
+ *
+ *
+ * When this should become a completed checkpoint on the checkpoint coordinator, it must first be
+ * registered with a {@link SharedStateRegistry}, so that all placeholder state handles to
+ * previously existing state are replaced with the originals.
+ *
+ * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They
+ * should not be called from production code. This means this class is also not suited to serve as
+ * a key, e.g. in hash maps.
+ */
+public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateHandle {
+
+ private static final Logger LOG = LoggerFactory.getLogger(IncrementalRemoteKeyedStateHandle.class);
+
+ private static final long serialVersionUID = -8328808513197388231L;
+
+ /**
+ * UUID to identify the backend which created this state handle. This is in creating the key for the
+ * {@link SharedStateRegistry}.
+ */
+ private final UUID backendIdentifier;
+
+ /**
+ * The key-group range covered by this state handle.
+ */
+ private final KeyGroupRange keyGroupRange;
+
+ /**
+ * The checkpoint Id.
+ */
+ private final long checkpointId;
+
+ /**
+ * Shared state in the incremental checkpoint.
+ */
+ private final Map sharedState;
+
+ /**
+ * Private state in the incremental checkpoint.
+ */
+ private final Map privateState;
+
+ /**
+ * Primary meta data state of the incremental checkpoint.
+ */
+ private final StreamStateHandle metaStateHandle;
+
+ /**
+ * Once the shared states are registered, it is the {@link SharedStateRegistry}'s
+ * responsibility to cleanup those shared states.
+ * But in the cases where the state handle is discarded before performing the registration,
+ * the handle should delete all the shared states created by it.
+ *
+ * This variable is not null iff the handles was registered.
+ */
+ private transient SharedStateRegistry sharedStateRegistry;
+
+ public IncrementalRemoteKeyedStateHandle(
+ UUID backendIdentifier,
+ KeyGroupRange keyGroupRange,
+ long checkpointId,
+ Map sharedState,
+ Map privateState,
+ StreamStateHandle metaStateHandle) {
+
+ this.backendIdentifier = Preconditions.checkNotNull(backendIdentifier);
+ this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
+ this.checkpointId = checkpointId;
+ this.sharedState = Preconditions.checkNotNull(sharedState);
+ this.privateState = Preconditions.checkNotNull(privateState);
+ this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle);
+ this.sharedStateRegistry = null;
+ }
+
+ @Override
+ public KeyGroupRange getKeyGroupRange() {
+ return keyGroupRange;
+ }
+
+ @Override
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ public Map getSharedState() {
+ return sharedState;
+ }
+
+ public Map getPrivateState() {
+ return privateState;
+ }
+
+ public StreamStateHandle getMetaStateHandle() {
+ return metaStateHandle;
+ }
+
+ @Nonnull
+ public UUID getBackendIdentifier() {
+ return backendIdentifier;
+ }
+
+ @Nonnull
+ @Override
+ public Set getSharedStateHandleIDs() {
+ return getSharedState().keySet();
+ }
+
+ public SharedStateRegistry getSharedStateRegistry() {
+ return sharedStateRegistry;
+ }
+
+ @Override
+ public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+ return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ?
+ null : this;
+ }
+
+ @Override
+ public void discardState() throws Exception {
+
+ SharedStateRegistry registry = this.sharedStateRegistry;
+ final boolean isRegistered = (registry != null);
+
+ LOG.trace("Discarding IncrementalRemoteKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.",
+ isRegistered,
+ checkpointId,
+ backendIdentifier);
+
+ try {
+ metaStateHandle.discardState();
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard meta data.", e);
+ }
+
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard misc file states.", e);
+ }
+
+ // If this was not registered, we can delete the shared state. We can simply apply this
+ // to all handles, because all handles that have not been created for the first time for this
+ // are only placeholders at this point (disposing them is a NOP).
+ if (isRegistered) {
+ // If this was registered, we only unregister all our referenced shared states
+ // from the registry.
+ for (StateHandleID stateHandleID : sharedState.keySet()) {
+ registry.unregisterReference(
+ createSharedStateRegistryKeyFromFileName(stateHandleID));
+ }
+ } else {
+ // Otherwise, we assume to own those handles and dispose them directly.
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard new sst file states.", e);
+ }
+ }
+ }
+
+ @Override
+ public long getStateSize() {
+ long size = StateUtil.getStateSize(metaStateHandle);
+
+ for (StreamStateHandle sharedStateHandle : sharedState.values()) {
+ size += sharedStateHandle.getStateSize();
+ }
+
+ for (StreamStateHandle privateStateHandle : privateState.values()) {
+ size += privateStateHandle.getStateSize();
+ }
+
+ return size;
+ }
+
+ @Override
+ public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
+ // This is a quick check to avoid that we register twice with the same registry. However, the code allows to
+ // register again with a different registry. The implication is that ownership is transferred to this new
+ // registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new
+ // SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that
+ // an old registry object from a previous run is due to be GCed and will never be used for registration again.
+ Preconditions.checkState(
+ sharedStateRegistry != stateRegistry,
+ "The state handle has already registered its shared states to the given registry.");
+
+ sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
+
+ LOG.trace("Registering IncrementalRemoteKeyedStateHandle for checkpoint {} from backend with id {}.",
+ checkpointId,
+ backendIdentifier);
+
+ for (Map.Entry sharedStateHandle : sharedState.entrySet()) {
+ SharedStateRegistryKey registryKey =
+ createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
+
+ SharedStateRegistry.Result result =
+ stateRegistry.registerReference(registryKey, sharedStateHandle.getValue());
+
+ // This step consolidates our shared handles with the registry, which does two things:
+ //
+ // 1) Replace placeholder state handle with already registered, actual state handles.
+ //
+ // 2) Deduplicate re-uploads of incremental state due to missing confirmations about
+ // completed checkpoints.
+ //
+ // This prevents the following problem:
+ // A previous checkpoint n has already registered the state. This can happen if a
+ // following checkpoint (n + x) wants to reference the same state before the backend got
+ // notified that checkpoint n completed. In this case, the shared registry did
+ // deduplication and returns the previous reference.
+ sharedStateHandle.setValue(result.getReference());
+ }
+ }
+
+ /**
+ * Create a unique key to register one of our shared state handles.
+ */
+ @VisibleForTesting
+ public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+ return new SharedStateRegistryKey(String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
+ }
+
+ /**
+ * This method is should only be called in tests! This should never serve as key in a hash map.
+ */
+ @VisibleForTesting
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ IncrementalRemoteKeyedStateHandle that = (IncrementalRemoteKeyedStateHandle) o;
+
+ if (getCheckpointId() != that.getCheckpointId()) {
+ return false;
+ }
+ if (!getBackendIdentifier().equals(that.getBackendIdentifier())) {
+ return false;
+ }
+ if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
+ return false;
+ }
+ if (!getSharedState().equals(that.getSharedState())) {
+ return false;
+ }
+ if (!getPrivateState().equals(that.getPrivateState())) {
+ return false;
+ }
+ return getMetaStateHandle().equals(that.getMetaStateHandle());
+ }
+
+ /**
+ * This method should only be called in tests! This should never serve as key in a hash map.
+ */
+ @VisibleForTesting
+ @Override
+ public int hashCode() {
+ int result = getBackendIdentifier().hashCode();
+ result = 31 * result + getKeyGroupRange().hashCode();
+ result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32));
+ result = 31 * result + getSharedState().hashCode();
+ result = 31 * result + getPrivateState().hashCode();
+ result = 31 * result + getMetaStateHandle().hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "IncrementalRemoteKeyedStateHandle{" +
+ "backendIdentifier=" + backendIdentifier +
+ ", keyGroupRange=" + keyGroupRange +
+ ", checkpointId=" + checkpointId +
+ ", sharedState=" + sharedState +
+ ", privateState=" + privateState +
+ ", metaStateHandle=" + metaStateHandle +
+ ", registered=" + (sharedStateRegistry != null) +
+ '}';
+ }
+}
+
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
index 7c948a169cb3b..17f1d00418644 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -25,7 +25,7 @@
* A placeholder state handle for shared state that will replaced by an original that was
* created in a previous checkpoint. So we don't have to send a state handle twice, e.g. in
* case of {@link ByteStreamStateHandle}. This class is used in the referenced states of
- * {@link IncrementalKeyedStateHandle}.
+ * {@link IncrementalRemoteKeyedStateHandle}.
*/
public class PlaceholderStreamStateHandle implements StreamStateHandle {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 02c34d147b9d3..1b522182a389a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -35,7 +35,7 @@
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
@@ -67,8 +67,6 @@
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -3488,7 +3486,7 @@ public void testSharedStateRegistrationOnRestore() throws Exception {
for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
// test we are once registered with the current registry
verify(keyedStateHandle, times(1)).registerSharedStates(createdSharedStateRegistries.get(0));
- IncrementalKeyedStateHandle incrementalKeyedStateHandle = (IncrementalKeyedStateHandle) keyedStateHandle;
+ IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = (IncrementalRemoteKeyedStateHandle) keyedStateHandle;
sharedHandlesByCheckpoint.get(cp).putAll(incrementalKeyedStateHandle.getSharedState());
@@ -3618,8 +3616,8 @@ private void performIncrementalCheckpoint(
new StateHandleID("shared-" + cpSequenceNumber),
spy(new ByteStreamStateHandle("shared-" + cpSequenceNumber + "-" + keyGroupRange, new byte[]{'s'})));
- IncrementalKeyedStateHandle managedState =
- spy(new IncrementalKeyedStateHandle(
+ IncrementalRemoteKeyedStateHandle managedState =
+ spy(new IncrementalRemoteKeyedStateHandle(
new UUID(42L, 42L),
keyGroupRange,
checkpointId,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index 1963766fb1d94..55a9772bd6d5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -27,7 +27,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -255,8 +255,8 @@ public static void assertMasterStateEquality(MasterState a, MasterState b) {
private CheckpointTestUtils() {}
- public static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
- return new IncrementalKeyedStateHandle(
+ public static IncrementalRemoteKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
+ return new IncrementalRemoteKeyedStateHandle(
createRandomUUID(rnd),
new KeyGroupRange(1, 1),
42L,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
similarity index 92%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
index 9f6f88ec42eb8..dd1039a71cd08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
@@ -32,15 +32,15 @@
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.spy;
-public class IncrementalKeyedStateHandleTest {
+public class IncrementalRemoteKeyedStateHandleTest {
/**
- * This test checks, that for an unregistered {@link IncrementalKeyedStateHandle} all state
+ * This test checks, that for an unregistered {@link IncrementalRemoteKeyedStateHandle} all state
* (including shared) is discarded.
*/
@Test
public void testUnregisteredDiscarding() throws Exception {
- IncrementalKeyedStateHandle stateHandle = create(new Random(42));
+ IncrementalRemoteKeyedStateHandle stateHandle = create(new Random(42));
stateHandle.discardState();
@@ -56,7 +56,7 @@ public void testUnregisteredDiscarding() throws Exception {
}
/**
- * This test checks, that for a registered {@link IncrementalKeyedStateHandle} discards respect
+ * This test checks, that for a registered {@link IncrementalRemoteKeyedStateHandle} discards respect
* all shared state and only discard it one all references are released.
*/
@Test
@@ -65,8 +65,8 @@ public void testSharedStateDeRegistration() throws Exception {
SharedStateRegistry registry = spy(new SharedStateRegistry());
// Create two state handles with overlapping shared state
- IncrementalKeyedStateHandle stateHandle1 = create(new Random(42));
- IncrementalKeyedStateHandle stateHandle2 = create(new Random(42));
+ IncrementalRemoteKeyedStateHandle stateHandle1 = create(new Random(42));
+ IncrementalRemoteKeyedStateHandle stateHandle2 = create(new Random(42));
// Both handles should not be registered and not discarded by now.
for (Map.Entry entry :
@@ -197,9 +197,9 @@ public void testSharedStateReRegistration() throws Exception {
SharedStateRegistry stateRegistryA = spy(new SharedStateRegistry());
- IncrementalKeyedStateHandle stateHandleX = create(new Random(1));
- IncrementalKeyedStateHandle stateHandleY = create(new Random(2));
- IncrementalKeyedStateHandle stateHandleZ = create(new Random(3));
+ IncrementalRemoteKeyedStateHandle stateHandleX = create(new Random(1));
+ IncrementalRemoteKeyedStateHandle stateHandleY = create(new Random(2));
+ IncrementalRemoteKeyedStateHandle stateHandleZ = create(new Random(3));
// Now we register first time ...
stateHandleX.registerSharedStates(stateRegistryA);
@@ -257,8 +257,8 @@ public void testSharedStateReRegistration() throws Exception {
sharedStateRegistryB.close();
}
- private static IncrementalKeyedStateHandle create(Random rnd) {
- return new IncrementalKeyedStateHandle(
+ private static IncrementalRemoteKeyedStateHandle create(Random rnd) {
+ return new IncrementalRemoteKeyedStateHandle(
UUID.nameUUIDFromBytes("test".getBytes()),
KeyGroupRange.of(0, 0),
1L,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index c5cf5012fae5e..eb5bccc0c5e80 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -36,7 +36,6 @@
import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
-import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
@@ -376,9 +375,7 @@ private AbstractRocksDBRestoreOperation getRocksDBRestoreOperation(
ttlTimeProvider);
}
KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
- boolean isIncrementalStateHandle = (firstStateHandle instanceof IncrementalKeyedStateHandle)
- || (firstStateHandle instanceof IncrementalLocalKeyedStateHandle);
- if (isIncrementalStateHandle) {
+ if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
return new RocksDBIncrementalRestoreOperation<>(
operatorIdentifier,
keyGroupRange,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index 501f333c415f4..aa3201c308e86 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -23,7 +23,7 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
@@ -54,7 +54,7 @@ public RocksDBStateDownloader(int restoringThreadNum) {
* @throws Exception Thrown if can not transfer all the state data.
*/
public void transferAllStateDataToDirectory(
- IncrementalKeyedStateHandle restoreStateHandle,
+ IncrementalRemoteKeyedStateHandle restoreStateHandle,
Path dest,
CloseableRegistry closeableRegistry) throws Exception {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index b085d512773cb..3371f4f5435ce 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -40,6 +40,7 @@
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -156,24 +157,25 @@ public RocksDBRestoreResult restore() throws Exception {
/**
* Recovery from a single remote incremental state without rescaling.
*/
- private void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception {
- if (rawStateHandle instanceof IncrementalKeyedStateHandle) {
- IncrementalKeyedStateHandle incrementalKeyedStateHandle = (IncrementalKeyedStateHandle) rawStateHandle;
- restorePreviousIncrementalFilesStatus(incrementalKeyedStateHandle);
- restoreFromRemoteState(incrementalKeyedStateHandle);
- } else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) {
+ private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
+ if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
+ IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
+ (IncrementalRemoteKeyedStateHandle) keyedStateHandle;
+ restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
+ restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
+ } else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
- (IncrementalLocalKeyedStateHandle) rawStateHandle;
+ (IncrementalLocalKeyedStateHandle) keyedStateHandle;
restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
restoreFromLocalState(incrementalLocalKeyedStateHandle);
} else {
throw new BackendBuildingException("Unexpected state handle type, " +
- "expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
- ", but found " + rawStateHandle.getClass());
+ "expected " + IncrementalRemoteKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
+ ", but found " + keyedStateHandle.getClass());
}
}
- private void restorePreviousIncrementalFilesStatus(IncrementalLocalKeyedStateHandle localKeyedStateHandle) {
+ private void restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle localKeyedStateHandle) {
backendUID = localKeyedStateHandle.getBackendIdentifier();
restoredSstFiles.put(
localKeyedStateHandle.getCheckpointId(),
@@ -181,18 +183,10 @@ private void restorePreviousIncrementalFilesStatus(IncrementalLocalKeyedStateHan
lastCompletedCheckpointId = localKeyedStateHandle.getCheckpointId();
}
- private void restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle remoteKeyedStateHandle) {
- backendUID = remoteKeyedStateHandle.getBackendIdentifier();
- restoredSstFiles.put(
- remoteKeyedStateHandle.getCheckpointId(),
- remoteKeyedStateHandle.getSharedState().keySet());
- lastCompletedCheckpointId = remoteKeyedStateHandle.getCheckpointId();
- }
-
- private void restoreFromRemoteState(IncrementalKeyedStateHandle stateHandle) throws Exception {
+ private void restoreFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle) throws Exception {
final Path tmpRestoreInstancePath = new Path(
instanceBasePath.getAbsolutePath(),
- UUID.randomUUID().toString()); // used as restore source for IncrementalKeyedStateHandle
+ UUID.randomUUID().toString()); // used as restore source for IncrementalRemoteKeyedStateHandle
try {
restoreFromLocalState(
transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle));
@@ -227,7 +221,7 @@ private void restoreFromLocalState(IncrementalLocalKeyedStateHandle localKeyedSt
private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(
Path temporaryRestoreInstancePath,
- IncrementalKeyedStateHandle restoreStateHandle) throws Exception {
+ IncrementalRemoteKeyedStateHandle restoreStateHandle) throws Exception {
try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(numberOfTransferringThreads)) {
rocksDBStateDownloader.transferAllStateDataToDirectory(
@@ -302,15 +296,15 @@ private void restoreWithRescaling(Collection restoreStateHandl
for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
- if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) {
+ if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
throw new IllegalStateException("Unexpected state handle type, " +
- "expected " + IncrementalKeyedStateHandle.class +
+ "expected " + IncrementalRemoteKeyedStateHandle.class +
", but found " + rawStateHandle.getClass());
}
Path temporaryRestoreInstancePath = new Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
- (IncrementalKeyedStateHandle) rawStateHandle,
+ (IncrementalRemoteKeyedStateHandle) rawStateHandle,
temporaryRestoreInstancePath);
RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) {
@@ -351,10 +345,10 @@ private void restoreWithRescaling(Collection restoreStateHandl
private void initDBWithRescaling(KeyedStateHandle initialHandle) throws Exception {
- assert (initialHandle instanceof IncrementalKeyedStateHandle);
+ assert (initialHandle instanceof IncrementalRemoteKeyedStateHandle);
// 1. Restore base DB from selected initial handle
- restoreFromRemoteState((IncrementalKeyedStateHandle) initialHandle);
+ restoreFromRemoteState((IncrementalRemoteKeyedStateHandle) initialHandle);
// 2. Clip the base DB instance
try {
@@ -416,7 +410,7 @@ public void close() {
}
private RestoredDBInstance restoreDBInstanceFromStateHandle(
- IncrementalKeyedStateHandle restoreStateHandle,
+ IncrementalRemoteKeyedStateHandle restoreStateHandle,
Path temporaryRestoreInstancePath) throws Exception {
try (RocksDBStateDownloader rocksDBStateDownloader =
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 3ede377661e9a..ed87024c6e1a3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -32,8 +32,8 @@
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -318,8 +318,8 @@ protected SnapshotResult callInternal() throws Exception {
materializedSstFiles.put(checkpointId, sstFiles.keySet());
}
- final IncrementalKeyedStateHandle jmIncrementalKeyedStateHandle =
- new IncrementalKeyedStateHandle(
+ final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
+ new IncrementalRemoteKeyedStateHandle(
backendUID,
keyGroupRange,
checkpointId,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index b681a877a5a27..9bb3dc03759fd 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -31,7 +31,7 @@
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
@@ -495,7 +495,7 @@ public void testSharedIncrementalStateDeRegistration() throws Exception {
ValueState state =
backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- Queue previousStateHandles = new LinkedList<>();
+ Queue previousStateHandles = new LinkedList<>();
SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
for (int checkpointId = 0; checkpointId < 3; ++checkpointId) {
@@ -514,8 +514,8 @@ public void testSharedIncrementalStateDeRegistration() throws Exception {
SnapshotResult snapshotResult = snapshot.get();
- IncrementalKeyedStateHandle stateHandle =
- (IncrementalKeyedStateHandle) snapshotResult.getJobManagerOwnedSnapshot();
+ IncrementalRemoteKeyedStateHandle stateHandle =
+ (IncrementalRemoteKeyedStateHandle) snapshotResult.getJobManagerOwnedSnapshot();
Map sharedState =
new HashMap<>(stateHandle.getSharedState());
@@ -551,7 +551,7 @@ public void testSharedIncrementalStateDeRegistration() throws Exception {
}
}
- private void checkRemove(IncrementalKeyedStateHandle remove, SharedStateRegistry registry) throws Exception {
+ private void checkRemove(IncrementalRemoteKeyedStateHandle remove, SharedStateRegistry registry) throws Exception {
for (StateHandleID id : remove.getSharedState().keySet()) {
verify(registry, times(0)).unregisterReference(
remove.createSharedStateRegistryKeyFromFileName(id));
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
index f99beca3b408e..76372b7fc17bb 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
@@ -21,7 +21,7 @@
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -79,8 +79,8 @@ public long getStateSize() {
Map stateHandles = new HashMap<>(1);
stateHandles.put(new StateHandleID("state1"), stateHandle);
- IncrementalKeyedStateHandle incrementalKeyedStateHandle =
- new IncrementalKeyedStateHandle(
+ IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle =
+ new IncrementalRemoteKeyedStateHandle(
UUID.randomUUID(),
KeyGroupRange.EMPTY_KEY_GROUP_RANGE,
1,
@@ -124,8 +124,8 @@ public void testMultiThreadRestoreCorrectly() throws Exception {
privateStates.put(new StateHandleID(String.format("privateState%d", i)), handles.get(i));
}
- IncrementalKeyedStateHandle incrementalKeyedStateHandle =
- new IncrementalKeyedStateHandle(
+ IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle =
+ new IncrementalRemoteKeyedStateHandle(
UUID.randomUUID(),
KeyGroupRange.of(0, 1),
1,