Skip to content

Commit

Permalink
[FLINK-5181] Add Tests in StateBackendTestBase that verify Default-Va…
Browse files Browse the repository at this point in the history
…lue Behaviour
  • Loading branch information
aljoscha committed Nov 28, 2016
1 parent fe6b835 commit 60a4ab3
Showing 1 changed file with 137 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RunnableFuture;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -658,6 +654,141 @@ public void testFoldingState() {
}
}

/**
* Verify that {@link ValueStateDescriptor} allows {@code null} as default.
*/
@Test
public void testValueStateNullAsDefaultValue() throws Exception {
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
kvId.initializeSerializerUnlessSet(new ExecutionConfig());

ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);

backend.setCurrentKey(1);
assertEquals(null, state.value());

state.update("Ciao");
assertEquals("Ciao", state.value());

state.clear();
assertEquals(null, state.value());

backend.dispose();
}


/**
* Verify that an empty {@code ValueState} will yield the default value.
*/
@Test
public void testValueStateDefaultValue() throws Exception {
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, "Hello");
kvId.initializeSerializerUnlessSet(new ExecutionConfig());

ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);

backend.setCurrentKey(1);
assertEquals("Hello", state.value());

state.update("Ciao");
assertEquals("Ciao", state.value());

state.clear();
assertEquals("Hello", state.value());

backend.dispose();
}

/**
* Verify that an empty {@code ReduceState} yields {@code null}.
*/
@Test
public void testReducingStateDefaultValue() throws Exception {
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
kvId.initializeSerializerUnlessSet(new ExecutionConfig());

ReducingState<String> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);

backend.setCurrentKey(1);
assertNull(state.get());

state.add("Ciao");
assertEquals("Ciao", state.get());

state.clear();
assertNull(state.get());

backend.dispose();
}

/**
* Verify that an empty {@code FoldingState} yields {@code null}.
*/
@Test
public void testFoldingStateDefaultValue() throws Exception {
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

FoldingStateDescriptor<Integer, String> kvId =
new FoldingStateDescriptor<>("id", "Fold-Initial:", new AppendingFold(), String.class);

kvId.initializeSerializerUnlessSet(new ExecutionConfig());

FoldingState<Integer, String> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);

backend.setCurrentKey(1);
assertNull(state.get());

state.add(1);
state.add(2);
assertEquals("Fold-Initial:,1,2", state.get());

state.clear();
assertNull(state.get());

backend.dispose();
}


/**
* Verify that an empty {@code ListState} yields {@code null}.
*/
@Test
public void testListStateDefaultValue() throws Exception {
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
kvId.initializeSerializerUnlessSet(new ExecutionConfig());

ListState<String> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);

backend.setCurrentKey(1);
assertNull(state.get());

state.add("Ciao");
state.add("Bello");
assertThat(state.get(), containsInAnyOrder("Ciao", "Bello"));

state.clear();
assertNull(state.get());

backend.dispose();
}




/**
* This test verifies that state is correctly assigned to key groups and that restore
* restores the relevant key groups in the backend.
Expand Down

0 comments on commit 60a4ab3

Please sign in to comment.