Skip to content

Commit

Permalink
GEODE-3702 Adding a new framework for concurrency testing in geode
Browse files Browse the repository at this point in the history
Added a new junit runner, ConcurrentTestRunner, for running a test that
has parallel threads.

The runner currently runs the test using Java PathFinder, which will
run the test with all interleavings of the threads.

Added an example test to geode-core for the FilterProfile serialization
logic.

There are options to configure which concurrent test runner to use, and
configuration for the JPF runner to pass jpf properties.
  • Loading branch information
upthewaterspout committed Sep 25, 2017
1 parent 94dcc62 commit 13f3c39
Show file tree
Hide file tree
Showing 25 changed files with 3,043 additions and 7 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ buildscript {
classpath "com.diffplug.gradle.spotless:spotless:2.2.0"
classpath "me.champeau.gradle:jmh-gradle-plugin:0.3.1"
classpath "com.pedjak.gradle.plugins:dockerized-test:0.5.4"
classpath "gradle.plugin.com.github.upthewaterspout.jpfgradle:jpfgradle:0.3"
}
}

Expand Down
28 changes: 28 additions & 0 deletions geode-concurrency-test/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
apply plugin: "com.github.upthewaterspout.jpf"

jpf {
sourceSet='main'
downloadUrl='https://bitbucket.org/upthewaterspout/jpf-core/downloads/jpf-core-r34.zip'
installDir=System.getProperty('user.home') + '/.jpf'
}

dependencies {
compile 'junit:junit:' + project.'junit.version'
compile 'org.apache.logging.log4j:log4j-api:' + project.'log4j.version'
}
156 changes: 156 additions & 0 deletions geode-concurrency-test/src/main/java/java/net/InetAddress.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package java.net;

import java.io.IOException;
import java.nio.charset.Charset;

