Skip to content

Commit

Permalink
[hotfix][state/changelog] Rename StateChangelogHandle to ChangelogSta…
Browse files Browse the repository at this point in the history
…teHandle
  • Loading branch information
rkhachatryan committed Jun 29, 2021
1 parent fe5df06 commit 7cf3783
Show file tree
Hide file tree
Showing 14 changed files with 53 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogHandle;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryChangelogStateHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
Expand Down Expand Up @@ -337,8 +337,8 @@ void serializeKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputStream do
serializeKeyedStateHandle(k, dos);
}

} else if (stateHandle instanceof InMemoryStateChangelogHandle) {
InMemoryStateChangelogHandle handle = (InMemoryStateChangelogHandle) stateHandle;
} else if (stateHandle instanceof InMemoryChangelogStateHandle) {
InMemoryChangelogStateHandle handle = (InMemoryChangelogStateHandle) stateHandle;
dos.writeByte(CHANGELOG_BYTE_INCREMENT_HANDLE);
dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup());
dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups());
Expand All @@ -351,8 +351,8 @@ void serializeKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputStream do
dos.write(change.getChange());
}

} else if (stateHandle instanceof StateChangelogHandleStreamImpl) {
StateChangelogHandleStreamImpl handle = (StateChangelogHandleStreamImpl) stateHandle;
} else if (stateHandle instanceof ChangelogStateHandleStreamImpl) {
ChangelogStateHandleStreamImpl handle = (ChangelogStateHandleStreamImpl) stateHandle;
dos.writeByte(CHANGELOG_FILE_INCREMENT_HANDLE);
dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup());
dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups());
Expand Down Expand Up @@ -438,9 +438,9 @@ KeyedStateHandle deserializeKeyedStateHandle(
base.add(deserializeKeyedStateHandle(dis, context));
}
int deltaSize = dis.readInt();
List<StateChangelogHandle> delta = new ArrayList<>(deltaSize);
List<ChangelogStateHandle> delta = new ArrayList<>(deltaSize);
for (int i = 0; i < deltaSize; i++) {
delta.add((StateChangelogHandle) deserializeKeyedStateHandle(dis, context));
delta.add((ChangelogStateHandle) deserializeKeyedStateHandle(dis, context));
}
return new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(
base, delta, keyGroupRange);
Expand All @@ -460,7 +460,7 @@ KeyedStateHandle deserializeKeyedStateHandle(
checkState(bytesSize == dis.read(bytes));
changes.add(new StateChange(keyGroup, bytes));
}
return new InMemoryStateChangelogHandle(changes, from, to, keyGroupRange);
return new InMemoryChangelogStateHandle(changes, from, to, keyGroupRange);

} else if (CHANGELOG_FILE_INCREMENT_HANDLE == type) {
int start = dis.readInt();
Expand All @@ -475,7 +475,7 @@ KeyedStateHandle deserializeKeyedStateHandle(
streamHandleAndOffset.add(Tuple2.of(h, o));
}
long size = dis.readLong();
return new StateChangelogHandleStreamImpl(streamHandleAndOffset, keyGroupRange, size);
return new ChangelogStateHandleStreamImpl(streamHandleAndOffset, keyGroupRange, size);

} else {
throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@
public interface ChangelogStateBackendHandle extends KeyedStateHandle {
List<KeyedStateHandle> getMaterializedStateHandles();

List<StateChangelogHandle> getNonMaterializedStateHandles();
List<ChangelogStateHandle> getNonMaterializedStateHandles();

class ChangelogStateBackendHandleImpl implements ChangelogStateBackendHandle {
private static final long serialVersionUID = 1L;
private final List<KeyedStateHandle> materialized;
private final List<StateChangelogHandle> nonMaterialized;
private final List<ChangelogStateHandle> nonMaterialized;
private final KeyGroupRange keyGroupRange;

public ChangelogStateBackendHandleImpl(
List<KeyedStateHandle> materialized,
List<StateChangelogHandle> nonMaterialized,
List<ChangelogStateHandle> nonMaterialized,
KeyGroupRange keyGroupRange) {
this.materialized = unmodifiableList(materialized);
this.nonMaterialized = unmodifiableList(nonMaterialized);
Expand Down Expand Up @@ -96,11 +96,11 @@ public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
.map(handle -> handle.getIntersection(keyGroupRange))
.filter(Objects::nonNull)
.collect(Collectors.toList());
List<StateChangelogHandle> deltaPart =
List<ChangelogStateHandle> deltaPart =
this.nonMaterialized.stream()
.map(
handle ->
(StateChangelogHandle)
(ChangelogStateHandle)
handle.getIntersection(keyGroupRange))
.filter(Objects::nonNull)
.collect(Collectors.toList());
Expand All @@ -119,7 +119,7 @@ public List<KeyedStateHandle> getMaterializedStateHandles() {
}

@Override
public List<StateChangelogHandle> getNonMaterializedStateHandles() {
public List<ChangelogStateHandle> getNonMaterializedStateHandles() {
return nonMaterialized;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@

/** A handle to saved {@link StateChange state changes}. */
@Internal
public interface StateChangelogHandle extends KeyedStateHandle {}
public interface ChangelogStateHandle extends KeyedStateHandle {}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import java.util.List;
import java.util.stream.Collectors;

/** {@link StateChangelogHandle} implementation based on {@link StreamStateHandle}. */
/** {@link ChangelogStateHandle} implementation based on {@link StreamStateHandle}. */
@Internal
public final class StateChangelogHandleStreamImpl implements StateChangelogHandle {
public final class ChangelogStateHandleStreamImpl implements ChangelogStateHandle {

private static final long serialVersionUID = -8070326169926626355L;

Expand All @@ -49,7 +49,7 @@ public final class StateChangelogHandleStreamImpl implements StateChangelogHandl
private transient SharedStateRegistry stateRegistry;
private final long size;

public StateChangelogHandleStreamImpl(
public ChangelogStateHandleStreamImpl(
List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets,
KeyGroupRange keyGroupRange,
long size) {
Expand All @@ -58,7 +58,7 @@ public StateChangelogHandleStreamImpl(
this.size = size;
}

public StateChangelogHandleStreamImpl(
public ChangelogStateHandleStreamImpl(
List<Tuple3<StreamStateHandle, Long, Long>> sorted, KeyGroupRange keyGroupRange) {
this(
sorted.stream().map(t -> Tuple2.of(t.f0, t.f1)).collect(Collectors.toList()),
Expand Down Expand Up @@ -87,7 +87,7 @@ public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
if (offsets.getNumberOfKeyGroups() == 0) {
return null;
}
return new StateChangelogHandleStreamImpl(handlesAndOffsets, offsets, 0L /* unknown */);
return new ChangelogStateHandleStreamImpl(handlesAndOffsets, offsets, 0L /* unknown */);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

import java.io.IOException;

/** Allows to read state changelog referenced by the provided {@link StateChangelogHandle}. */
/** Allows to read state changelog referenced by the provided {@link ChangelogStateHandle}. */
@Internal
public interface StateChangelogHandleReader<Handle extends StateChangelogHandle> {
public interface StateChangelogHandleReader<Handle extends ChangelogStateHandle> {
CloseableIterator<StateChange> getChanges(Handle handle) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@
import java.util.Iterator;

/**
* A reader for {@link StateChangelogHandleStreamImpl} that iterates over its underlying {@link
* A reader for {@link ChangelogStateHandleStreamImpl} that iterates over its underlying {@link
* StreamStateHandle stream handles} and offsets. Starting from each offset, it enumerates the
* {@link StateChange state changes} using the provided {@link StateChangeIterator}. Different
* {@link StateChangelogStorage} implementations may have different <b>iterator</b> implementations.
* Using a different {@link StateChangelogHandle} (and reader) is problematic as it needs to be
* Using a different {@link ChangelogStateHandle} (and reader) is problematic as it needs to be
* serialized.
*/
@Internal
public class StateChangelogHandleStreamHandleReader
implements StateChangelogHandleReader<StateChangelogHandleStreamImpl> {
implements StateChangelogHandleReader<ChangelogStateHandleStreamImpl> {
private static final Logger LOG =
LoggerFactory.getLogger(StateChangelogHandleStreamHandleReader.class);

Expand All @@ -56,7 +56,7 @@ public StateChangelogHandleStreamHandleReader(StateChangeIterator changeIterator
}

@Override
public CloseableIterator<StateChange> getChanges(StateChangelogHandleStreamImpl handle)
public CloseableIterator<StateChange> getChanges(ChangelogStateHandleStreamImpl handle)
throws IOException {
return new CloseableIterator<StateChange>() {
private final Iterator<Tuple2<StreamStateHandle, Long>> handleIterator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* {@link StateChangelogStorageLoader} to obtain an instance.
*/
@Internal
public interface StateChangelogStorage<Handle extends StateChangelogHandle> extends AutoCloseable {
public interface StateChangelogStorage<Handle extends ChangelogStateHandle> extends AutoCloseable {

StateChangelogWriter<Handle> createWriter(String operatorID, KeyGroupRange keyGroupRange);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/** Allows to write data to the log. Scoped to a single writer (e.g. state backend). */
@Internal
public interface StateChangelogWriter<Handle extends StateChangelogHandle> extends AutoCloseable {
public interface StateChangelogWriter<Handle extends ChangelogStateHandle> extends AutoCloseable {

/** Get the initial {@link SequenceNumber} that is used for the first element. */
SequenceNumber initialSequenceNumber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandle;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;

/** In-memory {@link StateChangelogHandle}. */
/** In-memory {@link ChangelogStateHandle}. */
@Internal
public class InMemoryStateChangelogHandle implements StateChangelogHandle {
public class InMemoryChangelogStateHandle implements ChangelogStateHandle {

private static final long serialVersionUID = 1L;

Expand All @@ -41,12 +41,12 @@ public class InMemoryStateChangelogHandle implements StateChangelogHandle {
private final SequenceNumber to; // for debug purposes
private final KeyGroupRange keyGroupRange;

public InMemoryStateChangelogHandle(
public InMemoryChangelogStateHandle(
List<StateChange> changes, long from, long to, KeyGroupRange keyGroupRange) {
this(changes, SequenceNumber.of(from), SequenceNumber.of(to), keyGroupRange);
}

public InMemoryStateChangelogHandle(
public InMemoryChangelogStateHandle(
List<StateChange> changes,
SequenceNumber from,
SequenceNumber to,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/** An in-memory (non-production) implementation of {@link StateChangelogStorage}. */
public class InMemoryStateChangelogStorage
implements StateChangelogStorage<InMemoryStateChangelogHandle> {
implements StateChangelogStorage<InMemoryChangelogStateHandle> {

@Override
public InMemoryStateChangelogWriter createWriter(
Expand All @@ -33,7 +33,7 @@ public InMemoryStateChangelogWriter createWriter(
}

@Override
public StateChangelogHandleReader<InMemoryStateChangelogHandle> createReader() {
public StateChangelogHandleReader<InMemoryChangelogStateHandle> createReader() {
return handle -> CloseableIterator.fromList(handle.getChanges(), change -> {});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import static java.util.concurrent.CompletableFuture.completedFuture;

@NotThreadSafe
class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryStateChangelogHandle> {
class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChangelogStateHandle> {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryStateChangelogWriter.class);
private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0L);

Expand Down Expand Up @@ -75,11 +75,11 @@ public SequenceNumber lastAppendedSequenceNumber() {
}

@Override
public CompletableFuture<InMemoryStateChangelogHandle> persist(SequenceNumber from) {
public CompletableFuture<InMemoryChangelogStateHandle> persist(SequenceNumber from) {
LOG.debug("Persist after {}", from);
Preconditions.checkNotNull(from);
return completedFuture(
new InMemoryStateChangelogHandle(collectChanges(from), from, sqn, keyGroupRange));
new InMemoryChangelogStateHandle(collectChanges(from), from, sqn, keyGroupRange));
}

private List<StateChange> collectChanges(SequenceNumber after) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
Expand Down Expand Up @@ -49,7 +49,7 @@
import static org.junit.Assert.assertFalse;

/** {@link InMemoryStateChangelogStorage} test. */
public class StateChangelogStorageTest<T extends StateChangelogHandle> {
public class StateChangelogStorageTest<T extends ChangelogStateHandle> {

private final Random random = new Random();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
Expand Down Expand Up @@ -130,7 +130,7 @@ public class ChangelogKeyedStateBackend<K>

private final TtlTimeProvider ttlTimeProvider;

private final StateChangelogWriter<StateChangelogHandle> stateChangelogWriter;
private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter;

private long lastCheckpointId = -1L;

Expand All @@ -150,7 +150,7 @@ public class ChangelogKeyedStateBackend<K>

/** Updated initially on restore and later cleared upon materialization (in FLINK-21357). */
@GuardedBy("materialized")
private final List<StateChangelogHandle> restoredNonMaterialized = new ArrayList<>();
private final List<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<>();

/**
* {@link SequenceNumber} denoting last upload range <b>start</b>, inclusive. Updated to {@link
Expand Down Expand Up @@ -181,7 +181,7 @@ public ChangelogKeyedStateBackend(
AbstractKeyedStateBackend<K> keyedStateBackend,
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
StateChangelogWriter<StateChangelogHandle> stateChangelogWriter,
StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter,
Collection<ChangelogStateBackendHandle> initialState) {
this.keyedStateBackend = keyedStateBackend;
this.executionConfig = executionConfig;
Expand Down Expand Up @@ -324,13 +324,13 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
.thenApply(this::buildSnapshotResult));
}

private SnapshotResult<KeyedStateHandle> buildSnapshotResult(StateChangelogHandle delta) {
private SnapshotResult<KeyedStateHandle> buildSnapshotResult(ChangelogStateHandle delta) {
// Can be called by either task thread during the sync checkpoint phase (if persist future
// was already completed); or by the writer thread otherwise. So need to synchronize.
// todo: revisit after FLINK-21357 - use mailbox action?
synchronized (materialized) {
// collections don't change once started and handles are immutable
List<StateChangelogHandle> prevDeltaCopy = new ArrayList<>(restoredNonMaterialized);
List<ChangelogStateHandle> prevDeltaCopy = new ArrayList<>(restoredNonMaterialized);
if (delta != null && delta.getStateSize() > 0) {
prevDeltaCopy.add(delta);
}
Expand Down
Loading

0 comments on commit 7cf3783

Please sign in to comment.