Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
aaudiber committed Oct 20, 2017
1 parent 782b727 commit 65896c7
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 51 deletions.
Binary file removed .DS_Store
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.external;
package alluxio.multi.process;

import alluxio.PropertyKey;
import alluxio.util.io.PathUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.external;
package alluxio.multi.process;

import alluxio.Constants;
import alluxio.master.AlluxioMaster;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.external;
package alluxio.multi.process;

import alluxio.Constants;
import alluxio.worker.AlluxioWorker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.external;
package alluxio.multi.process;

import alluxio.PropertyKey;

Expand All @@ -28,21 +28,18 @@
public final class Master implements Closeable {
private final File mLogsDir;
private final File mConfDir;
private final File mOutFile;
private final MasterNetAddress mAddress;

private ExternalProcess mProcess;

/**
* @param mWorkDir the work directory to use for the master process
* @param address the address information for the master
* @param masterId an ID for this master, used to distinguish it from other masters in the same
* cluster
* @param confDir configuration directory
* @param logsDir logs directory
* @param address address information for the master
*/
public Master(File mWorkDir, int masterId, MasterNetAddress address) throws IOException {
mLogsDir = new File(mWorkDir, "logs-master" + masterId);
mConfDir = new File(mWorkDir, "conf");
mOutFile = new File(mLogsDir, "master.out");
public Master(File confDir, File logsDir, MasterNetAddress address) throws IOException {
mConfDir = confDir;
mLogsDir = logsDir;
mAddress = address;
}

Expand All @@ -58,7 +55,8 @@ public synchronized void start() throws IOException {
conf.put(PropertyKey.MASTER_HOSTNAME, mAddress.getHostname());
conf.put(PropertyKey.MASTER_RPC_PORT, mAddress.getRpcPort());
conf.put(PropertyKey.MASTER_WEB_PORT, mAddress.getWebPort());
mProcess = new ExternalProcess(conf, LimitedLifeMasterProcess.class, mOutFile);
mProcess =
new ExternalProcess(conf, LimitedLifeMasterProcess.class, new File(mLogsDir, "master.out"));
mProcess.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.external;
package alluxio.multi.process;

import com.google.common.base.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.external;
package alluxio.multi.process;

import alluxio.AlluxioTestDirectory;
import alluxio.AlluxioURI;
Expand All @@ -19,6 +19,7 @@
import alluxio.PropertyKey;
import alluxio.cli.Format;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystem.Factory;
import alluxio.client.file.FileSystemContext;
import alluxio.exception.status.UnavailableException;
import alluxio.master.MasterInquireClient;
Expand Down Expand Up @@ -61,8 +62,8 @@
* The synchronization strategy for this class is to synchronize all public methods.
*/
@ThreadSafe
public final class ExternalCluster implements TestRule {
private static final Logger LOG = LoggerFactory.getLogger(ExternalCluster.class);
public final class MultiProcessCluster implements TestRule {
private static final Logger LOG = LoggerFactory.getLogger(MultiProcessCluster.class);
private static final File ARTIFACTS_DIR = new File("./target/artifacts");
private static final File TESTS_LOG = new File("./target/tests.log");

Expand All @@ -82,7 +83,7 @@ public final class ExternalCluster implements TestRule {
private State mState;
private TestingServer mCuratorServer;

private ExternalCluster(Map<PropertyKey, String> properties, int numMasters, int numWorkers,
private MultiProcessCluster(Map<PropertyKey, String> properties, int numMasters, int numWorkers,
String clusterName) {
mProperties = properties;
mNumMasters = numMasters;
Expand All @@ -109,11 +110,10 @@ public synchronized void start() throws Exception {
.register(new TestingServer(-1, AlluxioTestDirectory.createTemporaryDirectory("zk")));
mProperties.put(PropertyKey.ZOOKEEPER_ADDRESS, mCuratorServer.getConnectString());
} else {
mProperties.put(PropertyKey.MASTER_HOSTNAME, mMasterAddresses.get(0).getHostname());
mProperties.put(PropertyKey.MASTER_RPC_PORT,
Integer.toString(mMasterAddresses.get(0).getRpcPort()));
mProperties.put(PropertyKey.MASTER_WEB_PORT,
Integer.toString(mMasterAddresses.get(0).getWebPort()));
MasterNetAddress masterAddress = mMasterAddresses.get(0);
mProperties.put(PropertyKey.MASTER_HOSTNAME, masterAddress.getHostname());
mProperties.put(PropertyKey.MASTER_RPC_PORT, Integer.toString(masterAddress.getRpcPort()));
mProperties.put(PropertyKey.MASTER_WEB_PORT, Integer.toString(masterAddress.getWebPort()));
}

mWorkDir = AlluxioTestDirectory.createTemporaryDirectory(mClusterName);
Expand Down Expand Up @@ -188,7 +188,7 @@ public synchronized FileSystem getFileSystemClient() {
Preconditions.checkState(mState == State.STARTED,
"must be in the started state to get an fs client, but state was %s", mState);
MasterInquireClient inquireClient = getMasterInquireClient();
return FileSystem.Factory.get(FileSystemContext.create(null, inquireClient));
return Factory.get(mCloser.register(FileSystemContext.create(null, inquireClient)));
}

/**
Expand Down Expand Up @@ -235,7 +235,11 @@ public synchronized void destroy() throws IOException {
* @param i the index of the master to start
*/
public synchronized void startMaster(int i) throws IOException {
Master master = mCloser.register(new Master(mWorkDir, i, mMasterAddresses.get(i)));
Preconditions.checkState(mState == State.STARTED,
"Must be in a started state to start individual masters");
File confDir = new File(mWorkDir, "conf");
File logsDir = new File(mWorkDir, "logs-master" + i);
Master master = mCloser.register(new Master(confDir, logsDir, mMasterAddresses.get(i)));
mMasters.add(master);
master.start();
}
Expand All @@ -246,7 +250,12 @@ public synchronized void startMaster(int i) throws IOException {
* @param i the index of the worker to start
*/
public synchronized void startWorker(int i) throws IOException {
Worker worker = mCloser.register(new Worker(mWorkDir, i));
Preconditions.checkState(mState == State.STARTED,
"Must be in a started state to start individual workers");
File confDir = new File(mWorkDir, "conf");
File logsDir = new File(mWorkDir, "logs-worker" + i);
File ramdisk = new File(mWorkDir, "ramdisk" + i);
Worker worker = mCloser.register(new Worker(confDir, logsDir, ramdisk));
mWorkers.add(worker);
worker.start();
}
Expand Down Expand Up @@ -330,7 +339,7 @@ private enum State {
}

/**
* Builder for {@link ExternalCluster}.
* Builder for {@link MultiProcessCluster}.
*/
public static final class Builder {
private Map<PropertyKey, String> mProperties = new HashMap<>();
Expand Down Expand Up @@ -387,15 +396,15 @@ public Builder setClusterName(String clusterName) {
}

/**
* @return a constructed {@link ExternalCluster}
* @return a constructed {@link MultiProcessCluster}
*/
public ExternalCluster build() {
return new ExternalCluster(mProperties, mNumMasters, mNumWorkers, mClusterName);
public MultiProcessCluster build() {
return new MultiProcessCluster(mProperties, mNumMasters, mNumWorkers, mClusterName);
}
}

/**
* @return a new builder for an {@link ExternalCluster}
* @return a new builder for an {@link MultiProcessCluster}
*/
public static Builder newBuilder() {
return new Builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.external;
package alluxio.multi.process;

import alluxio.util.CommonUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.external;
package alluxio.multi.process;

import alluxio.PropertyKey;
import alluxio.network.PortUtils;
Expand All @@ -35,23 +35,21 @@ public class Worker implements Closeable {
private final File mLogsDir;
private final File mConfDir;
private final File mRamdiskFile;
private final File mOutFile;
private final int mRpcPort;
private final int mDataPort;
private final int mWebPort;

private ExternalProcess mProcess;

/**
* @param mWorkDir the work directory to use for the worker process
* @param workerId an ID for this worker, used to distinguish it from other workers in the same
* cluster
* @param confDir the conf directory
* @param logsDir the work directory
* @param ramdisk the ramdisk
*/
public Worker(File mWorkDir, int workerId) throws IOException {
mLogsDir = new File(mWorkDir, "logs-worker" + workerId);
mConfDir = new File(mWorkDir, "conf");
mRamdiskFile = new File(mWorkDir, "ramdisk" + workerId);
mOutFile = new File(mLogsDir, "worker.out");
public Worker(File confDir, File logsDir, File ramdisk) throws IOException {
mConfDir = confDir;
mLogsDir = logsDir;
mRamdiskFile = ramdisk;
mRpcPort = PortUtils.getFreePort();
mDataPort = PortUtils.getFreePort();
mWebPort = PortUtils.getFreePort();
Expand All @@ -72,7 +70,8 @@ public synchronized void start() throws IOException {
conf.put(PropertyKey.WORKER_RPC_PORT, mRpcPort);
conf.put(PropertyKey.WORKER_DATA_PORT, mDataPort);
conf.put(PropertyKey.WORKER_WEB_PORT, mWebPort);
mProcess = new ExternalProcess(conf, LimitedLifeWorkerProcess.class, mOutFile);
mProcess =
new ExternalProcess(conf, LimitedLifeWorkerProcess.class, new File(mLogsDir, "worker.out"));
LOG.info("Starting worker with (rpc, data, web) ports ({}, {}, {})", mRpcPort, mDataPort,
mWebPort);
mProcess.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.external;
package alluxio.multi.process;

import static org.junit.Assert.assertEquals;

Expand All @@ -24,14 +24,14 @@
import org.junit.Test;
import org.junit.rules.Timeout;

public final class ExternalClusterTest {
public final class MultiProcessClusterTest {

@Rule
public Timeout mTimeout = Timeout.seconds(600);

@Test
public void simpleCluster() throws Exception {
ExternalCluster cluster = ExternalCluster.newBuilder()
MultiProcessCluster cluster = MultiProcessCluster.newBuilder()
.setNumMasters(1)
.setNumWorkers(1)
.build();
Expand All @@ -46,7 +46,7 @@ public void simpleCluster() throws Exception {

@Test
public void zookeeper() throws Exception {
ExternalCluster cluster = ExternalCluster.newBuilder()
MultiProcessCluster cluster = MultiProcessCluster.newBuilder()
.addProperty(PropertyKey.ZOOKEEPER_ENABLED, "true")
.setNumMasters(3)
.setNumWorkers(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import alluxio.client.WriteType;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.external.ExternalCluster;
import alluxio.multi.process.MultiProcessCluster;
import alluxio.master.file.FileSystemMaster;
import alluxio.master.file.options.ListStatusOptions;
import alluxio.underfs.UnderFileSystemConfiguration;
Expand Down Expand Up @@ -93,7 +93,7 @@ public final void before() throws Exception {

@Test
public void singleMasterJournalStopIntegration() throws Exception {
ExternalCluster cluster = ExternalCluster.newBuilder()
MultiProcessCluster cluster = MultiProcessCluster.newBuilder()
.setClusterName("singleMasterJournalStopIntegration")
.setNumWorkers(0)
.setNumMasters(1)
Expand Down Expand Up @@ -122,7 +122,7 @@ public void singleMasterJournalStopIntegration() throws Exception {
*/
@Test
public void multiMasterJournalStopIntegration() throws Exception {
ExternalCluster cluster = ExternalCluster.newBuilder()
MultiProcessCluster cluster = MultiProcessCluster.newBuilder()
.setClusterName("multiMasterJournalStopIntegration")
.setNumWorkers(0)
.setNumMasters(TEST_NUM_MASTERS)
Expand Down

0 comments on commit 65896c7

Please sign in to comment.