/**
* Replace the JDK's InetAddress with a mock implementation that uses no native calls or IO.
*
* This implementation is simply a holder for a hostname, no DNS lookups are done. The address
* returned from getAddress is simply the hostname converted to bytes in UTF-8
*/
public class InetAddress implements java.io.Serializable {

public String hostname;

public InetAddress(String host) {
this.hostname = host;
}

public boolean isMulticastAddress() {
return false;
}

public boolean isAnyLocalAddress() {
return false;
}

public boolean isLoopbackAddress() {
return false;
}

public boolean isLinkLocalAddress() {
return false;
}

public boolean isSiteLocalAddress() {
return false;
}

public boolean isMCGlobal() {
return false;
}

public boolean isMCNodeLocal() {
return false;
}

public boolean isMCLinkLocal() {
return false;
}

public boolean isMCSiteLocal() {
return false;
}

public boolean isMCOrgLocal() {
return false;
}


public boolean isReachable(int timeout) throws IOException {
return false;
}

public boolean isReachable(NetworkInterface netif, int ttl, int timeout) throws IOException {
return false;
}

public String getHostName() {
return hostname;
}

public String getCanonicalHostName() {
return hostname;
}


public byte[] getAddress() {
return hostname.getBytes(Charset.forName("UTF-8"));
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

InetAddress that = (InetAddress) o;

return hostname != null ? hostname.equals(that.hostname) : that.hostname == null;
}

@Override
public int hashCode() {
return hostname != null ? hostname.hashCode() : 0;
}

/**
* Converts this IP address to a {@code String}. The string returned is of the form: hostname /
* literal IP address.
*
* If the host name is unresolved, no reverse name service lookup is performed. The hostname part
* will be represented by an empty string.
*
* @return a string representation of this IP address.
*/
public String toString() {
return hostname;
}


public static InetAddress getByAddress(String host, byte[] addr) throws UnknownHostException {
return new InetAddress(host);
}


public static InetAddress getByName(String host) throws UnknownHostException {
return new InetAddress(host);
}

public static InetAddress[] getAllByName(String host) throws UnknownHostException {
return new InetAddress[] {new InetAddress("localhost")};
}

public static InetAddress getLoopbackAddress() {
return new InetAddress("localhost");
}


public static InetAddress getByAddress(byte[] addr) throws UnknownHostException {
String host = new String(addr, Charset.forName("UTF-8"));
return getByName(host);
}

public static InetAddress getLocalHost() throws UnknownHostException {
return getLoopbackAddress();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.test.concurrency;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

import gov.nasa.jpf.Config;
import gov.nasa.jpf.JPF;
import gov.nasa.jpf.JPFListener;
import junit.framework.AssertionFailedError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.simple.SimpleLoggerContextFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.Description;
import org.junit.runner.notification.Failure;
import org.junit.runner.notification.RunNotifier;
import org.junit.runners.ParentRunner;
import org.junit.runners.model.FrameworkMethod;
import org.junit.runners.model.InitializationError;

import org.apache.geode.test.concurrency.annotation.ConcurrentTestConfig;
import org.apache.geode.test.concurrency.jpf.JpfRunner;


/**
* Test runner for tests that involve multiple threads.
*
* All methods annotated with @Test must take a {@link ParallelExecutor} as a parameter, which the
* test can use to invoke code in parallel.
*
* This test run will try to exercise the test method to flush out any concurrent bugs in the
* parallel execution. Currently this runner is using Java PathFinder to run the test with *all*
* possible thread interleavings, but other methods such as invoking the method multiple times in a
* normal JVM may be supported in the feature.
*
* All test logic and state *must* be encapsulated in the individual test methods. This is because
* the concurrency testing logic may need to invoke the test body multiple times, possibly in
* parallel.
*
* No guarantees are currently made about logic in methods annotated with @Before, @BeforeClass or
* about the behavior of {@link Rule} for concurrent tests.
*
* Example
*
* <code>
* &#64;RunWith(ConcurrentTestRunner.class)
* public void MyTest {
* &#64;Test
* public void someTestMethod(ParallelExecutor executor) {
* AtomicInteger atomicInteger = new AtomicInteger();
* executor.inParallel(() -> atomicInteger.incrementAndGet());
* executor.inParallel(() -> atomicInteger.incrementAndGet());
* executor.execute();
* assertEquals(2, atomicInteger.get());
* }
* }
* </code>
*
* ConcurrentTestRunner currently executes tests using Java Pathfinder, which will run the test with
* all thread interleavings.
*/
public class ConcurrentTestRunner extends ParentRunner<FrameworkMethod> {
/**
* Delegate to actually run the test.
*/
private final Runner runner;

public ConcurrentTestRunner(Class testClass) throws InitializationError {
super(testClass);
ConcurrentTestConfig configuration = getTestClass().getAnnotation(ConcurrentTestConfig.class);
if (configuration == null) {
runner = new JpfRunner();
return;
}

try {
runner = configuration.runner().newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new InitializationError(e);
}
}

@Override
protected List<FrameworkMethod> getChildren() {
return getTestClass().getAnnotatedMethods(Test.class);
}

@Override
protected void collectInitializationErrors(List<Throwable> errors) {
super.collectInitializationErrors(errors);
validateTestMethods(getChildren(), errors);
}

private void validateTestMethods(List<FrameworkMethod> methods, List<Throwable> errors) {
for (FrameworkMethod method : methods) {
if (!Arrays.equals(method.getMethod().getParameterTypes(),
new Class[] {ParallelExecutor.class})) {
errors.add(new AssertionFailedError("Incorrect signature on method: " + method
+ ". For a concurrent test, all test methods should take a ParallelExector parameter."));
}
}
}

@Override
protected Description describeChild(FrameworkMethod child) {
return Description.createTestDescription(child.getDeclaringClass(), child.getName());
}

@Override
protected void runChild(FrameworkMethod child, RunNotifier notifier) {
notifier.fireTestStarted(describeChild(child));
try {
List<Throwable> failures = runner.runTestMethod(child.getMethod());
failures.stream()
.forEach(failure -> notifier.fireTestFailure(new Failure(describeChild(child), failure)));
} finally {
notifier.fireTestFinished(describeChild(child));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.test.concurrency;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
* Executor that executes multiple tasks in parallel as part of the main body of concurrent test.
*
* See {@link ConcurrentTestRunner}
*/
public interface ParallelExecutor {

/**
* Add a task to run in parallel
*/
<T> Future<T> inParallel(Callable<T> callable);

/**
* Add a task to run in parallel
*/
default <T> Future<T> inParallel(RunnableWithException runnable) {
return inParallel(() -> {
runnable.run();
return null;
});
}

/**
* Execute all tasks in parallel, wait for them to complete and throw an exception if any of the
* tasks throw an exception.
*/
void execute() throws ExecutionException, InterruptedException;
}
Loading

0 comments on commit 13f3c39

Please sign in to comment.