Skip to content

Commit

Permalink
[FLINK-29457][state-processor] Add UID (hash) remapping function
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Oct 11, 2022
1 parent 6934032 commit 8ccca78
Show file tree
Hide file tree
Showing 6 changed files with 433 additions and 0 deletions.
22 changes: 22 additions & 0 deletions docs/content.zh/docs/libs/state_processor_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -493,3 +493,25 @@ SavepointWriter
.withOperator(OperatorIdentifier.forUid("uid"), transformation)
.write(newPath);
```

### Changing UID (hashes)

`SavepointWriter#changeOperatorIdentifier` can be used to modify the [UID]({{< ref "docs/concepts/glossary" >}}#uid) or [UID hash]({{< ref "docs/concepts/glossary" >}}#uid-hash) of an operator.

If a `UID` was not explicitly set (and was thus auto-generated and is effectively unknown), you can assign a `UID` provided that you know the `UID hash` (e.g., by parsing the logs):
```java
savepointWriter
.changeOperatorIdentifier(
OperatorIdentifier.forUidHash("2feb7f8bcc404c3ac8a981959780bd78"),
OperatorIdentifier.forUid("new-uid"))
...
```

You can also replace one `UID` with another:
```java
savepointWriter
.changeOperatorIdentifier(
OperatorIdentifier.forUid("old-uid"),
OperatorIdentifier.forUid("new-uid"))
...
```
22 changes: 22 additions & 0 deletions docs/content/docs/libs/state_processor_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,25 @@ SavepointWriter
.withOperator(OperatorIdentifier.forUid("uid"), transformation)
.write(newPath);
```

### Changing UID (hashes)

`SavepointWriter#changeOperatorIdentifier` can be used to modify the [UID]({{< ref "docs/concepts/glossary" >}}#uid) or [UID hash]({{< ref "docs/concepts/glossary" >}}#uid-hash) of an operator.

If a `UID` was not explicitly set (and was thus auto-generated and is effectively unknown), you can assign a `UID` provided that you know the `UID hash` (e.g., by parsing the logs):
```java
savepointWriter
.changeOperatorIdentifier(
OperatorIdentifier.forUidHash("2feb7f8bcc404c3ac8a981959780bd78"),
OperatorIdentifier.forUid("new-uid"))
...
```

