Skip to content

Commit

Permalink
[FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWai…
Browse files Browse the repository at this point in the history
…tOperators

This PR addresses the problem by changing the Emitter's behaviour to first output the
element before removing it from the StreamElementQueue. That way the close method waits
until also the Emitter has outputted the last completed element. Additionally, the
stopResources method now frees the checkpoint lock in order to let the emitter thread
react to the interrupt signal.

This closes apache#3209.
  • Loading branch information
tillrohrmann committed Jan 25, 2017
1 parent cf9f4c7 commit a811545
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class AsyncWaitOperator<IN, OUT>
/** Timeout for the async collectors */
private final long timeout;

private transient Object checkpointingLock;
protected transient Object checkpointingLock;

/** {@link TypeSerializer} for inputs while making snapshots. */
private transient StreamElementSerializer<IN> inStreamElementSerializer;
Expand Down Expand Up @@ -189,7 +189,7 @@ else if (element.isLatencyMarker()) {
this.emitter = new Emitter<>(checkpointingLock, output, queue, this);

// start the emitter thread
this.emitterThread = new Thread(emitter);
this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
emitterThread.setDaemon(true);
emitterThread.start();

Expand Down Expand Up @@ -356,6 +356,16 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException
Thread.currentThread().interrupt();
}

/**
* FLINK-5638: If we have the checkpoint lock we might have to free it for a while so
* that the emitter thread can complete/react to the interrupt signal.
*/
if (Thread.holdsLock(checkpointingLock)) {
while (emitterThread.isAlive()) {
checkpointingLock.wait(100L);
}
}

emitterThread.join();
} else {
executor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void run() {
} else {
// Thread got interrupted which means that it should shut down
LOG.debug("Emitter thread got interrupted. This indicates that the emitter should " +
"shut down.");
"shut down.", e);
}
} catch (Throwable t) {
operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
Expand All @@ -100,18 +100,18 @@ public void run() {
private void output(AsyncResult asyncResult) throws InterruptedException {
if (asyncResult.isWatermark()) {
synchronized (checkpointLock) {
AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

LOG.debug("Output async watermark.");
output.emitWatermark(asyncWatermarkResult.getWatermark());

// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();

AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

LOG.debug("Output async watermark.");
output.emitWatermark(asyncWatermarkResult.getWatermark());
}
} else {
AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();
Expand All @@ -123,14 +123,6 @@ private void output(AsyncResult asyncResult) throws InterruptedException {
}

synchronized (checkpointLock) {
// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();

LOG.debug("Output async stream element collection result.");

try {
Expand All @@ -146,6 +138,14 @@ private void output(AsyncResult asyncResult) throws InterruptedException {
new Exception("An async function call terminated with an exception. " +
"Failing the AsyncWaitOperator.", e));
}

// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) th

return true;
} else {
LOG.debug("Failed to put element into ordered stream element queue because it " +
LOG.debug("Failed to put element into unordered stream element queue because it " +
"was full ({}/{}).", numberEntries, capacity);

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,30 @@
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.ArrayDeque;
import java.util.Collections;
Expand All @@ -73,7 +80,13 @@
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -670,4 +683,122 @@ public void testAsyncTimeout() throws Exception {
Assert.assertNotNull(failureCause.getCause().getCause());
Assert.assertTrue(failureCause.getCause().getCause() instanceof TimeoutException);
}

/**
* Test case for FLINK-5638: Tests that the async wait operator can be closed even if the
* emitter is currently waiting on the checkpoint lock (e.g. in the case of two chained async
* wait operators where the latter operator's queue is currently full).
*
* Note that this test does not enforce the exact strict ordering because with the fix it is no
* longer possible. However, it provokes the described situation without the fix.
*/
@Test(timeout = 10000L)
public void testClosingWithBlockedEmitter() throws Exception {
final Object lock = new Object();

ArgumentCaptor<Throwable> failureReason = ArgumentCaptor.forClass(Throwable.class);

Environment environment = mock(Environment.class);
when(environment.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
when(environment.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
when(environment.getUserClassLoader()).thenReturn(getClass().getClassLoader());
when(environment.getTaskInfo()).thenReturn(new TaskInfo(
"testTask",
1,
0,
1,
0));
doNothing().when(environment).failExternally(failureReason.capture());

StreamTask<?, ?> containingTask = mock(StreamTask.class);
when(containingTask.getEnvironment()).thenReturn(environment);
when(containingTask.getCheckpointLock()).thenReturn(lock);
when(containingTask.getProcessingTimeService()).thenReturn(new TestProcessingTimeService());

StreamConfig streamConfig = mock(StreamConfig.class);
doReturn(IntSerializer.INSTANCE).when(streamConfig).getTypeSerializerIn1(any(ClassLoader.class));

final OneShotLatch closingLatch = new OneShotLatch();
final OneShotLatch outputLatch = new OneShotLatch();

Output<StreamRecord<Integer>> output = mock(Output.class);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
assertTrue("Output should happen under the checkpoint lock.", Thread.currentThread().holdsLock(lock));

outputLatch.trigger();

// wait until we're in the closing method of the operator
while (!closingLatch.isTriggered()) {
lock.wait();
}

return null;
}
}).when(output).collect(any(StreamRecord.class));

AsyncWaitOperator<Integer, Integer> operator = new TestAsyncWaitOperator<>(
new MyAsyncFunction(),
1000L,
1,
AsyncDataStream.OutputMode.ORDERED,
closingLatch);

operator.setup(
containingTask,
streamConfig,
output);

operator.open();

synchronized (lock) {
operator.processElement(new StreamRecord<>(42));
}

outputLatch.await();

synchronized (lock) {
operator.close();
}

// check that no concurrent exception has occurred
try {
verify(environment, never()).failExternally(any(Throwable.class));
} catch (Error e) {
// add the exception occurring in the emitter thread (root cause) as a suppressed
// exception
e.addSuppressed(failureReason.getValue());
throw e;
}
}

/**
* Testing async wait operator which introduces a latch to synchronize the execution with the
* emitter.
*/
private static final class TestAsyncWaitOperator<IN, OUT> extends AsyncWaitOperator<IN, OUT> {

private static final long serialVersionUID = -8528791694746625560L;

private final transient OneShotLatch closingLatch;

public TestAsyncWaitOperator(
AsyncFunction<IN, OUT> asyncFunction,
long timeout,
int capacity,
AsyncDataStream.OutputMode outputMode,
OneShotLatch closingLatch) {
super(asyncFunction, timeout, capacity, outputMode);

this.closingLatch = Preconditions.checkNotNull(closingLatch);
}

@Override
public void close() throws Exception {
closingLatch.trigger();
checkpointingLock.notifyAll();
super.close();
}
}
}

0 comments on commit a811545

Please sign in to comment.