Skip to content

Commit

Permalink
[FLINK-32079][checkpoint] Support reading/writing checkpoint metadata of merged files (apache#24480)
Browse files Browse the repository at this point in the history
  • Loading branch information
masteryhx committed Apr 7, 2024
1 parent 739bd33 commit ea4e498
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.InputChannelStateHandle;
Expand All @@ -47,12 +48,18 @@
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryChangelogStateHandle;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.EmptySegmentFileStateHandle;
import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;

Expand All @@ -65,6 +72,7 @@
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -124,6 +132,15 @@ public abstract class MetadataV2V3SerializerBase {
// CHANGELOG_HANDLE_V2 is introduced to add new field of checkpointId.
private static final byte CHANGELOG_HANDLE_V2 = 14;

// SEGMENT_FILE_HANDLE is introduced to support file merging.
private static final byte SEGMENT_FILE_HANDLE = 15;

// EMPTY_SEGMENT_FILE_HANDLE is introduced as a placeholder for file merging.
private static final byte EMPTY_SEGMENT_FILE_HANDLE = 16;

// SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE is introduced for file merging of operator state.
private static final byte SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE = 17;

// ------------------------------------------------------------------------
// (De)serialization entry points
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -582,7 +599,10 @@ private static IncrementalRemoteKeyedStateHandle deserializeIncrementalStateHand
void serializeOperatorStateHandle(OperatorStateHandle stateHandle, DataOutputStream dos)
throws IOException {
if (stateHandle != null) {
dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
dos.writeByte(
stateHandle instanceof FileMergingOperatorStreamStateHandle
? SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE
: PARTITIONABLE_OPERATOR_STATE_HANDLE);
Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
stateHandle.getStateNameToPartitionOffsets();
dos.writeInt(partitionOffsetsMap.size());
Expand All @@ -601,6 +621,19 @@ void serializeOperatorStateHandle(OperatorStateHandle stateHandle, DataOutputStr
dos.writeLong(offset);
}
}
if (stateHandle instanceof FileMergingOperatorStreamStateHandle) {
dos.writeUTF(
((FileMergingOperatorStreamStateHandle) stateHandle)
.getTaskOwnedDirHandle()
.getDirectory()
.toString());
dos.writeUTF(
((FileMergingOperatorStreamStateHandle) stateHandle)
.getSharedDirHandle()
.getDirectory()
.toString());
dos.writeBoolean(stateHandle instanceof EmptyFileMergingOperatorStreamStateHandle);
}
serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
} else {
dos.writeByte(NULL_HANDLE);
Expand All @@ -613,7 +646,8 @@ OperatorStateHandle deserializeOperatorStateHandle(
final int type = dis.readByte();
if (NULL_HANDLE == type) {
return null;
} else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
} else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type
|| SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
int mapSize = dis.readInt();
Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap =
CollectionUtil.newHashMapWithExpectedSize(mapSize);
Expand All @@ -632,8 +666,31 @@ OperatorStateHandle deserializeOperatorStateHandle(
new OperatorStateHandle.StateMetaInfo(offsets, mode);
offsetsMap.put(key, metaInfo);
}
StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
return new OperatorStreamStateHandle(offsetsMap, stateHandle);
if (SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
String taskOwnedDirPathStr = dis.readUTF();
String sharedDirPathStr = dis.readUTF();
boolean isEmpty = dis.readBoolean();
StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
Preconditions.checkArgument(stateHandle instanceof SegmentFileStateHandle);
return isEmpty
? new EmptyFileMergingOperatorStreamStateHandle(
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(taskOwnedDirPathStr).toPath()),
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(sharedDirPathStr).toPath()),
offsetsMap,
stateHandle)
: new FileMergingOperatorStreamStateHandle(
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(taskOwnedDirPathStr).toPath()),
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(sharedDirPathStr).toPath()),
offsetsMap,
stateHandle);
} else {
StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
return new OperatorStreamStateHandle(offsetsMap, stateHandle);
}
} else {
throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
}
Expand Down Expand Up @@ -677,6 +734,18 @@ static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutput
RelativeFileStateHandle relativeFileStateHandle = (RelativeFileStateHandle) stateHandle;
dos.writeUTF(relativeFileStateHandle.getRelativePath());
dos.writeLong(relativeFileStateHandle.getStateSize());
} else if (stateHandle instanceof SegmentFileStateHandle) {
if (stateHandle instanceof EmptySegmentFileStateHandle) {
dos.writeByte(EMPTY_SEGMENT_FILE_HANDLE);
} else {
dos.writeByte(SEGMENT_FILE_HANDLE);
SegmentFileStateHandle segmentFileStateHandle =
(SegmentFileStateHandle) stateHandle;
dos.writeLong(segmentFileStateHandle.getStartPos());
dos.writeLong(segmentFileStateHandle.getStateSize());
dos.writeInt(segmentFileStateHandle.getScope().ordinal());
dos.writeUTF(segmentFileStateHandle.getFilePath().toString());
}
} else if (stateHandle instanceof FileStateHandle) {
dos.writeByte(FILE_STREAM_STATE_HANDLE);
FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
Expand Down Expand Up @@ -745,6 +814,14 @@ static StreamStateHandle deserializeStreamStateHandle(
new KeyGroupRangeOffsets(keyGroupRange, offsets);
StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
} else if (SEGMENT_FILE_HANDLE == type) {
long startPos = dis.readLong();
long stateSize = dis.readLong();
CheckpointedStateScope scope = CheckpointedStateScope.values()[dis.readInt()];
Path physicalFilePath = new Path(dis.readUTF());
return new SegmentFileStateHandle(physicalFilePath, startPos, stateSize, scope);
} else if (EMPTY_SEGMENT_FILE_HANDLE == type) {
return EmptySegmentFileStateHandle.INSTANCE;
} else {
throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ public long getCheckpointedSize() {
return getDelegateStateHandle().getStateSize();
}

/** Returns the directory stream state handle held in {@code sharedDirHandle} (the shared state directory of this handle). */
public DirectoryStreamStateHandle getSharedDirHandle() {
return sharedDirHandle;
}

/** Returns the directory stream state handle held in {@code taskOwnedDirHandle} (the task-owned state directory of this handle). */
public DirectoryStreamStateHandle getTaskOwnedDirHandle() {
return taskOwnedDirHandle;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
* {@link FileStateHandle} for state that was written to a file segment. A {@link
* SegmentFileStateHandle} represents a {@link LogicalFile}, which has already been written to a
* segment in a physical file.
*
* <p>TODO (FLINK-32079): serialization and deserialization of {@link SegmentFileStateHandle}.
*/
public class SegmentFileStateHandle implements StreamStateHandle {

Expand Down Expand Up @@ -133,7 +131,7 @@ public boolean equals(Object o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
if (!(o instanceof SegmentFileStateHandle)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DiscardRecordedStateObject;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand All @@ -38,11 +40,16 @@
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestingRelativeFileStateHandle;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.StringUtils;

import javax.annotation.Nullable;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -140,36 +147,13 @@ private static void randomlySetSubtaskState(
boolean isIncremental = random.nextInt(3) == 0;

for (int subtaskIdx : subtasksToSet) {
StreamStateHandle operatorStateBackend =
new ByteStreamStateHandle(
"b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET));
StreamStateHandle operatorStateStream =
new ByteStreamStateHandle(
"b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET));

Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>();
offsetsMap.put(
"A",
new OperatorStateHandle.StateMetaInfo(
new long[] {0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
offsetsMap.put(
"B",
new OperatorStateHandle.StateMetaInfo(
new long[] {30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
offsetsMap.put(
"C",
new OperatorStateHandle.StateMetaInfo(
new long[] {60, 70, 80}, OperatorStateHandle.Mode.UNION));

final OperatorSubtaskState.Builder state = OperatorSubtaskState.builder();
if (hasOperatorStateBackend) {
state.setManagedOperatorState(
new OperatorStreamStateHandle(offsetsMap, operatorStateBackend));
state.setManagedOperatorState(createDummyOperatorStreamStateHandle(random));
}

if (hasOperatorStateStream) {
state.setRawOperatorState(
new OperatorStreamStateHandle(offsetsMap, operatorStateStream));
state.setRawOperatorState(createDummyOperatorStreamStateHandle(random));
}

if (hasKeyedBackend) {
Expand Down Expand Up @@ -209,6 +193,45 @@ private static void randomlySetSubtaskState(
}
}

/**
 * Builds a dummy operator stream state handle with a fixed three-entry offsets map.
 *
 * <p>Randomly produces either a plain {@link OperatorStreamStateHandle} backed by an in-memory
 * byte handle, or a file-merging variant ({@link FileMergingOperatorStreamStateHandle} /
 * {@link EmptyFileMergingOperatorStreamStateHandle}) with random directory paths.
 *
 * @param rnd source of randomness; drawn from in a fixed order so results are deterministic
 *     per seed
 */
private static OperatorStreamStateHandle createDummyOperatorStreamStateHandle(Random rnd) {
    // Fixed layout: two SPLIT_DISTRIBUTE entries and one UNION entry.
    Map<String, OperatorStateHandle.StateMetaInfo> stateNameToMetaInfo = new HashMap<>();
    stateNameToMetaInfo.put(
            "A",
            new OperatorStateHandle.StateMetaInfo(
                    new long[] {0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
    stateNameToMetaInfo.put(
            "B",
            new OperatorStateHandle.StateMetaInfo(
                    new long[] {30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
    stateNameToMetaInfo.put(
            "C",
            new OperatorStateHandle.StateMetaInfo(
                    new long[] {60, 70, 80}, OperatorStateHandle.Mode.UNION));

    if (!rnd.nextBoolean()) {
        // Non-file-merging case: plain handle backed by an in-memory byte stream.
        byte[] payload = ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET);
        return new OperatorStreamStateHandle(
                stateNameToMetaInfo, new ByteStreamStateHandle("b", payload));
    }

    // File-merging case: random task-owned and shared directories.
    DirectoryStreamStateHandle taskOwnedDir =
            DirectoryStreamStateHandle.forPathWithZeroSize(
                    new File(String.valueOf(createRandomUUID(rnd))).toPath());
    DirectoryStreamStateHandle sharedDir =
            DirectoryStreamStateHandle.forPathWithZeroSize(
                    new File(String.valueOf(createRandomUUID(rnd))).toPath());
    if (rnd.nextBoolean()) {
        return new FileMergingOperatorStreamStateHandle(
                taskOwnedDir,
                sharedDir,
                stateNameToMetaInfo,
                createDummySegmentFileStateHandle(rnd, false));
    }
    return EmptyFileMergingOperatorStreamStateHandle.create(taskOwnedDir, sharedDir);
}

/** A savepoint is indicated by the presence of an explicit base path. */
private static boolean isSavepoint(String basePath) {
    return null != basePath;
}
Expand Down Expand Up @@ -263,11 +286,15 @@ public static IncrementalRemoteKeyedStateHandle createDummyIncrementalKeyedState
}

public static List<HandleAndLocalPath> createRandomHandleAndLocalPathList(Random rnd) {
boolean enableFileMerging = rnd.nextBoolean();
final int size = rnd.nextInt(4);
List<HandleAndLocalPath> result = new ArrayList<>(size);
for (int i = 0; i < size; ++i) {
String localPath = createRandomUUID(rnd).toString();
StreamStateHandle stateHandle = createDummyStreamStateHandle(rnd, null);
StreamStateHandle stateHandle =
enableFileMerging
? createDummySegmentFileStateHandle(rnd)
: createDummyStreamStateHandle(rnd, null);
result.add(HandleAndLocalPath.of(stateHandle, localPath));
}

Expand Down Expand Up @@ -308,6 +335,55 @@ public static StreamStateHandle createDummyStreamStateHandle(
}
}

/** Creates a dummy segment file state handle, randomly choosing the empty placeholder variant. */
private static StreamStateHandle createDummySegmentFileStateHandle(Random rnd) {
    return createDummySegmentFileStateHandle(rnd, rnd.nextBoolean());
}

/**
 * Creates a dummy segment file state handle for serializer round-trip tests.
 *
 * @param rnd used to generate a random file path (only consumed in the non-empty case)
 * @param isEmpty whether to return the shared empty placeholder instance
 */
private static StreamStateHandle createDummySegmentFileStateHandle(
        Random rnd, boolean isEmpty) {
    if (isEmpty) {
        return TestingSegmentFileStateHandle.EMPTY_INSTANCE;
    }
    // Non-empty: a one-byte segment at offset 0 under a random path in SHARED scope.
    Path segmentPath = new Path(String.valueOf(createRandomUUID(rnd)));
    return new TestingSegmentFileStateHandle(
            segmentPath, 0, 1, CheckpointedStateScope.SHARED);
}

/**
 * Test variant of {@link SegmentFileStateHandle} that records whether {@link #discardState()}
 * has been invoked and reports its size under {@code LOCAL_MEMORY} in size statistics.
 */
private static class TestingSegmentFileStateHandle extends SegmentFileStateHandle
        implements DiscardRecordedStateObject {

    private static final long serialVersionUID = 1L;

    /** Shared placeholder representing an empty segment (zero size, EXCLUSIVE scope). */
    private static final TestingSegmentFileStateHandle EMPTY_INSTANCE =
            new TestingSegmentFileStateHandle(
                    new Path("empty"), 0, 0, CheckpointedStateScope.EXCLUSIVE);

    /** Set to true once discardState() has run. */
    private boolean discarded;

    public TestingSegmentFileStateHandle(
            Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
        super(filePath, startPos, stateSize, scope);
    }

    @Override
    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
        // Attributed to LOCAL_MEMORY purely for test purposes.
        collector.add(StateObjectLocation.LOCAL_MEMORY, getStateSize());
    }

    @Override
    public void discardState() {
        super.discardState();
        discarded = true;
    }

    @Override
    public boolean isDisposed() {
        return discarded;
    }
}

/** Creates a UUID from two longs drawn from {@code rnd}, deterministic per seed. */
private static UUID createRandomUUID(Random rnd) {
    // Draw the halves in a fixed order: most-significant bits first.
    final long mostSigBits = rnd.nextLong();
    final long leastSigBits = rnd.nextLong();
    return new UUID(mostSigBits, leastSigBits);
}
Expand Down

0 comments on commit ea4e498

Please sign in to comment.