Skip to content

Commit

Permalink
[hotfix][runtime,tests] Add test coverage for BlockingCallMonitoringT…
Browse files Browse the repository at this point in the history
…hreadPool
  • Loading branch information
pnowojski committed Apr 29, 2019
1 parent 9aeb4e5 commit bf7c1b7
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package org.apache.flink.runtime.taskmanager;

import org.apache.flink.annotation.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -44,6 +47,10 @@ public class BlockingCallMonitoringThreadPool {

private final ThreadPoolExecutor executor;

public BlockingCallMonitoringThreadPool() {
this(Executors.defaultThreadFactory());
}

public BlockingCallMonitoringThreadPool(final ThreadFactory dispatcherThreadFactory) {
this.executor = new ThreadPoolExecutor(
1,
Expand All @@ -54,22 +61,22 @@ public BlockingCallMonitoringThreadPool(final ThreadFactory dispatcherThreadFact
checkNotNull(dispatcherThreadFactory));
}

public void submit(final Runnable runnable, final boolean blocking) {
public CompletableFuture<?> submit(final Runnable runnable, final boolean blocking) {
if (blocking) {
submitBlocking(runnable);
return submitBlocking(runnable);
} else {
submit(runnable);
return submit(runnable);
}
}

private void submit(final Runnable task) {
private CompletableFuture<?> submit(final Runnable task) {
adjustThreadPoolSize(inFlightBlockingCallCounter.get());
executor.execute(task);
return CompletableFuture.runAsync(task, executor);
}

private void submitBlocking(final Runnable task) {
private CompletableFuture<?> submitBlocking(final Runnable task) {
adjustThreadPoolSize(inFlightBlockingCallCounter.incrementAndGet());
CompletableFuture.runAsync(task, executor).whenComplete(
return CompletableFuture.runAsync(task, executor).whenComplete(
(ignored, e) -> inFlightBlockingCallCounter.decrementAndGet());
}

Expand Down Expand Up @@ -107,4 +114,14 @@ public boolean isShutdown() {
public void shutdownNow() {
executor.shutdownNow();
}

@VisibleForTesting
int getMaximumPoolSize() {
return executor.getMaximumPoolSize();
}

@VisibleForTesting
int getQueueSize() {
return executor.getQueue().size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskmanager;

import org.apache.flink.core.testutils.OneShotLatch;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
* Tests for {@link BlockingCallMonitoringThreadPool}.
*/
public class BlockingCallMonitoringThreadPoolTest {

private final static int TIME_OUT = 30;

private final OneShotLatch latch1 = new OneShotLatch();
private final OneShotLatch latch2 = new OneShotLatch();
private BlockingCallMonitoringThreadPool blockingCallThreadPool = new BlockingCallMonitoringThreadPool();

@Before
public void setup() {
blockingCallThreadPool = new BlockingCallMonitoringThreadPool();
latch1.reset();
latch2.reset();
}

@After
public void tearDown() {
latch1.trigger();
latch2.trigger();
blockingCallThreadPool.shutdown();
}

@Test
public void testSubmitNonBlockingCalls() throws Exception {
blockingCallThreadPool.submit(() -> await(latch1), false);
blockingCallThreadPool.submit(() -> await(latch2), false);

assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
assertEquals(1, blockingCallThreadPool.getQueueSize());
}

@Test
public void testSubmitBlockingCall() throws Exception {
CompletableFuture<?> latch1Future = blockingCallThreadPool.submit(() -> await(latch1), true);
CompletableFuture<?> latch2Future = blockingCallThreadPool.submit(() -> await(latch2), false);

assertEquals(2, blockingCallThreadPool.getMaximumPoolSize());
assertEquals(0, blockingCallThreadPool.getQueueSize());

latch2.trigger();
latch2Future.get(TIME_OUT, TimeUnit.SECONDS);

assertFalse(latch1Future.isDone());
assertTrue(latch2Future.isDone());
}

@Test
public void testDownsizePool() throws Exception {
List<CompletableFuture<?>> futures = new ArrayList<>();

futures.add(blockingCallThreadPool.submit(() -> await(latch1), true));
futures.add(blockingCallThreadPool.submit(() -> await(latch1), true));
futures.add(blockingCallThreadPool.submit(() -> await(latch1), false));

assertEquals(3, blockingCallThreadPool.getMaximumPoolSize());

latch1.trigger();

for (CompletableFuture<?> future : futures) {
future.get(TIME_OUT, TimeUnit.SECONDS);
}

blockingCallThreadPool.submit(() -> await(latch1), false).get(TIME_OUT, TimeUnit.SECONDS);
assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
}

private void await(OneShotLatch latch) {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit bf7c1b7

Please sign in to comment.