You can also replace one `UID` with another:
```java
savepointWriter
.changeOperatorIdentifier(
OperatorIdentifier.forUid("old-uid"),
OperatorIdentifier.forUid("new-uid"))
...
```
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,9 @@ public boolean equals(Object o) {
public int hashCode() {
// Only operatorId feeds the hash; the uid field used by toString() is not part of it.
return Objects.hash(operatorId);
}

/**
 * Renders this identifier for logs and error messages.
 *
 * @return the hex form of the operator ID, prefixed by {@code uid} and wrapped in parentheses
 *     when a uid is present
 */
@Override
public String toString() {
    final String hexId = operatorId.toHexString();
    if (uid == null) {
        // No uid available: the hex operator ID is all there is to show.
        return hexId;
    }
    return uid + "(" + hexId + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.state.api;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
Expand All @@ -36,14 +37,18 @@
import org.apache.flink.state.api.runtime.metadata.SavepointMetadataV2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;

Expand All @@ -54,6 +59,9 @@
@PublicEvolving
public class SavepointWriter {

/**
 * Requested identifier changes, keyed by the identifier to replace and mapping to its
 * replacement. Filled by {@code changeOperatorIdentifier} and handed to the metadata map
 * function when the savepoint is written.
 */
private final Map<OperatorIdentifier, OperatorIdentifier> uidTransformationMap =
new HashMap<>();

/**
* Loads an existing savepoint. Useful if you want to modify or extend the state of an existing
* application. The savepoint will be written using the state backend defined via the clusters
Expand Down Expand Up @@ -223,6 +231,51 @@ public <T> SavepointWriter withConfiguration(ConfigOption<T> option, T value) {
return this;
}

/**
 * Changes the identifier of an operator.
 *
 * <p>This method is comparatively cheap since it only modifies savepoint metadata without
 * reading the entire savepoint data.
 *
 * <p>Use-cases include, but are not limited to:
 *
 * <ul>
 *   <li>assigning a UID to an operator that did not have a UID assigned before
 *   <li>changing the UID of an operator
 *   <li>swapping the states of 2 operators
 * </ul>
 *
 * <p>Identifier changes are applied after all other operations; in the following example the
 * savepoint will only contain UID_2.
 *
 * <pre>
 * SavepointWriter savepoint = ...
 * savepoint.withOperator(UID_1, ...)
 * savepoint.changeOperatorIdentifier(UID_1, UID_2)
 * savepoint.write(...)
 * </pre>
 *
 * <p>You cannot define a chain of changes. Every operator is matched once, against the
 * identifier it carries before any change is applied, so in the following example UID_1 is
 * renamed to UID_2 but never to UID_3.
 *
 * <pre>
 * SavepointWriter savepoint = ...
 * savepoint.withOperator(UID_1, ...)
 * savepoint.changeOperatorIdentifier(UID_1, UID_2)
 * savepoint.changeOperatorIdentifier(UID_2, UID_3)
 * savepoint.write(...)
 * </pre>
 *
 * <p>A change whose {@code from} identifier matches no operator is not dropped silently: the
 * step that rewrites the metadata throws a {@code FlinkRuntimeException} listing every change
 * that was never applied. In the example above that is the case for the second change unless
 * the savepoint also contains a separate operator identified by UID_2.
 *
 * @param from operator whose identifier should be changed
 * @param to desired identifier
 * @return The modified savepoint.
 * @throws NullPointerException if {@code from} or {@code to} is {@code null}
 */
public SavepointWriter changeOperatorIdentifier(
        OperatorIdentifier from, OperatorIdentifier to) {
    // Fail fast: a null target would otherwise be indistinguishable from "no change
    // registered" when the map is consulted, and the request would vanish without a trace.
    Objects.requireNonNull(from, "from");
    Objects.requireNonNull(to, "to");
    this.uidTransformationMap.put(from, to);
    return this;
}

/**
* Write out a new or updated savepoint.
*
Expand Down Expand Up @@ -262,6 +315,8 @@ public final void write(String path) {
new GroupReduceOperator<>(
new MergeOperatorStates(metadata.getMasterStates())))
.forceNonParallel()
.map(new CheckpointMetadataCheckpointMetadataMapFunction(this.uidTransformationMap))
.setParallelism(1)
.addSink(new OutputFormatSinkFunction<>(new SavepointOutputFormat(savepointPath)))
.setParallelism(1)
.name(path);
Expand All @@ -288,4 +343,51 @@ private DataStream<OperatorState> writeOperatorStates(
new IllegalStateException(
"Savepoint must contain at least one operator"));
}

/**
 * Rewrites the operator IDs inside a {@link CheckpointMetadata} according to a set of requested
 * identifier changes, and throws on {@link #close()} if any requested change was never matched.
 */
private static class CheckpointMetadataCheckpointMetadataMapFunction
        extends RichMapFunction<CheckpointMetadata, CheckpointMetadata> {
    private static final long serialVersionUID = 1L;

    /** Changes still waiting to be applied; an entry is removed once it has been used. */
    private final Map<OperatorIdentifier, OperatorIdentifier> uidTransformationMap;

    /**
     * @param uidTransformationMap requested changes, keyed by the identifier to replace
     */
    public CheckpointMetadataCheckpointMetadataMapFunction(
            Map<OperatorIdentifier, OperatorIdentifier> uidTransformationMap) {
        // Work on a private copy because applying a change removes its entry.
        this.uidTransformationMap = new HashMap<>(uidTransformationMap);
    }

    /**
     * Returns metadata with the same checkpoint ID and master states, in which every operator
     * state with a pending change carries its new operator ID.
     */
    @Override
    public CheckpointMetadata map(CheckpointMetadata value) throws Exception {
        final List<OperatorState> renamedStates =
                value.getOperatorStates().stream()
                        .map(this::applyPendingChange)
                        .collect(Collectors.toList());
        return new CheckpointMetadata(
                value.getCheckpointId(), renamedStates, value.getMasterStates());
    }

    /**
     * Consumes the pending change registered for the given state's current operator ID, if any,
     * and returns the state under its new ID; otherwise returns the state unchanged.
     */
    private OperatorState applyPendingChange(OperatorState state) {
        final OperatorIdentifier currentIdentifier =
                OperatorIdentifier.forUidHash(state.getOperatorID().toHexString());
        final OperatorIdentifier replacement = uidTransformationMap.remove(currentIdentifier);
        if (replacement == null) {
            return state;
        }
        return state.copyWithNewOperatorID(replacement.getOperatorId());
    }

    /** Throws if at least one requested change is still pending, listing each of them. */
    @Override
    public void close() throws Exception {
        if (uidTransformationMap.isEmpty()) {
            return;
        }
        final StringBuilder message =
                new StringBuilder("Some identifier changes were never applied!");
        for (Map.Entry<OperatorIdentifier, OperatorIdentifier> unapplied :
                uidTransformationMap.entrySet()) {
            // One "from=to" entry per line, each preceded by a newline and a tab.
            message.append("\n\t").append(unapplied);
        }
        throw new FlinkRuntimeException(message.toString());
    }
}
}
Loading

0 comments on commit 8ccca78

Please sign in to comment.