Skip to content

Commit

Permalink
[hotfix] Make KeyGroupsStateHandle implement StreamStateHandle
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter authored and uce committed Oct 6, 2016
1 parent 98710ea commit fee1430
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ public void doRestore(List<KeyGroupsStateHandle> keyGroupsStateHandles)
private void restoreKeyGroupsInStateHandle()
throws IOException, RocksDBException, ClassNotFoundException {
try {
currentStateHandleInStream = currentKeyGroupsStateHandle.getStateHandle().openInputStream();
currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
restoreKVStateMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public static void serializeKeyGroupStateHandle(
for (int keyGroup : stateHandle.keyGroups()) {
dos.writeLong(stateHandle.getOffsetForKeyGroup(keyGroup));
}
serializeStreamStateHandle(stateHandle.getStateHandle(), dos);
serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
} else {
dos.writeByte(NULL_HANDLE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;


import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
Expand All @@ -28,7 +29,7 @@
* consists of a range of key group snapshots. A key group is subset of the available
* key space. The key groups are identified by their key group indices.
*/
public class KeyGroupsStateHandle implements StateObject {
public class KeyGroupsStateHandle implements StreamStateHandle {

private static final long serialVersionUID = -8070326169926626355L;

Expand Down Expand Up @@ -104,14 +105,6 @@ public int getNumberOfKeyGroups() {
return groupRangeOffsets.getKeyGroupRange().getNumberOfKeyGroups();
}

/**
*
* @return the inner stream state handle to the actual key-group states
*/
public StreamStateHandle getStateHandle() {
return stateHandle;
}

@Override
public void discardState() throws Exception {
stateHandle.discardState();
Expand All @@ -122,6 +115,15 @@ public long getStateSize() throws IOException {
return stateHandle.getStateSize();
}

@Override
public FSDataInputStream openInputStream() throws IOException {
return stateHandle.openInputStream();
}

public StreamStateHandle getDelegateStateHandle() {
return stateHandle;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void restorePartitionedState(List<KeyGroupsStateHandle> state) throws Exc

try {

fsDataInputStream = keyGroupsHandle.getStateHandle().openInputStream();
fsDataInputStream = keyGroupsHandle.openInputStream();
cancelStreamRegistry.registerClosable(fsDataInputStream);

DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2474,7 +2474,7 @@ public static void compareKeyPartitionedState(

assertEquals(expectedTotalKeyGroups, actualTotalKeyGroups);

try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.getStateHandle().openInputStream()) {
try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.openInputStream()) {
for (int groupId : expectedHeadOpKeyGroupStateHandle.keyGroups()) {
long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
inputStream.seek(offset);
Expand All @@ -2483,9 +2483,8 @@ public static void compareKeyPartitionedState(
for (KeyGroupsStateHandle oneActualKeyGroupStateHandle : actualPartitionedKeyGroupState) {
if (oneActualKeyGroupStateHandle.containsKeyGroup(groupId)) {
long actualOffset = oneActualKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.
getStateHandle().openInputStream()) {

try (FSDataInputStream actualInputStream =
oneActualKeyGroupStateHandle.openInputStream()) {
actualInputStream.seek(actualOffset);

int actualGroupState = InstantiationUtil.
Expand Down

0 comments on commit fee1430

Please sign in to comment.