Skip to content

Commit

Permalink
[FLINK-5240][tests] ensure state backends are properly closed
Browse files Browse the repository at this point in the history
This adds additional test cases to verify the state backends are closed
properly upon the end of a task. The state backends should always be
closed regardless of the final state of the task.

This closes apache#2997.
  • Loading branch information
mxm committed Dec 14, 2016
1 parent 8038ae4 commit bf2874e
Showing 1 changed file with 111 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
Expand All @@ -30,6 +31,7 @@
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand All @@ -47,7 +49,10 @@
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
Expand All @@ -61,13 +66,17 @@
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;

import org.junit.Test;

import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
Expand Down Expand Up @@ -144,23 +153,53 @@ public void testEarlyCanceling() throws Exception {
}

@Test
public void testStateBackendLoading() throws Exception {
public void testStateBackendLoadingAndClosing() throws Exception {
Configuration taskManagerConfig = new Configuration();
taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());

StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

Task task = createTask(SourceStreamTask.class, cfg, taskManagerConfig);
Task task = createTask(StateBackendTestSource.class, cfg, taskManagerConfig);

StateBackendTestSource.fail = false;
task.startTaskThread();

// wait for clean termination
task.getExecutingThread().join();

// ensure that the state backends are closed
Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
Mockito.verify(StateBackendTestSource.keyedStateBackend).close();

assertEquals(ExecutionState.FINISHED, task.getExecutionState());
}

@Test
public void testStateBackendClosingOnFailure() throws Exception {
Configuration taskManagerConfig = new Configuration();
taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());

StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

Task task = createTask(StateBackendTestSource.class, cfg, taskManagerConfig);

StateBackendTestSource.fail = true;
task.startTaskThread();

// wait for clean termination
task.getExecutingThread().join();

// ensure that the state backends are closed
Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
Mockito.verify(StateBackendTestSource.keyedStateBackend).close();

assertEquals(ExecutionState.FAILED, task.getExecutionState());
}

@Test
public void testCancellationNotBlockedOnLock() throws Exception {
SYNC_LATCH = new OneShotLatch();
Expand Down Expand Up @@ -358,18 +397,86 @@ public void run(SourceContext<Long> ctx) {}
public void cancel() {}
}

/**
* Mocked state backend factory which returns mocks for the operator and keyed state backends.
*/
public static final class MockStateBackend implements StateBackendFactory<AbstractStateBackend> {
private static final long serialVersionUID = 1L;

@Override
public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
return mock(AbstractStateBackend.class);
AbstractStateBackend stateBackendMock = mock(AbstractStateBackend.class);

Mockito.when(stateBackendMock.createOperatorStateBackend(
Mockito.any(Environment.class),
Mockito.any(String.class)))
.thenAnswer(new Answer<OperatorStateBackend>() {
@Override
public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
return Mockito.mock(OperatorStateBackend.class);
}
});

Mockito.when(stateBackendMock.createKeyedStateBackend(
Mockito.any(Environment.class),
Mockito.any(JobID.class),
Mockito.any(String.class),
Mockito.any(TypeSerializer.class),
Mockito.any(int.class),
Mockito.any(KeyGroupRange.class),
Mockito.any(TaskKvStateRegistry.class)))
.thenAnswer(new Answer<AbstractKeyedStateBackend>() {
@Override
public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
return Mockito.mock(AbstractKeyedStateBackend.class);
}
});

return stateBackendMock;
}
}

// ------------------------------------------------------------------------
// ------------------------------------------------------------------------

/**
* Source that instantiates the operator state backend and the keyed state backend.
* The created state backends can be retrieved from the static fields to check if the
* CloseableRegistry closed them correctly.
*/
public static class StateBackendTestSource extends StreamTask<Long, StreamSource<Long, SourceFunction<Long>>> {

private static volatile boolean fail;

private static volatile OperatorStateBackend operatorStateBackend;
private static volatile AbstractKeyedStateBackend keyedStateBackend;

@Override
protected void init() throws Exception {
operatorStateBackend = createOperatorStateBackend(
Mockito.mock(StreamOperator.class),
null);
keyedStateBackend = createKeyedStateBackend(
Mockito.mock(TypeSerializer.class),
4,
Mockito.mock(KeyGroupRange.class));
}

@Override
protected void run() throws Exception {
if (fail) {
throw new RuntimeException();
}
}

@Override
protected void cleanup() throws Exception {}

@Override
protected void cancelTask() throws Exception {}

}

/**
* A task that locks if cancellation attempts to cleanly shut down
*/
Expand Down Expand Up @@ -438,7 +545,7 @@ protected void run() throws Exception {

// we are at the point where cancelling can happen
SYNC_LATCH.trigger();

// try to acquire the lock - this is not possible as long as the lock holder
// thread lives
//noinspection SynchronizationOnLocalVariableOrMethodParameter
Expand Down

0 comments on commit bf2874e

Please sign in to comment.