Skip to content

Commit

Permalink
Multiple unit tests improvements (apache#5439)
Browse files Browse the repository at this point in the history
* Multiple unit tests improvements

* Fixed Storm integration tests

* Fixed StringSchema static initialization

* Peg number of forks to number of cores

* Updated to 4 forks

* Fixed resourcel leak in managed ledger tests

* Use different temp folders for PulsarFunctionState test

* Increase retries count for PulsarFunctionE2ESecurityTest

* Fixed Flume connector test

* Fixed race conditions in primitive schema types static initialization

* Improve port manager

* Removed PulsarFunctionStateTest to move to integration tests

* Fixed dangling class reference
  • Loading branch information
merlimat authored Oct 24, 2019
1 parent 978efaf commit 991c2a2
Show file tree
Hide file tree
Showing 100 changed files with 1,099 additions and 1,051 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,20 @@ public class PulsarTestListener implements ITestListener {

@Override
public void onTestStart(ITestResult result) {
System.out.format("------- Starting test %s.%s(%s)-------\n", result.getTestClass(), result.getTestName(),
Arrays.toString(result.getParameters()));
System.out.format("------- Starting test %s.%s(%s)-------\n", result.getTestClass(),
result.getMethod().getMethodName(), Arrays.toString(result.getParameters()));
}

@Override
public void onTestSuccess(ITestResult result) {
System.out.format("------- SUCCESS -- %s.%s(%s)-------", result.getTestClass(), result.getTestName(),
Arrays.toString(result.getParameters()));
System.out.format("------- SUCCESS -- %s.%s(%s)-------", result.getTestClass(),
result.getMethod().getMethodName(), Arrays.toString(result.getParameters()));
}

@Override
public void onTestFailure(ITestResult result) {
System.out.format("!!!!!!!!! FAILURE-- %s.%s(%s)-------\n", result.getTestClass(), result.getTestName(),
Arrays.toString(result.getParameters()));
System.out.format("!!!!!!!!! FAILURE-- %s.%s(%s)-------\n", result.getTestClass(),
result.getMethod().getMethodName(), Arrays.toString(result.getParameters()));

if (result.getThrowable() instanceof ThreadTimeoutException) {
System.out.println("====== THREAD DUMPS ======");
Expand All @@ -52,8 +52,8 @@ public void onTestFailure(ITestResult result) {

@Override
public void onTestSkipped(ITestResult result) {
System.out.format("~~~~~~~~~ SKIPPED -- %s.%s(%s)-------", result.getTestClass(), result.getTestName(),
Arrays.toString(result.getParameters()));
System.out.format("~~~~~~~~~ SKIPPED -- %s.%s(%s)-------", result.getTestClass(),
result.getMethod().getMethodName(), Arrays.toString(result.getParameters()));
}

@Override
Expand All @@ -68,6 +68,5 @@ public void onStart(ITestContext context) {

@Override
public void onFinish(ITestContext context) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolic
throws Exception {
this(bookKeeperGroupFactory, false /* isBookkeeperManaged */, zooKeeper, config);
}

private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, boolean isBookkeeperManaged, ZooKeeper zooKeeper,
ManagedLedgerFactoryConfig config) throws Exception {
scheduledExecutor = OrderedScheduler.newSchedulerBuilder()
Expand Down Expand Up @@ -179,7 +179,7 @@ public BookKeeper get(EnsemblePlacementPolicyConfig policy) {
return bkClient;
}
}

private synchronized void refreshStats() {
long now = System.nanoTime();
long period = now - lastStatTimestamp;
Expand Down Expand Up @@ -446,8 +446,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}
}

scheduledExecutor.shutdown();
orderedExecutor.shutdown();
scheduledExecutor.shutdownNow();
orderedExecutor.shutdownNow();
cacheEvictionExecutor.shutdownNow();

entryCacheManager.clear();
Expand Down Expand Up @@ -622,21 +622,21 @@ public static interface BookkeeperFactoryForCustomEnsemblePlacementPolicy {
default BookKeeper get() {
return get(null);
}

/**
* Returns Bk-Client for a given ensemblePlacementPolicyMetadata. It returns default bK-client if
* ensemblePlacementPolicyMetadata is null.
*
*
* @param ensemblePlacementPolicyMetadata
* @return
*/
BookKeeper get(EnsemblePlacementPolicyConfig ensemblePlacementPolicyMetadata);
}

public static class EnsemblePlacementPolicyConfig {
private final Class<? extends EnsemblePlacementPolicy> policyClass;
private final Map<String, Object> properties;

public EnsemblePlacementPolicyConfig(Class<? extends EnsemblePlacementPolicy> policyClass,
Map<String, Object> properties) {
super();
Expand All @@ -651,7 +651,7 @@ public Class<? extends EnsemblePlacementPolicy> getPolicyClass() {
public Map<String, Object> getProperties() {
return properties;
}

@Override
public int hashCode() {
return Objects.hashCode(policyClass != null ? policyClass.getName() : "", properties);
Expand All @@ -668,6 +668,6 @@ public boolean equals(Object obj) {
return false;
}
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@ public void testBookieFailure() throws Exception {
// ok
}

bkc.close();
bkc = new BookKeeperTestClient(baseClientConf);
startNewBookie();

// Reconnect a new bk client
factory.shutdown();
factory = new ManagedLedgerFactoryImpl(bkc, zkc);
ledger = factory.open("my-ledger", config);
cursor = ledger.openCursor("my-cursor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ protected void startBKCluster() throws Exception {
baseClientConf.setZkServers(zkUtil.getZooKeeperConnectString());
baseClientConf.setUseV2WireProtocol(true);
baseClientConf.setEnableDigestTypeAutodetection(true);
baseClientConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
if (numBookies > 0) {
bkc = new BookKeeperTestClient(baseClientConf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,27 @@
*/
package org.apache.bookkeeper.test;

import java.io.FileReader;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;

import java.io.FileWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ConnectException;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.CharBuffer;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import lombok.Cleanup;

import org.jboss.netty.util.internal.ByteBufferUtil;

/**
* Port manager allows a base port to be specified on the commandline. Tests will then use ports, counting up from this
* base port. This allows multiple instances of the bookkeeper tests to run at once.
Expand All @@ -52,36 +60,37 @@ public synchronized static int nextFreePort() {
Path path = Paths.get(lockFilename);

try {
FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
FileLock lock = fileChannel.lock();

try {
@Cleanup
FileChannel fileChannel = FileChannel.open(path,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.READ);

FileReader reader = new FileReader(lockFilename);
CharBuffer buffer = CharBuffer.allocate(16);
int len = reader.read(buffer);
buffer.flip();

int lastUsedPort = basePort;
if (len > 0) {
String lastUsedPortStr = buffer.toString();
lastUsedPort = Integer.parseInt(lastUsedPortStr);
}
@Cleanup
FileLock lock = fileChannel.lock();

int freePort = probeFreePort(lastUsedPort + 1);
ByteBuffer buffer = ByteBuffer.allocate(32);
int len = fileChannel.read(buffer, 0L);
buffer.flip();

FileWriter writer = new FileWriter(lockFilename);
writer.write(Integer.toString(freePort));
int lastUsedPort = basePort;
if (len > 0) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String lastUsedPortStr = new String(bytes);
lastUsedPort = Integer.parseInt(lastUsedPortStr);
}

reader.close();
writer.close();
int freePort = probeFreePort(lastUsedPort + 1);

return freePort;
buffer.clear();
buffer.put(Integer.toString(freePort).getBytes());
buffer.flip();
fileChannel.write(buffer, 0L);
fileChannel.truncate(buffer.position());
fileChannel.force(true);

} finally {
lock.release();
fileChannel.close();
}
return freePort;
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -97,21 +106,29 @@ private synchronized static int probeFreePort(int port) {
port = basePort;
}

try (ServerSocket ss = new ServerSocket()) {
ss.setReuseAddress(false);
ss.bind(new InetSocketAddress(InetAddress.getByName("localhost"), port), 1);
ss.close();
// Give it some time to truly close the connection
Thread.sleep(100);
return port;
try (Socket s = new Socket()) {
s.connect(new InetSocketAddress(Inet4Address.getLoopbackAddress(), port), 100);

// If we succeed to connect it means the port is being used

} catch (ConnectException e) {
return port;
} catch (Exception e) {
port++;
exceptionCount++;
if (exceptionCount > MAX_PORT_CONFLICTS) {
throw new RuntimeException(e);
}
e.printStackTrace();
}

port++;
exceptionCount++;
if (exceptionCount > MAX_PORT_CONFLICTS) {
throw new RuntimeException("Failed to find an open port");
}
}
}

public static void main(String[] args) throws Exception {
while (true) {
System.out.println("Port: " + nextFreePort());
Thread.sleep(100);
}
}
}
7 changes: 4 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1139,14 +1139,15 @@ flexible messaging model and an intuitive client API.</description>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine> -Xmx2G
<argLine> -Xmx1G -XX:+UseG1GC
-Dpulsar.allocator.pooled=false
-Dpulsar.allocator.leak_detection=Advanced
-Dpulsar.allocator.exit_on_oom=false
-Dlog4j.configurationFile=log4j2.xml
</argLine>
<reuseForks>false</reuseForks>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<forkCount>4</forkCount>
<shutdown>kill</shutdown>
<redirectTestOutputToFile>${redirectTestOutputToFile}</redirectTestOutputToFile>
<trimStackTrace>false</trimStackTrace>
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.util.DirectMemoryUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
Expand Down Expand Up @@ -277,6 +278,12 @@ public void start() throws Exception {
public void join() throws InterruptedException {
pulsarService.waitUntilClosed();

try {
pulsarService.close();
} catch (PulsarServerException e) {
throw new RuntimeException();
}

if (bookieServer != null) {
bookieServer.join();
}
Expand Down Expand Up @@ -333,9 +340,9 @@ public static void main(String[] args) throws Exception {
} catch (Exception e) {
log.error("Failed to start pulsar service.", e);
Runtime.getRuntime().halt(1);
} finally {
starter.join();
}

starter.join();
}

private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStarter.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

import lombok.extern.slf4j.Slf4j;

import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;

@Slf4j
public class NoOpShutdownService implements ShutdownService {

@Override
public void run() {
shutdown(0);
}

@Override
public void shutdown(int exitCode) {
log.warn("Invoked shutdown with exitCode={}", exitCode);
}

}
Loading

0 comments on commit 991c2a2

Please sign in to comment.