Skip to content

Commit

Permalink
GEODE-9713: Support thread count in ExecutorService rules (apache#7002)
Browse files Browse the repository at this point in the history
Restores thread count support to ExecutorServiceRule, and adds it to
DistributedExecutorServiceRule.

PROBLEM

ExecutorService rules currently create a newCachedThreadPool which
creates new threads as needed.

Some usages would benefit from the option of specifying a threadCount
limit which would create a newFixedThreadPool that reuses a fixed
number of threads.

SOLUTION

Add optional threadCount creation parameter to both ExecutorServiceRule
and DistributedExecutorServiceRule.

Creating a ExecutorService rule without a threadCount will still create a
newCachedThreadPool. Using a threadCount will now create a
newFixedThreadPool.
  • Loading branch information
kirklund authored Oct 19, 2021
1 parent 6a60434 commit 636bea3
Show file tree
Hide file tree
Showing 7 changed files with 434 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.test.dunit.rules.tests;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.apache.geode.test.dunit.VM.getController;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule.builder;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;

@SuppressWarnings("serial")
public class DistributedExecutorServiceRuleLimitedThreadCountTest implements Serializable {

private static final int THREAD_COUNT = 2;
private static final long TIMEOUT = getTimeout().toMinutes();
private static final TimeUnit UNIT = TimeUnit.MINUTES;
private static final AtomicInteger STARTED_TASKS = new AtomicInteger();
private static final AtomicInteger COMPLETED_TASKS = new AtomicInteger();
private static final AtomicReference<CountDownLatch> LATCH = new AtomicReference<>();

@Rule
public DistributedExecutorServiceRule executorServiceRule = builder()
.threadCount(THREAD_COUNT).vmCount(1).build();

@Before
public void setUp() {
Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
STARTED_TASKS.set(0);
COMPLETED_TASKS.set(0);
LATCH.set(new CountDownLatch(1));
}));
}

@Test
public void limitsRunningTasksToThreadCount() {
// start THREAD_COUNT threads to use up the executor's thread pool
Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
for (int i = 1; i <= THREAD_COUNT; i++) {
executorServiceRule.submit(() -> {
// increment count of started tasks and use a LATCH to keep it running
STARTED_TASKS.incrementAndGet();
assertThat(LATCH.get().await(TIMEOUT, UNIT)).isTrue();
COMPLETED_TASKS.incrementAndGet();
});
}

// count of started tasks should be the same as THREAD_COUNT
await().untilAsserted(() -> {
assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT);
assertThat(COMPLETED_TASKS.get()).isZero();
});

// try to start one more task, but it should end up queued instead of started
executorServiceRule.submit(() -> {
STARTED_TASKS.incrementAndGet();
assertThat(LATCH.get().await(TIMEOUT, UNIT)).isTrue();
COMPLETED_TASKS.incrementAndGet();
});

// started tasks should still be the same as THREAD_COUNT
assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT);
assertThat(COMPLETED_TASKS.get()).isZero();

// number of threads running in executor should also be the same as THREAD_COUNT
assertThat(executorServiceRule.getThreads()).hasSize(THREAD_COUNT);

// open latch to let started tasks complete, and queued task should also start and finish
LATCH.get().countDown();

// all tasks should eventually complete as the executor threads finish tasks
await().untilAsserted(() -> {
assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT + 1);
assertThat(COMPLETED_TASKS.get()).isEqualTo(THREAD_COUNT + 1);
});
}));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.test.dunit.rules.tests;

import static org.apache.geode.test.dunit.VM.getVMCount;
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;

public class DistributedExecutorServiceRuleLimitedVmCountTest {

private static final int VM_COUNT = 2;

@Rule
public DistributedExecutorServiceRule executorServiceRule =
new DistributedExecutorServiceRule(0, VM_COUNT);

@Test
public void limitsVmCount() {
assertThat(getVMCount()).isEqualTo(VM_COUNT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.test.dunit.rules.tests;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.apache.geode.test.dunit.VM.getController;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;

@SuppressWarnings("serial")
public class DistributedExecutorServiceRuleUnlimitedThreadCountTest implements Serializable {

private static final int PARALLEL_TASK_COUNT = 4;
private static final long TIMEOUT = getTimeout().toMinutes();
private static final TimeUnit UNIT = TimeUnit.MINUTES;
private static final AtomicBoolean COMPLETED = new AtomicBoolean();
private static final AtomicReference<CyclicBarrier> BARRIER = new AtomicReference<>();

@Rule
public DistributedExecutorServiceRule executorServiceRule =
new DistributedExecutorServiceRule(0, 1);

@Before
public void setUp() {
Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
COMPLETED.set(false);
BARRIER.set(new CyclicBarrier(PARALLEL_TASK_COUNT, () -> COMPLETED.set(true)));
}));
}

@Test
public void doesNotLimitThreadCount() {
Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
Collection<Future<Void>> tasks = new ArrayList<>();
for (int i = 1; i <= PARALLEL_TASK_COUNT; i++) {
tasks.add(executorServiceRule.submit(() -> {
BARRIER.get().await(TIMEOUT, UNIT);
}));
}
await().untilAsserted(() -> assertThat(COMPLETED.get()).isTrue());
for (Future<Void> task : tasks) {
assertThat(task).isDone();
}
}));
}
}
Loading

0 comments on commit 636bea3

Please sign in to comment.