Skip to content

Commit

Permalink
[java client] Extract the mini cluster from BaseKuduTest
Browse files Browse the repository at this point in the history
BaseKuduTest does two things: it manages a mini Kudu cluster and it
offers some common utilities to unit tests.

Extracting the mini cluster out of it makes it easier to reuse. This
is particularly helpful for the incoming Spark module since Scala
and static methods don't play nicely together, which BaseKuduTest is
full of (like @BeforeClass).

Change-Id: Ic1b05168d77892e387e280d3569e87586b6ec395
Reviewed-on: http://gerrit.cloudera.org:8080/1365
Tested-by: Internal Jenkins
Reviewed-by: Dan Burkert <[email protected]>
  • Loading branch information
Jean-Daniel Cryans authored and Jean-Daniel Cryans committed Nov 13, 2015
1 parent 756d560 commit 354dcf3
Show file tree
Hide file tree
Showing 2 changed files with 357 additions and 224 deletions.
244 changes: 20 additions & 224 deletions java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,13 @@

import static org.junit.Assert.fail;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.kududb.ColumnSchema;
Expand All @@ -40,7 +32,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
Expand All @@ -53,32 +44,23 @@ public class BaseKuduTest {

private static final int DEFAULT_MASTER_RPC_PORT = 7051;
private static final String START_CLUSTER = "startCluster";

private static final String NUM_MASTERS_PROP = "NUM_MASTERS";
private static final int NUM_TABLET_SERVERS = 3;
private static final int DEFAULT_NUM_MASTERS = 1;

// Number of masters that will be started for this test if we're starting
// a cluster.
private static final int NUM_MASTERS =
Integer.getInteger(NUM_MASTERS_PROP, DEFAULT_NUM_MASTERS);

// TS and Master ports will be assigned starting with this one.
private static final int PORT_START = 64030;
private static MiniKuduCluster miniCluster;

// Comma separate describing the master addresses and ports.
protected static String masterAddresses;
protected static List<HostAndPort> masterHostPorts;

protected static final int DEFAULT_SLEEP = 50000;
protected static final int NUM_TABLET_SERVERS = 3;
protected static final int DEFAULT_NUM_MASTERS = 1;

static final List<Thread> PROCESS_INPUT_PRINTERS = new ArrayList<>();

// Number of masters that will be started for this test if we're starting
// a cluster.
protected static final int NUM_MASTERS =
Integer.getInteger(NUM_MASTERS_PROP, DEFAULT_NUM_MASTERS);

// Map of ports to master servers.
static final Map<Integer, Process> MASTERS = new ConcurrentHashMap<>(NUM_MASTERS);

// Map of ports to tablet servers.
static final Map<Integer, Process> TABLET_SERVERS =
new ConcurrentHashMap<Integer, Process>(NUM_TABLET_SERVERS);

// We create both versions of the client for ease of use.
protected static AsyncKuduClient client;
Expand All @@ -88,40 +70,20 @@ public class BaseKuduTest {
protected static boolean startCluster;

private static List<String> tableNames = new ArrayList<>();
private static List<String> pathsToDelete = new ArrayList<>();

@BeforeClass
public static void setUpBeforeClass() throws Exception {
LOG.info("Setting up before class...");
// The following props are set via kudu-client's pom.
String baseDirPath = TestUtils.getBaseDir();

startCluster = Boolean.parseBoolean(System.getProperty(START_CLUSTER, "true"));

if (startCluster) {
long now = System.currentTimeMillis();
LOG.info("Starting {} masters...", NUM_MASTERS);
int port = startMasters(PORT_START, NUM_MASTERS, baseDirPath);
LOG.info("Starting {} tablet servers...", NUM_TABLET_SERVERS);
for (int i = 0; i < NUM_TABLET_SERVERS; i++) {
port = TestUtils.findFreePort(port);
String dataDirPath = baseDirPath + "/ts-" + i + "-" + now;
String flagsPath = TestUtils.getFlagsPath();
String[] tsCmdLine = {
TestUtils.findBinary("kudu-tserver"),
"--flagfile=" + flagsPath,
"--fs_wal_dir=" + dataDirPath,
"--fs_data_dirs=" + dataDirPath,
"--tserver_master_addrs=" + masterAddresses,
"--rpc_bind_addresses=127.0.0.1:" + port};
TABLET_SERVERS.put(port, configureAndStartProcess(tsCmdLine));
port++;

if (flagsPath.startsWith(baseDirPath)) {
// We made a temporary copy of the flags; delete them later.
pathsToDelete.add(flagsPath);
}
pathsToDelete.add(dataDirPath);
}
miniCluster = new MiniKuduCluster.MiniKuduClusterBuilder()
.numMasters(NUM_MASTERS)
.numTservers(NUM_TABLET_SERVERS)
.build();
masterAddresses = miniCluster.getMasterAddresses();
masterHostPorts = miniCluster.getMasterHostPorts();
} else {
masterAddresses = TestUtils.getMasterAddresses();
masterHostPorts = NetUtil.parseStrings(masterAddresses, DEFAULT_MASTER_RPC_PORT);
Expand All @@ -131,73 +93,10 @@ public static void setUpBeforeClass() throws Exception {
syncClient = new KuduClient(client);
LOG.info("Waiting for tablet servers...");
if (!waitForTabletServers(NUM_TABLET_SERVERS)) {
fail("Couldn't get " + NUM_MASTERS + " tablet servers running, aborting");
fail("Couldn't get " + NUM_TABLET_SERVERS + " tablet servers running, aborting");
}
}

/**
* Start the specified number of master servers with ports starting from a specified
* number. Finds free web and RPC ports up front for all of the masters first, then
* starts them on those ports, populating 'masters' map.
* @param masterStartPort The starting of the port range for the masters.
* @param numMasters Number of masters to start.
* @param baseDirPath Kudu base directory.
* @return Next free port.
* @throws Exception If we are unable to start the masters.
*/
static int startMasters(int masterStartPort, int numMasters,
String baseDirPath) throws Exception {
LOG.info("Starting {} masters...", numMasters);
// Get the list of web and RPC ports to use for the master consensus configuration:
// request NUM_MASTERS * 2 free ports as we want to also reserve the web
// ports for the consensus configuration.
List<Integer> ports = TestUtils.findFreePorts(masterStartPort, numMasters * 2);
int lastFreePort = ports.get(ports.size() - 1);
List<Integer> masterRpcPorts = Lists.newArrayListWithCapacity(numMasters);
List<Integer> masterWebPorts = Lists.newArrayListWithCapacity(numMasters);
masterHostPorts = Lists.newArrayListWithCapacity(numMasters);
for (int i = 0; i < numMasters * 2; i++) {
if (i % 2 == 0) {
masterRpcPorts.add(ports.get(i));
masterHostPorts.add(HostAndPort.fromParts("127.0.0.1", ports.get(i)));
} else {
masterWebPorts.add(ports.get(i));
}
}
masterAddresses = NetUtil.hostsAndPortsToString(masterHostPorts);
for (int i = 0; i < numMasters; i++) {
long now = System.currentTimeMillis();
String dataDirPath = baseDirPath + "/master-" + i + "-" + now;
String flagsPath = TestUtils.getFlagsPath();
// The web port must be reserved in the call to findFreePorts above and specified
// to avoid the scenario where:
// 1) findFreePorts finds RPC ports a, b, c for the 3 masters.
// 2) start master 1 with RPC port and let it bind to any (specified as 0) web port.
// 3) master 1 happens to bind to port b for the web port, as master 2 hasn't been
// started yet and findFreePort(s) is "check-time-of-use" (it does not reserve the
// ports, only checks that when it was last called, these ports could be used).
List<String> masterCmdLine = Lists.newArrayList(
TestUtils.findBinary("kudu-master"),
"--flagfile=" + flagsPath,
"--fs_wal_dir=" + dataDirPath,
"--fs_data_dirs=" + dataDirPath,
"--rpc_bind_addresses=127.0.0.1:" + masterRpcPorts.get(i),
"--webserver_port=" + masterWebPorts.get(i));
if (numMasters > 1) {
masterCmdLine.add("--master_addresses=" + masterAddresses);
}
MASTERS.put(masterRpcPorts.get(i),
configureAndStartProcess(masterCmdLine.toArray(new String[masterCmdLine.size()])));

if (flagsPath.startsWith(baseDirPath)) {
// We made a temporary copy of the flags; delete them later.
pathsToDelete.add(flagsPath);
}
pathsToDelete.add(dataDirPath);
}
return lastFreePort + 1;
}

/**
* Wait up to DEFAULT_SLEEP for an expected count of TS to connect to the master
* @param expected How many TS are expected
Expand All @@ -215,40 +114,6 @@ static boolean waitForTabletServers(int expected) throws Exception {
return count >= expected;
}

/**
* Starts a process using the provided command and configures it to be daemon,
* redirects the stderr to stdout, and starts a thread that will read from the process' input
* stream and redirect that to LOG.
* @param command Process and options
* @return The started process
* @throws Exception Exception if an error prevents us from starting the process,
* or if we were able to start the process but noticed that it was then killed (in which case
* we'll log the exit value).
*/
static Process configureAndStartProcess(String[] command) throws Exception {
LOG.info("Starting process: {}", Joiner.on(" ").join(command));
ProcessBuilder processBuilder = new ProcessBuilder(command);
processBuilder.redirectErrorStream(true);
Process proc = processBuilder.start();
ProcessInputStreamLogPrinterRunnable printer =
new ProcessInputStreamLogPrinterRunnable(proc.getInputStream());
Thread thread = new Thread(printer);
thread.setDaemon(true);
thread.setName(command[0]);
PROCESS_INPUT_PRINTERS.add(thread);
thread.start();

Thread.sleep(300);
try {
int ev = proc.exitValue();
throw new Exception("We tried starting a process (" + command[0] + ") but it exited with " +
"value=" + ev);
} catch (IllegalThreadStateException ex) {
// This means the process is still alive, it's like reverse psychology.
}
return proc;
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
try {
Expand All @@ -261,29 +126,7 @@ public static void tearDownAfterClass() throws Exception {
}
} finally {
if (startCluster) {
for (Iterator<Process> masterIter = MASTERS.values().iterator(); masterIter.hasNext(); ) {
masterIter.next().destroy();
masterIter.remove();
}
for (Iterator<Process> tsIter = TABLET_SERVERS.values().iterator(); tsIter.hasNext(); ) {
tsIter.next().destroy();
tsIter.remove();
}
for (Thread thread : PROCESS_INPUT_PRINTERS) {
thread.interrupt();
}
}
for (String path : pathsToDelete) {
try {
File f = new File(path);
if (f.isDirectory()) {
FileUtils.deleteDirectory(f);
} else {
f.delete();
}
} catch (Exception e) {
LOG.warn("Could not delete path {}", path, e);
}
miniCluster.shutdown();
}
}
}
Expand Down Expand Up @@ -489,15 +332,7 @@ protected static void killTabletLeader(KuduTable table) throws Exception {
}

Integer port = leader.getRpcPort();
Process ts = TABLET_SERVERS.get(port);
if (ts == null) {
// The TS is already dead, good.
return;
}
LOG.info("Killing server at port " + port);
ts.destroy();
ts.waitFor();
TABLET_SERVERS.remove(port);
miniCluster.killTabletServerOnPort(port);
}

/**
Expand All @@ -508,15 +343,7 @@ protected static void killTabletLeader(KuduTable table) throws Exception {
*/
protected static void killMasterLeader() throws Exception {
int leaderPort = findLeaderMasterPort();
Process master = MASTERS.get(leaderPort);
if (master == null) {
// The master is already dead, good.
return;
}
LOG.info("Killing master at port " + leaderPort);
master.destroy();
master.waitFor();
MASTERS.remove(leaderPort);
miniCluster.killMasterOnPort(leaderPort);
}

/**
Expand Down Expand Up @@ -550,35 +377,4 @@ protected static int findLeaderMasterPort() throws Exception {
protected static String getMasterAddresses() {
return masterAddresses;
}

/**
* Helper runnable that can log what the processes are sending on their stdout and stderr that
* we'd otherwise miss.
*/
static class ProcessInputStreamLogPrinterRunnable implements Runnable {

private final InputStream is;

public ProcessInputStreamLogPrinterRunnable(InputStream is) {
this.is = is;
}

@Override
public void run() {
try {
String line;
BufferedReader in = new BufferedReader(new InputStreamReader(is));
while ((line = in.readLine()) != null) {
LOG.info(line);
}
in.close();
}
catch (Exception e) {
if (!e.getMessage().contains("Stream closed")) {
LOG.error("Caught error while reading a process' output", e);
}
}
}
}

}
Loading

0 comments on commit 354dcf3

Please sign in to comment.