Skip to content

Commit

Permalink
ZOOKEEPER-2183 Concurrent Testing Processes and Port Assignments (Chr…
Browse files Browse the repository at this point in the history
…is Nauroth via hdeng)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1679400 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Hongchao Deng committed May 14, 2015
1 parent 692bfe6 commit a358280
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 57 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ IMPROVEMENTS:

ZOOKEEPER-2171 avoid reverse lookups in QuorumCnxManager (rgs via michim)

ZOOKEEPER-2183 Concurrent Testing Processes and Port Assignments (Chris Nauroth via hdeng)

Release 3.5.0 - 8/4/2014

NEW FEATURES:
Expand Down
3 changes: 3 additions & 0 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ xmlns:maven="antlib:org.apache.maven.artifact.ant">
<property name="test.timeout" value="900000" />
<property name="test.junit.output.format" value="plain" />
<property name="test.junit.fork.mode" value="perTest" />
<property name="test.junit.threads" value="8" />
<property name="test.junit.printsummary" value="yes" />
<property name="test.junit.haltonfailure" value="no" />
<property name="config.dir" value="${src.dir}/java/test/config" />
Expand Down Expand Up @@ -1314,6 +1315,7 @@ xmlns:maven="antlib:org.apache.maven.artifact.ant">
haltonfailure="${test.junit.haltonfailure}"
fork="yes"
forkmode="${test.junit.fork.mode}"
threads="${test.junit.threads}"
maxmemory="${test.junit.maxmem}"
dir="${test.java.build.dir}" timeout="${test.timeout}"
errorProperty="tests.failed" failureProperty="tests.failed">
Expand All @@ -1327,6 +1329,7 @@ xmlns:maven="antlib:org.apache.maven.artifact.ant">
with junit fork mode set to "once")-->
<sysproperty key="zookeeper.DigestAuthenticationProvider.superDigest"
value="super:D/InIHSb7yEEbrWz8b9l71RjZJU=" />
<sysproperty key="test.junit.threads" value="${test.junit.threads}" />
<classpath refid="test.java.classpath"/>
<classpath>
<pathelement path="${test.java.classes}" />
Expand Down
185 changes: 180 additions & 5 deletions src/java/test/org/apache/zookeeper/PortAssignment.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,193 @@

