Skip to content

Commit

Permalink
Fixed flaky test and resource leak in JavaInstanceTest (apache#7494)
Browse files Browse the repository at this point in the history
### Motivation

The async function test is relying on a race condition on when the counter is updated. 

With `pendingQueueSize=2`, the 3rd event is blocked on the queue until the 1st event is processed. That makes 1st and 2nd event to be triggered around the same time and the validation to fail, when we the 2nd event to be already done.

Additionally, there are multiple threads leaked by this test.
  • Loading branch information
merlimat authored Jul 10, 2020
1 parent 33edec3 commit 5e67e7d
Showing 1 changed file with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@
import static org.testng.Assert.assertNotNull;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -52,11 +57,12 @@ public void testLambda() throws Exception {
@Test
public void testAsyncFunction() throws Exception {
InstanceConfig instanceConfig = new InstanceConfig();
ExecutorService executor = Executors.newCachedThreadPool();

Function<String, CompletableFuture<String>> function = (input, context) -> {
log.info("input string: {}", input);
CompletableFuture<String> result = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
executor.submit(() -> {
try {
Thread.sleep(500);
result.complete(String.format("%s-lambda", input));
Expand All @@ -77,18 +83,20 @@ public void testAsyncFunction() throws Exception {
assertNotNull(result.get().getResult());
assertEquals(new String(testString + "-lambda"), result.get().getResult());
instance.close();
executor.shutdownNow();
}

@Test
public void testAsyncFunctionMaxPending() throws Exception {
InstanceConfig instanceConfig = new InstanceConfig();
int pendingQueueSize = 2;
int pendingQueueSize = 3;
instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize);
ExecutorService executor = Executors.newCachedThreadPool();

Function<String, CompletableFuture<String>> function = (input, context) -> {
log.info("input string: {}", input);
CompletableFuture<String> result = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
executor.submit(() -> {
try {
Thread.sleep(500);
result.complete(String.format("%s-lambda", input));
Expand Down Expand Up @@ -126,5 +134,6 @@ public void testAsyncFunctionMaxPending() throws Exception {

log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime);
instance.close();
executor.shutdownNow();
}
}

0 comments on commit 5e67e7d

Please sign in to comment.