Skip to content

Commit

Permalink
[refactor][state] Rename createInternalState to createOrUpdateInterna…
Browse files Browse the repository at this point in the history
…lState
  • Loading branch information
masteryhx authored and rkhachatryan committed Aug 8, 2022
1 parent 94411a1 commit a267dcf
Show file tree
Hide file tree
Showing 18 changed files with 55 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testListSerialization() throws Exception {
longHeapKeyedStateBackend.setCurrentKey(key);

final InternalListState<Long, VoidNamespace, Long> listState =
longHeapKeyedStateBackend.createInternalState(
longHeapKeyedStateBackend.createOrUpdateInternalState(
VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void testListSerialization() throws Exception {
getLongHeapKeyedStateBackend(key);

final InternalListState<Long, VoidNamespace, Long> listState =
longHeapKeyedStateBackend.createInternalState(
longHeapKeyedStateBackend.createOrUpdateInternalState(
VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public interface KeyedStateFactory {

/**
* Creates and returns a new {@link InternalKvState}.
* Creates or updates internal state and returns a new {@link InternalKvState}.
*
* @param namespaceSerializer TypeSerializer for the state namespace.
* @param stateDesc The {@code StateDescriptor} that contains the name of the state.
Expand All @@ -40,16 +40,16 @@ public interface KeyedStateFactory {
* @param <IS> The type of internal state.
*/
@Nonnull
default <N, SV, S extends State, IS extends S> IS createInternalState(
default <N, SV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc)
throws Exception {
return createInternalState(
return createOrUpdateInternalState(
namespaceSerializer, stateDesc, StateSnapshotTransformFactory.noTransform());
}

/**
* Creates and returns a new {@link InternalKvState}.
* Creates or updates internal state and returns a new {@link InternalKvState}.
*
* @param namespaceSerializer TypeSerializer for the state namespace.
* @param stateDesc The {@code StateDescriptor} that contains the name of the state.
Expand All @@ -61,7 +61,7 @@ default <N, SV, S extends State, IS extends S> IS createInternalState(
* @param <IS> The type of internal state.
*/
@Nonnull
<N, SV, SEV, S extends State, IS extends S> IS createInternalState(
<N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
Expand All @@ -81,7 +81,7 @@ <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
* @param <IS> The type of internal state.
*/
@Nonnull
default <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
default <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory,
Expand All @@ -91,7 +91,8 @@ default <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
throw new UnsupportedOperationException(
this.getClass().getName() + "doesn't support to allow future metadata update");
} else {
return createInternalState(namespaceSerializer, stateDesc, snapshotTransformFactory);
return createOrUpdateInternalState(
namespaceSerializer, stateDesc, snapshotTransformFactory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,17 +305,18 @@ public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {

@Override
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
throws Exception {
return createInternalState(namespaceSerializer, stateDesc, snapshotTransformFactory, false);
return createOrUpdateInternalState(
namespaceSerializer, stateDesc, snapshotTransformFactory, false);
}

@Override
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ IS createStateAndWrapWithTtlIfEnabled(
? new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
namespaceSerializer, stateDesc, stateBackend, timeProvider)
.createState()
: stateBackend.createInternalState(namespaceSerializer, stateDesc);
: stateBackend.createOrUpdateInternalState(namespaceSerializer, stateDesc);
}

private final Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> stateFactories;
Expand Down Expand Up @@ -222,7 +222,7 @@ TtlStateContext<OIS, V> createTtlStateContext(StateDescriptor<TTLS, TTLV> ttlDes
// config
OIS originalState =
(OIS)
stateBackend.createInternalState(
stateBackend.createOrUpdateInternalState(
namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory());
return new TtlStateContext<>(
originalState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4647,7 +4647,7 @@ public void testParallelAsyncSnapshots() throws Exception {

// insert some data to the backend.
InternalValueState<Integer, VoidNamespace, Integer> valueState =
backend.createInternalState(
backend.createOrUpdateInternalState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));

Expand Down Expand Up @@ -4734,7 +4734,7 @@ public void testAsyncSnapshot() throws Exception {
try {
backend = createKeyedBackend(IntSerializer.INSTANCE);
InternalValueState<Integer, VoidNamespace, Integer> valueState =
backend.createInternalState(
backend.createOrUpdateInternalState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));

Expand Down Expand Up @@ -4786,7 +4786,7 @@ public void testAsyncSnapshot() throws Exception {
backend = restoreKeyedBackend(IntSerializer.INSTANCE, stateHandle);

InternalValueState<Integer, VoidNamespace, Integer> valueState =
backend.createInternalState(
backend.createOrUpdateInternalState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));

Expand Down Expand Up @@ -4958,7 +4958,7 @@ public void testAsyncSnapshotCancellation() throws Exception {
}

InternalValueState<Integer, VoidNamespace, Integer> valueState =
backend.createInternalState(
backend.createOrUpdateInternalState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));

Expand Down Expand Up @@ -5082,7 +5082,7 @@ public void testMapStateGetKeysAndNamespaces() throws Exception {
createKeyedBackend(IntSerializer.INSTANCE);
try {
InternalMapState<Integer, String, String, Integer> internalState =
backend.createInternalState(
backend.createOrUpdateInternalState(
StringSerializer.INSTANCE,
new MapStateDescriptor<>(
fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {

@Nonnull
@Override
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull
StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
snapshotTransformFactory)
throws Exception {
return delegatedKeyedStateBackend.createInternalState(
return delegatedKeyedStateBackend.createOrUpdateInternalState(
namespaceSerializer, stateDesc, snapshotTransformFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception {
try {

InternalValueState<String, VoidNamespace, String> state =
stateBackend.createInternalState(
stateBackend.createOrUpdateInternalState(
new VoidNamespaceSerializer(), stateDescriptor);

stateBackend.setCurrentKey("A");
Expand Down Expand Up @@ -172,7 +172,7 @@ private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception {
executionConfig, StateObjectCollection.singleton(stateHandle));
try {
InternalValueState<String, VoidNamespace, String> state =
stateBackend.createInternalState(
stateBackend.createOrUpdateInternalState(
new VoidNamespaceSerializer(), stateDescriptor);

stateBackend.setCurrentKey("A");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private class TestValueState extends TestState {

private TestValueState() throws Exception {
this.state =
backend.createInternalState(
backend.createOrUpdateInternalState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("TestValueState", StringSerializer.INSTANCE),
snapshotTransformFactory);
Expand All @@ -124,7 +124,7 @@ private class TestListState extends TestState {

private TestListState() throws Exception {
this.state =
backend.createInternalState(
backend.createOrUpdateInternalState(
VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>(
"TestListState",
Expand All @@ -148,7 +148,7 @@ private class TestMapState extends TestState {

private TestMapState() throws Exception {
this.state =
backend.createInternalState(
backend.createOrUpdateInternalState(
VoidNamespaceSerializer.INSTANCE,
new MapStateDescriptor<>(
"TestMapState",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testMapStateMigrationAfterHashMapSerRemoval() throws Exception {
stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());

InternalMapState<String, Integer, Long, Long> state =
keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
keyedBackend.createOrUpdateInternalState(IntSerializer.INSTANCE, stateDescr);

keyedBackend.setCurrentKey("abc");
state.setCurrentNamespace(namespace1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ <N, SV, S extends State, IS extends S> IS createInternalState(
@Override
@SuppressWarnings("unchecked")
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,15 +597,15 @@ public <N, S extends State, T> S getOrCreateKeyedState(
@Nonnull
@Override
@SuppressWarnings("unchecked")
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull
StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
snapshotTransformFactory)
throws Exception {
InternalKvState<K, N, SV> state =
keyedStateBackend.createInternalState(
keyedStateBackend.createOrUpdateInternalState(
namespaceSerializer, stateDesc, snapshotTransformFactory);
ChangelogState changelogState =
changelogStateFactory.getExistingState(
Expand Down Expand Up @@ -852,7 +852,7 @@ public <N, S extends State, V> S createKeyedState(
TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
throws Exception {
InternalKvState<K, N, V> kvState =
keyedStateBackend.createInternalState(
keyedStateBackend.createOrUpdateInternalState(
namespaceSerializer, stateDescriptor, noTransform(), true);
ChangelogState changelogState =
changelogStateFactory.getExistingState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public <N, S extends State, V> S createKeyedState(
TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
throws Exception {
InternalKvState<K, N, V> kvState =
keyedStateBackend.createInternalState(
keyedStateBackend.createOrUpdateInternalState(
namespaceSerializer, stateDescriptor, noTransform(), true);
ChangelogState changelogState =
changelogStateFactory.getExistingState(
Expand Down Expand Up @@ -232,14 +232,14 @@ public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {

@Nonnull
@Override
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull
StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
snapshotTransformFactory)
throws Exception {
return keyedStateBackend.createInternalState(
return keyedStateBackend.createOrUpdateInternalState(
namespaceSerializer, stateDesc, snapshotTransformFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,17 +862,18 @@ private static <UK> boolean checkMapStateKeySchemaCompatibility(

@Override
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
throws Exception {
return createInternalState(namespaceSerializer, stateDesc, snapshotTransformFactory, false);
return createOrUpdateInternalState(
namespaceSerializer, stateDesc, snapshotTransformFactory, false);
}

@Nonnull
@Override
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,10 @@ public void testCorrectMergeOperatorSet() throws Exception {

ValueStateDescriptor<String> stubState1 =
new ValueStateDescriptor<>("StubState-1", StringSerializer.INSTANCE);
test.createInternalState(StringSerializer.INSTANCE, stubState1);
test.createOrUpdateInternalState(StringSerializer.INSTANCE, stubState1);
ValueStateDescriptor<String> stubState2 =
new ValueStateDescriptor<>("StubState-2", StringSerializer.INSTANCE);
test.createInternalState(StringSerializer.INSTANCE, stubState2);
test.createOrUpdateInternalState(StringSerializer.INSTANCE, stubState2);

// The default CF is pre-created so sum up to 2 times (once for each stub state)
verify(columnFamilyOptions, Mockito.times(2))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)

@Nonnull
@Override
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {

@Nonnull
@Override
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull
Expand Down
Loading

0 comments on commit a267dcf

Please sign in to comment.