From 5bc04d255bdf850de73c0ff67e79fbb73fb299ce Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Tue, 26 Feb 2019 16:08:48 +0100 Subject: [PATCH] [hotfix] Introduce common interface to all IncrementalKeyedStateHandles --- .../savepoint/SavepointV2Serializer.java | 10 +- .../state/IncrementalKeyedStateHandle.java | 307 +--------------- .../IncrementalLocalKeyedStateHandle.java | 7 +- .../IncrementalRemoteKeyedStateHandle.java | 333 ++++++++++++++++++ .../state/PlaceholderStreamStateHandle.java | 2 +- .../checkpoint/CheckpointCoordinatorTest.java | 10 +- .../savepoint/CheckpointTestUtils.java | 6 +- ...ncrementalRemoteKeyedStateHandleTest.java} | 22 +- .../RocksDBKeyedStateBackendBuilder.java | 5 +- .../state/RocksDBStateDownloader.java | 4 +- .../RocksDBIncrementalRestoreOperation.java | 48 ++- .../RocksIncrementalSnapshotStrategy.java | 6 +- .../state/RocksDBStateBackendTest.java | 10 +- .../state/RocksDBStateDownloaderTest.java | 10 +- 14 files changed, 412 insertions(+), 368 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java rename flink-runtime/src/test/java/org/apache/flink/runtime/state/{IncrementalKeyedStateHandleTest.java => IncrementalRemoteKeyedStateHandleTest.java} (92%) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index fa8407729d90c..fb942f7efb967 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -25,8 +25,8 @@ import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; @@ -340,9 +340,9 @@ public static void serializeKeyedStateHandle( dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup)); } serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos); - } else if (stateHandle instanceof IncrementalKeyedStateHandle) { - IncrementalKeyedStateHandle incrementalKeyedStateHandle = - (IncrementalKeyedStateHandle) stateHandle; + } else if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) { + IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = + (IncrementalRemoteKeyedStateHandle) stateHandle; dos.writeByte(INCREMENTAL_KEY_GROUPS_HANDLE); @@ -427,7 +427,7 @@ public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) uuid = UUID.nameUUIDFromBytes(backendId.getBytes(StandardCharsets.UTF_8)); } - return new IncrementalKeyedStateHandle( + return new IncrementalRemoteKeyedStateHandle( uuid, keyGroupRange, checkpointId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java index 01d4ac0132ed7..d3ab3b27bfa90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,305 +18,24 @@ package org.apache.flink.runtime.state; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; +import java.util.Set; import java.util.UUID; /** - * The handle to states of an incremental snapshot. - *

- * The states contained in an incremental snapshot include: - *

- * - * When this should become a completed checkpoint on the checkpoint coordinator, it must first be - * registered with a {@link SharedStateRegistry}, so that all placeholder state handles to - * previously existing state are replaced with the originals. - * - * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They - * should not be called from production code. This means this class is also not suited to serve as - * a key, e.g. in hash maps. + * Common interface to all incremental {@link KeyedStateHandle}. */ -public class IncrementalKeyedStateHandle implements KeyedStateHandle { - - private static final Logger LOG = LoggerFactory.getLogger(IncrementalKeyedStateHandle.class); - - private static final long serialVersionUID = -8328808513197388231L; - - /** - * UUID to identify the backend which created this state handle. This is in creating the key for the - * {@link SharedStateRegistry}. - */ - private final UUID backendIdentifier; - - /** - * The key-group range covered by this state handle. - */ - private final KeyGroupRange keyGroupRange; - - /** - * The checkpoint Id. - */ - private final long checkpointId; - - /** - * Shared state in the incremental checkpoint. - */ - private final Map sharedState; - - /** - * Private state in the incremental checkpoint. - */ - private final Map privateState; - - /** - * Primary meta data state of the incremental checkpoint. - */ - private final StreamStateHandle metaStateHandle; - - /** - * Once the shared states are registered, it is the {@link SharedStateRegistry}'s - * responsibility to cleanup those shared states. - * But in the cases where the state handle is discarded before performing the registration, - * the handle should delete all the shared states created by it. - * - * This variable is not null iff the handles was registered. - */ - private transient SharedStateRegistry sharedStateRegistry; - - public IncrementalKeyedStateHandle( - UUID backendIdentifier, - KeyGroupRange keyGroupRange, - long checkpointId, - Map sharedState, - Map privateState, - StreamStateHandle metaStateHandle) { - - this.backendIdentifier = Preconditions.checkNotNull(backendIdentifier); - this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); - this.checkpointId = checkpointId; - this.sharedState = Preconditions.checkNotNull(sharedState); - this.privateState = Preconditions.checkNotNull(privateState); - this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle); - this.sharedStateRegistry = null; - } - - @Override - public KeyGroupRange getKeyGroupRange() { - return keyGroupRange; - } - - public long getCheckpointId() { - return checkpointId; - } - - public Map getSharedState() { - return sharedState; - } - - public Map getPrivateState() { - return privateState; - } - - public StreamStateHandle getMetaStateHandle() { - return metaStateHandle; - } - - public UUID getBackendIdentifier() { - return backendIdentifier; - } - - public SharedStateRegistry getSharedStateRegistry() { - return sharedStateRegistry; - } - - @Override - public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { - return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ? - null : this; - } +public interface IncrementalKeyedStateHandle extends KeyedStateHandle { - @Override - public void discardState() throws Exception { + /** Returns the ID of the checkpoint for which the handle was created. */ + long getCheckpointId(); - SharedStateRegistry registry = this.sharedStateRegistry; - final boolean isRegistered = (registry != null); + /** Returns the identifier of the state backend from which this handle was created.*/ + @Nonnull + UUID getBackendIdentifier(); - LOG.trace("Discarding IncrementalKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.", - isRegistered, - checkpointId, - backendIdentifier); - - try { - metaStateHandle.discardState(); - } catch (Exception e) { - LOG.warn("Could not properly discard meta data.", e); - } - - try { - StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); - } catch (Exception e) { - LOG.warn("Could not properly discard misc file states.", e); - } - - // If this was not registered, we can delete the shared state. We can simply apply this - // to all handles, because all handles that have not been created for the first time for this - // are only placeholders at this point (disposing them is a NOP). - if (isRegistered) { - // If this was registered, we only unregister all our referenced shared states - // from the registry. - for (StateHandleID stateHandleID : sharedState.keySet()) { - registry.unregisterReference( - createSharedStateRegistryKeyFromFileName(stateHandleID)); - } - } else { - // Otherwise, we assume to own those handles and dispose them directly. - try { - StateUtil.bestEffortDiscardAllStateObjects(sharedState.values()); - } catch (Exception e) { - LOG.warn("Could not properly discard new sst file states.", e); - } - } - } - - @Override - public long getStateSize() { - long size = StateUtil.getStateSize(metaStateHandle); - - for (StreamStateHandle sharedStateHandle : sharedState.values()) { - size += sharedStateHandle.getStateSize(); - } - - for (StreamStateHandle privateStateHandle : privateState.values()) { - size += privateStateHandle.getStateSize(); - } - - return size; - } - - @Override - public void registerSharedStates(SharedStateRegistry stateRegistry) { - - // This is a quick check to avoid that we register twice with the same registry. However, the code allows to - // register again with a different registry. The implication is that ownership is transferred to this new - // registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new - // SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that - // an old registry object from a previous run is due to be GCed and will never be used for registration again. - Preconditions.checkState( - sharedStateRegistry != stateRegistry, - "The state handle has already registered its shared states to the given registry."); - - sharedStateRegistry = Preconditions.checkNotNull(stateRegistry); - - LOG.trace("Registering IncrementalKeyedStateHandle for checkpoint {} from backend with id {}.", - checkpointId, - backendIdentifier); - - for (Map.Entry sharedStateHandle : sharedState.entrySet()) { - SharedStateRegistryKey registryKey = - createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey()); - - SharedStateRegistry.Result result = - stateRegistry.registerReference(registryKey, sharedStateHandle.getValue()); - - // This step consolidates our shared handles with the registry, which does two things: - // - // 1) Replace placeholder state handle with already registered, actual state handles. - // - // 2) Deduplicate re-uploads of incremental state due to missing confirmations about - // completed checkpoints. - // - // This prevents the following problem: - // A previous checkpoint n has already registered the state. This can happen if a - // following checkpoint (n + x) wants to reference the same state before the backend got - // notified that checkpoint n completed. In this case, the shared registry did - // deduplication and returns the previous reference. - sharedStateHandle.setValue(result.getReference()); - } - } - - /** - * Create a unique key to register one of our shared state handles. - */ - @VisibleForTesting - public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { - return new SharedStateRegistryKey(String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId); - } - - /** - * This method is should only be called in tests! This should never serve as key in a hash map. - */ - @VisibleForTesting - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - IncrementalKeyedStateHandle that = (IncrementalKeyedStateHandle) o; - - if (getCheckpointId() != that.getCheckpointId()) { - return false; - } - if (!getBackendIdentifier().equals(that.getBackendIdentifier())) { - return false; - } - if (!getKeyGroupRange().equals(that.getKeyGroupRange())) { - return false; - } - if (!getSharedState().equals(that.getSharedState())) { - return false; - } - if (!getPrivateState().equals(that.getPrivateState())) { - return false; - } - return getMetaStateHandle().equals(that.getMetaStateHandle()); - } - - /** - * This method should only be called in tests! This should never serve as key in a hash map. - */ - @VisibleForTesting - @Override - public int hashCode() { - int result = getBackendIdentifier().hashCode(); - result = 31 * result + getKeyGroupRange().hashCode(); - result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32)); - result = 31 * result + getSharedState().hashCode(); - result = 31 * result + getPrivateState().hashCode(); - result = 31 * result + getMetaStateHandle().hashCode(); - return result; - } - - @Override - public String toString() { - return "IncrementalKeyedStateHandle{" + - "backendIdentifier=" + backendIdentifier + - ", keyGroupRange=" + keyGroupRange + - ", checkpointId=" + checkpointId + - ", sharedState=" + sharedState + - ", privateState=" + privateState + - ", metaStateHandle=" + metaStateHandle + - ", registered=" + (sharedStateRegistry != null) + - '}'; - } + /** Returns a set of ids of all registered shared states in the backend at the time this was created. */ + @Nonnull + Set getSharedStateHandleIDs(); } - diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java index f80a8ce914d79..16ee2bbb9ec86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java @@ -27,11 +27,11 @@ import java.util.UUID; /** - * State handle for local copies of {@link IncrementalKeyedStateHandle}. Consists of a {@link DirectoryStateHandle} that + * State handle for local copies of {@link IncrementalRemoteKeyedStateHandle}. Consists of a {@link DirectoryStateHandle} that * represents the directory of the native RocksDB snapshot, the key groups, and a stream state handle for Flink's state * meta data file. */ -public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle { +public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle implements IncrementalKeyedStateHandle { private static final long serialVersionUID = 1L; @@ -71,15 +71,18 @@ public StreamStateHandle getMetaDataState() { return metaDataState; } + @Override public long getCheckpointId() { return checkpointId; } + @Override @Nonnull public UUID getBackendIdentifier() { return backendIdentifier; } + @Override @Nonnull public Set getSharedStateHandleIDs() { return sharedStateHandleIDs; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java new file mode 100644 index 0000000000000..77fd49161d20b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +/** + * The handle to states of an incremental snapshot. + *

+ * The states contained in an incremental snapshot include: + *

    + *
  • Created shared state which includes shared files produced since the last + * completed checkpoint. These files can be referenced by succeeding checkpoints if the + * checkpoint succeeds to complete.
  • + *
  • Referenced shared state which includes the shared files materialized in previous + * checkpoints. Until we this is registered to a {@link SharedStateRegistry}, all referenced + * shared state handles are only placeholders, so that we do not send state handles twice + * from which we know that they already exist on the checkpoint coordinator.
  • + *
  • Private state which includes all other files, typically mutable, that cannot be shared by + * other checkpoints.
  • + *
  • Backend meta state which includes the information of existing states.
  • + *
+ * + * When this should become a completed checkpoint on the checkpoint coordinator, it must first be + * registered with a {@link SharedStateRegistry}, so that all placeholder state handles to + * previously existing state are replaced with the originals. + * + * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They + * should not be called from production code. This means this class is also not suited to serve as + * a key, e.g. in hash maps. + */ +public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(IncrementalRemoteKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + /** + * UUID to identify the backend which created this state handle. This is in creating the key for the + * {@link SharedStateRegistry}. + */ + private final UUID backendIdentifier; + + /** + * The key-group range covered by this state handle. + */ + private final KeyGroupRange keyGroupRange; + + /** + * The checkpoint Id. + */ + private final long checkpointId; + + /** + * Shared state in the incremental checkpoint. + */ + private final Map sharedState; + + /** + * Private state in the incremental checkpoint. + */ + private final Map privateState; + + /** + * Primary meta data state of the incremental checkpoint. + */ + private final StreamStateHandle metaStateHandle; + + /** + * Once the shared states are registered, it is the {@link SharedStateRegistry}'s + * responsibility to cleanup those shared states. + * But in the cases where the state handle is discarded before performing the registration, + * the handle should delete all the shared states created by it. + * + * This variable is not null iff the handles was registered. + */ + private transient SharedStateRegistry sharedStateRegistry; + + public IncrementalRemoteKeyedStateHandle( + UUID backendIdentifier, + KeyGroupRange keyGroupRange, + long checkpointId, + Map sharedState, + Map privateState, + StreamStateHandle metaStateHandle) { + + this.backendIdentifier = Preconditions.checkNotNull(backendIdentifier); + this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); + this.checkpointId = checkpointId; + this.sharedState = Preconditions.checkNotNull(sharedState); + this.privateState = Preconditions.checkNotNull(privateState); + this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle); + this.sharedStateRegistry = null; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + @Override + public long getCheckpointId() { + return checkpointId; + } + + public Map getSharedState() { + return sharedState; + } + + public Map getPrivateState() { + return privateState; + } + + public StreamStateHandle getMetaStateHandle() { + return metaStateHandle; + } + + @Nonnull + public UUID getBackendIdentifier() { + return backendIdentifier; + } + + @Nonnull + @Override + public Set getSharedStateHandleIDs() { + return getSharedState().keySet(); + } + + public SharedStateRegistry getSharedStateRegistry() { + return sharedStateRegistry; + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ? + null : this; + } + + @Override + public void discardState() throws Exception { + + SharedStateRegistry registry = this.sharedStateRegistry; + final boolean isRegistered = (registry != null); + + LOG.trace("Discarding IncrementalRemoteKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.", + isRegistered, + checkpointId, + backendIdentifier); + + try { + metaStateHandle.discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard meta data.", e); + } + + try { + StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard misc file states.", e); + } + + // If this was not registered, we can delete the shared state. We can simply apply this + // to all handles, because all handles that have not been created for the first time for this + // are only placeholders at this point (disposing them is a NOP). + if (isRegistered) { + // If this was registered, we only unregister all our referenced shared states + // from the registry. + for (StateHandleID stateHandleID : sharedState.keySet()) { + registry.unregisterReference( + createSharedStateRegistryKeyFromFileName(stateHandleID)); + } + } else { + // Otherwise, we assume to own those handles and dispose them directly. + try { + StateUtil.bestEffortDiscardAllStateObjects(sharedState.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard new sst file states.", e); + } + } + } + + @Override + public long getStateSize() { + long size = StateUtil.getStateSize(metaStateHandle); + + for (StreamStateHandle sharedStateHandle : sharedState.values()) { + size += sharedStateHandle.getStateSize(); + } + + for (StreamStateHandle privateStateHandle : privateState.values()) { + size += privateStateHandle.getStateSize(); + } + + return size; + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry) { + + // This is a quick check to avoid that we register twice with the same registry. However, the code allows to + // register again with a different registry. The implication is that ownership is transferred to this new + // registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new + // SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that + // an old registry object from a previous run is due to be GCed and will never be used for registration again. + Preconditions.checkState( + sharedStateRegistry != stateRegistry, + "The state handle has already registered its shared states to the given registry."); + + sharedStateRegistry = Preconditions.checkNotNull(stateRegistry); + + LOG.trace("Registering IncrementalRemoteKeyedStateHandle for checkpoint {} from backend with id {}.", + checkpointId, + backendIdentifier); + + for (Map.Entry sharedStateHandle : sharedState.entrySet()) { + SharedStateRegistryKey registryKey = + createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey()); + + SharedStateRegistry.Result result = + stateRegistry.registerReference(registryKey, sharedStateHandle.getValue()); + + // This step consolidates our shared handles with the registry, which does two things: + // + // 1) Replace placeholder state handle with already registered, actual state handles. + // + // 2) Deduplicate re-uploads of incremental state due to missing confirmations about + // completed checkpoints. + // + // This prevents the following problem: + // A previous checkpoint n has already registered the state. This can happen if a + // following checkpoint (n + x) wants to reference the same state before the backend got + // notified that checkpoint n completed. In this case, the shared registry did + // deduplication and returns the previous reference. + sharedStateHandle.setValue(result.getReference()); + } + } + + /** + * Create a unique key to register one of our shared state handles. + */ + @VisibleForTesting + public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { + return new SharedStateRegistryKey(String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId); + } + + /** + * This method is should only be called in tests! This should never serve as key in a hash map. + */ + @VisibleForTesting + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + IncrementalRemoteKeyedStateHandle that = (IncrementalRemoteKeyedStateHandle) o; + + if (getCheckpointId() != that.getCheckpointId()) { + return false; + } + if (!getBackendIdentifier().equals(that.getBackendIdentifier())) { + return false; + } + if (!getKeyGroupRange().equals(that.getKeyGroupRange())) { + return false; + } + if (!getSharedState().equals(that.getSharedState())) { + return false; + } + if (!getPrivateState().equals(that.getPrivateState())) { + return false; + } + return getMetaStateHandle().equals(that.getMetaStateHandle()); + } + + /** + * This method should only be called in tests! This should never serve as key in a hash map. + */ + @VisibleForTesting + @Override + public int hashCode() { + int result = getBackendIdentifier().hashCode(); + result = 31 * result + getKeyGroupRange().hashCode(); + result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32)); + result = 31 * result + getSharedState().hashCode(); + result = 31 * result + getPrivateState().hashCode(); + result = 31 * result + getMetaStateHandle().hashCode(); + return result; + } + + @Override + public String toString() { + return "IncrementalRemoteKeyedStateHandle{" + + "backendIdentifier=" + backendIdentifier + + ", keyGroupRange=" + keyGroupRange + + ", checkpointId=" + checkpointId + + ", sharedState=" + sharedState + + ", privateState=" + privateState + + ", metaStateHandle=" + metaStateHandle + + ", registered=" + (sharedStateRegistry != null) + + '}'; + } +} + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java index 7c948a169cb3b..17f1d00418644 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java @@ -25,7 +25,7 @@ * A placeholder state handle for shared state that will replaced by an original that was * created in a previous checkpoint. So we don't have to send a state handle twice, e.g. in * case of {@link ByteStreamStateHandle}. This class is used in the referenced states of - * {@link IncrementalKeyedStateHandle}. + * {@link IncrementalRemoteKeyedStateHandle}. */ public class PlaceholderStreamStateHandle implements StreamStateHandle { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 02c34d147b9d3..1b522182a389a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; @@ -67,8 +67,6 @@ import org.mockito.stubbing.Answer; import org.mockito.verification.VerificationMode; -import javax.annotation.Nullable; - import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -3488,7 +3486,7 @@ public void testSharedStateRegistrationOnRestore() throws Exception { for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) { // test we are once registered with the current registry verify(keyedStateHandle, times(1)).registerSharedStates(createdSharedStateRegistries.get(0)); - IncrementalKeyedStateHandle incrementalKeyedStateHandle = (IncrementalKeyedStateHandle) keyedStateHandle; + IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = (IncrementalRemoteKeyedStateHandle) keyedStateHandle; sharedHandlesByCheckpoint.get(cp).putAll(incrementalKeyedStateHandle.getSharedState()); @@ -3618,8 +3616,8 @@ private void performIncrementalCheckpoint( new StateHandleID("shared-" + cpSequenceNumber), spy(new ByteStreamStateHandle("shared-" + cpSequenceNumber + "-" + keyGroupRange, new byte[]{'s'}))); - IncrementalKeyedStateHandle managedState = - spy(new IncrementalKeyedStateHandle( + IncrementalRemoteKeyedStateHandle managedState = + spy(new IncrementalRemoteKeyedStateHandle( new UUID(42L, 42L), keyGroupRange, checkpointId, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java index 1963766fb1d94..55a9772bd6d5c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; @@ -255,8 +255,8 @@ public static void assertMasterStateEquality(MasterState a, MasterState b) { private CheckpointTestUtils() {} - public static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) { - return new IncrementalKeyedStateHandle( + public static IncrementalRemoteKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) { + return new IncrementalRemoteKeyedStateHandle( createRandomUUID(rnd), new KeyGroupRange(1, 1), 42L, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java similarity index 92% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java index 9f6f88ec42eb8..dd1039a71cd08 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java @@ -32,15 +32,15 @@ import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.spy; -public class IncrementalKeyedStateHandleTest { +public class IncrementalRemoteKeyedStateHandleTest { /** - * This test checks, that for an unregistered {@link IncrementalKeyedStateHandle} all state + * This test checks, that for an unregistered {@link IncrementalRemoteKeyedStateHandle} all state * (including shared) is discarded. */ @Test public void testUnregisteredDiscarding() throws Exception { - IncrementalKeyedStateHandle stateHandle = create(new Random(42)); + IncrementalRemoteKeyedStateHandle stateHandle = create(new Random(42)); stateHandle.discardState(); @@ -56,7 +56,7 @@ public void testUnregisteredDiscarding() throws Exception { } /** - * This test checks, that for a registered {@link IncrementalKeyedStateHandle} discards respect + * This test checks, that for a registered {@link IncrementalRemoteKeyedStateHandle} discards respect * all shared state and only discard it one all references are released. */ @Test @@ -65,8 +65,8 @@ public void testSharedStateDeRegistration() throws Exception { SharedStateRegistry registry = spy(new SharedStateRegistry()); // Create two state handles with overlapping shared state - IncrementalKeyedStateHandle stateHandle1 = create(new Random(42)); - IncrementalKeyedStateHandle stateHandle2 = create(new Random(42)); + IncrementalRemoteKeyedStateHandle stateHandle1 = create(new Random(42)); + IncrementalRemoteKeyedStateHandle stateHandle2 = create(new Random(42)); // Both handles should not be registered and not discarded by now. for (Map.Entry entry : @@ -197,9 +197,9 @@ public void testSharedStateReRegistration() throws Exception { SharedStateRegistry stateRegistryA = spy(new SharedStateRegistry()); - IncrementalKeyedStateHandle stateHandleX = create(new Random(1)); - IncrementalKeyedStateHandle stateHandleY = create(new Random(2)); - IncrementalKeyedStateHandle stateHandleZ = create(new Random(3)); + IncrementalRemoteKeyedStateHandle stateHandleX = create(new Random(1)); + IncrementalRemoteKeyedStateHandle stateHandleY = create(new Random(2)); + IncrementalRemoteKeyedStateHandle stateHandleZ = create(new Random(3)); // Now we register first time ... stateHandleX.registerSharedStates(stateRegistryA); @@ -257,8 +257,8 @@ public void testSharedStateReRegistration() throws Exception { sharedStateRegistryB.close(); } - private static IncrementalKeyedStateHandle create(Random rnd) { - return new IncrementalKeyedStateHandle( + private static IncrementalRemoteKeyedStateHandle create(Random rnd) { + return new IncrementalRemoteKeyedStateHandle( UUID.nameUUIDFromBytes("test".getBytes()), KeyGroupRange.of(0, 0), 1L, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index c5cf5012fae5e..eb5bccc0c5e80 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -36,7 +36,6 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder; import org.apache.flink.runtime.state.BackendBuildingException; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; -import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; @@ -376,9 +375,7 @@ private AbstractRocksDBRestoreOperation getRocksDBRestoreOperation( ttlTimeProvider); } KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next(); - boolean isIncrementalStateHandle = (firstStateHandle instanceof IncrementalKeyedStateHandle) - || (firstStateHandle instanceof IncrementalLocalKeyedStateHandle); - if (isIncrementalStateHandle) { + if (firstStateHandle instanceof IncrementalKeyedStateHandle) { return new RocksDBIncrementalRestoreOperation<>( operatorIdentifier, keyGroupRange, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java index 501f333c415f4..aa3201c308e86 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java @@ -23,7 +23,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.ExceptionUtils; @@ -54,7 +54,7 @@ public RocksDBStateDownloader(int restoringThreadNum) { * @throws Exception Thrown if can not transfer all the state data. */ public void transferAllStateDataToDirectory( - IncrementalKeyedStateHandle restoreStateHandle, + IncrementalRemoteKeyedStateHandle restoreStateHandle, Path dest, CloseableRegistry closeableRegistry) throws Exception { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index b085d512773cb..3371f4f5435ce 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.DirectoryStateHandle; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; @@ -156,24 +157,25 @@ public RocksDBRestoreResult restore() throws Exception { /** * Recovery from a single remote incremental state without rescaling. */ - private void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception { - if (rawStateHandle instanceof IncrementalKeyedStateHandle) { - IncrementalKeyedStateHandle incrementalKeyedStateHandle = (IncrementalKeyedStateHandle) rawStateHandle; - restorePreviousIncrementalFilesStatus(incrementalKeyedStateHandle); - restoreFromRemoteState(incrementalKeyedStateHandle); - } else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) { + private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception { + if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) { + IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle = + (IncrementalRemoteKeyedStateHandle) keyedStateHandle; + restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle); + restoreFromRemoteState(incrementalRemoteKeyedStateHandle); + } else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) { IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle = - (IncrementalLocalKeyedStateHandle) rawStateHandle; + (IncrementalLocalKeyedStateHandle) keyedStateHandle; restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle); restoreFromLocalState(incrementalLocalKeyedStateHandle); } else { throw new BackendBuildingException("Unexpected state handle type, " + - "expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class + - ", but found " + rawStateHandle.getClass()); + "expected " + IncrementalRemoteKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class + + ", but found " + keyedStateHandle.getClass()); } } - private void restorePreviousIncrementalFilesStatus(IncrementalLocalKeyedStateHandle localKeyedStateHandle) { + private void restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle localKeyedStateHandle) { backendUID = localKeyedStateHandle.getBackendIdentifier(); restoredSstFiles.put( localKeyedStateHandle.getCheckpointId(), @@ -181,18 +183,10 @@ private void restorePreviousIncrementalFilesStatus(IncrementalLocalKeyedStateHan lastCompletedCheckpointId = localKeyedStateHandle.getCheckpointId(); } - private void restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle remoteKeyedStateHandle) { - backendUID = remoteKeyedStateHandle.getBackendIdentifier(); - restoredSstFiles.put( - remoteKeyedStateHandle.getCheckpointId(), - remoteKeyedStateHandle.getSharedState().keySet()); - lastCompletedCheckpointId = remoteKeyedStateHandle.getCheckpointId(); - } - - private void restoreFromRemoteState(IncrementalKeyedStateHandle stateHandle) throws Exception { + private void restoreFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle) throws Exception { final Path tmpRestoreInstancePath = new Path( instanceBasePath.getAbsolutePath(), - UUID.randomUUID().toString()); // used as restore source for IncrementalKeyedStateHandle + UUID.randomUUID().toString()); // used as restore source for IncrementalRemoteKeyedStateHandle try { restoreFromLocalState( transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle)); @@ -227,7 +221,7 @@ private void restoreFromLocalState(IncrementalLocalKeyedStateHandle localKeyedSt private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory( Path temporaryRestoreInstancePath, - IncrementalKeyedStateHandle restoreStateHandle) throws Exception { + IncrementalRemoteKeyedStateHandle restoreStateHandle) throws Exception { try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(numberOfTransferringThreads)) { rocksDBStateDownloader.transferAllStateDataToDirectory( @@ -302,15 +296,15 @@ private void restoreWithRescaling(Collection restoreStateHandl for (KeyedStateHandle rawStateHandle : restoreStateHandles) { - if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) { + if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) { throw new IllegalStateException("Unexpected state handle type, " + - "expected " + IncrementalKeyedStateHandle.class + + "expected " + IncrementalRemoteKeyedStateHandle.class + ", but found " + rawStateHandle.getClass()); } Path temporaryRestoreInstancePath = new Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString()); try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle( - (IncrementalKeyedStateHandle) rawStateHandle, + (IncrementalRemoteKeyedStateHandle) rawStateHandle, temporaryRestoreInstancePath); RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) { @@ -351,10 +345,10 @@ private void restoreWithRescaling(Collection restoreStateHandl private void initDBWithRescaling(KeyedStateHandle initialHandle) throws Exception { - assert (initialHandle instanceof IncrementalKeyedStateHandle); + assert (initialHandle instanceof IncrementalRemoteKeyedStateHandle); // 1. Restore base DB from selected initial handle - restoreFromRemoteState((IncrementalKeyedStateHandle) initialHandle); + restoreFromRemoteState((IncrementalRemoteKeyedStateHandle) initialHandle); // 2. Clip the base DB instance try { @@ -416,7 +410,7 @@ public void close() { } private RestoredDBInstance restoreDBInstanceFromStateHandle( - IncrementalKeyedStateHandle restoreStateHandle, + IncrementalRemoteKeyedStateHandle restoreStateHandle, Path temporaryRestoreInstancePath) throws Exception { try (RocksDBStateDownloader rocksDBStateDownloader = diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java index 3ede377661e9a..ed87024c6e1a3 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java @@ -32,8 +32,8 @@ import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.DirectoryStateHandle; -import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; @@ -318,8 +318,8 @@ protected SnapshotResult callInternal() throws Exception { materializedSstFiles.put(checkpointId, sstFiles.keySet()); } - final IncrementalKeyedStateHandle jmIncrementalKeyedStateHandle = - new IncrementalKeyedStateHandle( + final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = + new IncrementalRemoteKeyedStateHandle( backendUID, keyGroupRange, checkpointId, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index b681a877a5a27..9bb3dc03759fd 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; @@ -495,7 +495,7 @@ public void testSharedIncrementalStateDeRegistration() throws Exception { ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); - Queue previousStateHandles = new LinkedList<>(); + Queue previousStateHandles = new LinkedList<>(); SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry()); for (int checkpointId = 0; checkpointId < 3; ++checkpointId) { @@ -514,8 +514,8 @@ public void testSharedIncrementalStateDeRegistration() throws Exception { SnapshotResult snapshotResult = snapshot.get(); - IncrementalKeyedStateHandle stateHandle = - (IncrementalKeyedStateHandle) snapshotResult.getJobManagerOwnedSnapshot(); + IncrementalRemoteKeyedStateHandle stateHandle = + (IncrementalRemoteKeyedStateHandle) snapshotResult.getJobManagerOwnedSnapshot(); Map sharedState = new HashMap<>(stateHandle.getSharedState()); @@ -551,7 +551,7 @@ public void testSharedIncrementalStateDeRegistration() throws Exception { } } - private void checkRemove(IncrementalKeyedStateHandle remove, SharedStateRegistry registry) throws Exception { + private void checkRemove(IncrementalRemoteKeyedStateHandle remove, SharedStateRegistry registry) throws Exception { for (StateHandleID id : remove.getSharedState().keySet()) { verify(registry, times(0)).unregisterReference( remove.createSharedStateRegistryKeyFromFileName(id)); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java index f99beca3b408e..76372b7fc17bb 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java @@ -21,7 +21,7 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; @@ -79,8 +79,8 @@ public long getStateSize() { Map stateHandles = new HashMap<>(1); stateHandles.put(new StateHandleID("state1"), stateHandle); - IncrementalKeyedStateHandle incrementalKeyedStateHandle = - new IncrementalKeyedStateHandle( + IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = + new IncrementalRemoteKeyedStateHandle( UUID.randomUUID(), KeyGroupRange.EMPTY_KEY_GROUP_RANGE, 1, @@ -124,8 +124,8 @@ public void testMultiThreadRestoreCorrectly() throws Exception { privateStates.put(new StateHandleID(String.format("privateState%d", i)), handles.get(i)); } - IncrementalKeyedStateHandle incrementalKeyedStateHandle = - new IncrementalKeyedStateHandle( + IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = + new IncrementalRemoteKeyedStateHandle( UUID.randomUUID(), KeyGroupRange.of(0, 1), 1,