package org.apache.zookeeper;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Assign ports to tests */
public class PortAssignment {
public final class PortAssignment {
private static final Logger LOG = LoggerFactory.getLogger(PortAssignment.class);

private static int nextPort = 11221;
// The available port range that we use stays away from the ephemeral port
// range, which the OS will assign to client socket connections. We can't
// coordinate with the OS on the assignment of those ports, so it's best to
// stay out of that range to avoid conflicts. Typical ranges for ephemeral
// ports are:
// - IANA suggests 49152 - 65535
// - Linux typically uses 32768 - 61000
// - FreeBSD modern versions typically use the IANA suggested range
// - Windows modern versions typically use the IANA suggested range
private static final int GLOBAL_BASE_PORT = 11221;
private static final int GLOBAL_MAX_PORT = 32767;

private static PortRange portRange = null;
private static int nextPort;

/** Assign a new, unique port to the test */
/**
* Assign a new, unique port to the test. This method works by assigning
* ports from a valid port range as identified by the total number of
* concurrent test processes and the ID of this test process. Each
* concurrent test process uses an isolated range, so it's not possible for
* multiple test processes to collide on the same port. Within the port
* range, ports are assigned in monotonic increasing order, wrapping around
* to the beginning of the range if needed. As an extra precaution, the
* method attempts to bind to the port and immediately close it before
* returning it to the caller. If the port cannot be bound, then it tries
* the next one in the range. This provides some resiliency in case the port
* is otherwise occupied, such as a developer running other servers on the
* machine running the tests.
*
* @return port
*/
public synchronized static int unique() {
LOG.info("assigning port " + nextPort);
return nextPort++;
if (portRange == null) {
portRange = setupPortRange(System.getProperty("test.junit.threads"),
System.getProperty("sun.java.command"));
nextPort = portRange.getMinimum();
}
int candidatePort = nextPort;
for (;;) {
++candidatePort;
if (candidatePort > portRange.getMaximum()) {
candidatePort = portRange.getMinimum();
}
if (candidatePort == nextPort) {
throw new IllegalStateException(String.format(
"Could not assign port from range %s. The entire " +
"range has been exhausted.", portRange));
}
try {
ServerSocket s = new ServerSocket(candidatePort);
s.close();
nextPort = candidatePort;
LOG.info("Assigned port {} from range {}.", nextPort, portRange);
return nextPort;
} catch (IOException e) {
LOG.debug("Could not bind to port {} from range {}. " +
"Attempting next port.", candidatePort, portRange, e);
}
}
}

/**
* Sets up the port range to be used. In typical usage, Ant invokes JUnit,
* possibly using multiple JUnit processes to execute multiple test suites
* concurrently. The count of JUnit processes is passed from Ant as a system
* property named "test.junit.threads". Ant's JUnit runner receives the
* thread ID as a command line argument of the form threadid=N, where N is an
* integer in the range [1, ${test.junit.threads}]. It's not otherwise
* accessible, so we need to parse it from the command line. This method
* uses these 2 pieces of information to split the available ports into
* disjoint ranges. Each JUnit process only assigns ports from its own range
* in order to prevent bind errors during concurrent test runs. If any of
* this information is unavailable or unparseable, then the default behavior
* is for this process to use the entire available port range. This is
* expected when running tests outside of Ant.
*
* @param strProcessCount string representation of integer process count,
* typically taken from system property test.junit.threads
* @param cmdLine command line containing threadid=N argument, typically
* taken from system property sun.java.command
* @return port range to use
*/
static PortRange setupPortRange(String strProcessCount, String cmdLine) {
Integer processCount = null;
if (strProcessCount != null && !strProcessCount.isEmpty()) {
try {
processCount = Integer.valueOf(strProcessCount);
} catch (NumberFormatException e) {
LOG.warn("Error parsing test.junit.threads = {}.",
strProcessCount, e);
}
}

Integer threadId = null;
if (processCount != null) {
if (cmdLine != null && !cmdLine.isEmpty()) {
Matcher m = Pattern.compile("threadid=(\\d+)").matcher(cmdLine);
if (m.find()) {
try {
threadId = Integer.valueOf(m.group(1));
} catch (NumberFormatException e) {
LOG.warn("Error parsing threadid from {}.", cmdLine, e);
}
}
}
}

final PortRange newPortRange;
if (processCount != null && processCount > 1 && threadId != null) {
// We know the total JUnit process count and this test process's ID.
// Use these values to calculate the valid range for port assignments
// within this test process. We lose a few possible ports to the
// remainder, but that's acceptable.
int portRangeSize = (GLOBAL_MAX_PORT - GLOBAL_BASE_PORT) /
processCount;
int minPort = GLOBAL_BASE_PORT + ((threadId - 1) * portRangeSize);
int maxPort = minPort + portRangeSize - 1;
newPortRange = new PortRange(minPort, maxPort);
LOG.info("Test process {}/{} using ports from {}.", threadId,
processCount, newPortRange);
} else {
// If running outside the context of Ant or Ant is using a single
// test process, then use all valid ports.
newPortRange = new PortRange(GLOBAL_BASE_PORT, GLOBAL_MAX_PORT);
LOG.info("Single test process using ports from {}.", newPortRange);
}

return newPortRange;
}

/**
* Contains the minimum and maximum (both inclusive) in a range of ports.
*/
static final class PortRange {
private final int minimum;
private final int maximum;

/**
* Creates a new PortRange.
*
* @param minimum lower bound port number
* @param maximum upper bound port number
*/
PortRange(int minimum, int maximum) {
this.minimum = minimum;
this.maximum = maximum;
}

/**
* Returns maximum port in the range.
*
* @return maximum
*/
int getMaximum() {
return maximum;
}

/**
* Returns minimum port in the range.
*
* @return minimum
*/
int getMinimum() {
return minimum;
}

@Override
public String toString() {
return String.format("%d - %d", minimum, maximum);
}
}

/**
* There is no reason to instantiate this class.
*/
private PortAssignment() {
}
}
79 changes: 79 additions & 0 deletions src/java/test/org/apache/zookeeper/PortAssignmentTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.Collection;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.Test;

@RunWith(Parameterized.class)
public class PortAssignmentTest {

private final String strProcessCount;
private final String cmdLine;
private final int expectedMinimumPort;
private final int expectedMaximumPort;

@Parameters
public static Collection<Object[]> data() {
return Arrays.<Object[]>asList(
new Object[] { "8", "threadid=1", 11221, 13913 },
new Object[] { "8", "threadid=2", 13914, 16606 },
new Object[] { "8", "threadid=3", 16607, 19299 },
new Object[] { "8", "threadid=4", 19300, 21992 },
new Object[] { "8", "threadid=5", 21993, 24685 },
new Object[] { "8", "threadid=6", 24686, 27378 },
new Object[] { "8", "threadid=7", 27379, 30071 },
new Object[] { "8", "threadid=8", 30072, 32764 },
new Object[] { "1", "threadid=1", 11221, 32767 },
new Object[] { "2", "threadid=1", 11221, 21993 },
new Object[] { "2", "threadid=2", 21994, 32766 },
new Object[] { null, null, 11221, 32767 },
new Object[] { "", "", 11221, 32767 });
}

public PortAssignmentTest(String strProcessCount, String cmdLine,
int expectedMinimumPort, int expectedMaximumPort) {
this.strProcessCount = strProcessCount;
this.cmdLine = cmdLine;
this.expectedMinimumPort = expectedMinimumPort;
this.expectedMaximumPort = expectedMaximumPort;
}

@Test
public void testSetupPortRange() {
PortAssignment.PortRange portRange = PortAssignment.setupPortRange(
strProcessCount, cmdLine);
assertEquals(buildAssertionMessage("minimum"), expectedMinimumPort,
portRange.getMinimum());
assertEquals(buildAssertionMessage("maximum"), expectedMaximumPort,
portRange.getMaximum());
}

private String buildAssertionMessage(String checkType) {
return String.format("strProcessCount = %s, cmdLine = %s, checking %s",
strProcessCount, cmdLine, checkType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private ZooKeeper getClient(int idx) {
@Override
protected void setUp() throws Exception {
LOG.info("STARTING " + getName());
System.setProperty("zookeeper.admin.enableServer", "false");

// set the snap count to something low so that we force log rollover
// and verify that is working as part of the epoch rollover.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void setUp(int numCommitThreads, int numClientThreads)
System.setProperty(
CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS,
Integer.toString(numCommitThreads));
System.setProperty("zookeeper.admin.enableServer", "false");
tmpDir = ClientBase.createTmpDir();
ClientBase.setupTestEnv();
zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand All @@ -82,6 +83,11 @@ public class WatchLeakTest {

private final boolean sessionTimedout;

@Before
public void setUp() {
System.setProperty("zookeeper.admin.enableServer", "false");
}

public WatchLeakTest(boolean sessionTimedout) {
this.sessionTimedout = sessionTimedout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.TxnHeader;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.slf4j.Logger;
Expand All @@ -77,6 +78,11 @@ public class Zab1_0Test {
private static final File testData = new File(
System.getProperty("test.data.dir", "build/test/data"));

@Before
public void setUp() {
System.setProperty("zookeeper.admin.enableServer", "false");
}

private static final class LeadThread extends Thread {
private final Leader leader;

Expand Down
Loading

0 comments on commit a358280

Please sign in to comment.