Merge pull request Alluxio#5205 from peisun1115/bugfix
[ALLUXIO-2818] Fix MasterFaultToleranceIntegrationTest
peisun1115 authored Apr 27, 2017
2 parents 5ea5c0f + 29261f0 commit 804be97
Showing 8 changed files with 99 additions and 28 deletions.
@@ -37,6 +37,7 @@
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadPoolServer.Args;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +65,7 @@ public class AlluxioMasterProcess implements MasterProcess {
private final int mPort;

/** The socket for thrift rpc server. */
private final TServerSocket mTServerSocket;
private TServerSocket mTServerSocket;

/** The transport provider to create thrift server transport. */
private final TransportProvider mTransportProvider;
@@ -75,16 +76,16 @@ public class AlluxioMasterProcess implements MasterProcess {
private final MetricsServlet mMetricsServlet = new MetricsServlet(MetricsSystem.METRIC_REGISTRY);

/** The master registry. */
private MasterRegistry mRegistry;
private final MasterRegistry mRegistry;

/** The web ui server. */
private WebServer mWebServer = null;
private WebServer mWebServer;

/** The RPC server. */
private TServer mThriftServer = null;
private TServer mThriftServer;

/** is true if the master is serving the RPC server. */
private boolean mIsServing = false;
private boolean mIsServing;

/** The start time for when the master started serving the RPC server. */
private long mStartTimeMs = -1;
@@ -118,10 +119,10 @@ public class AlluxioMasterProcess implements MasterProcess {
Preconditions.checkState(Configuration.getInt(PropertyKey.MASTER_WEB_PORT) > 0,
this + " web port is only allowed to be zero in test mode.");
}

mTransportProvider = TransportProvider.Factory.create();
mTServerSocket =
new TServerSocket(NetworkAddressUtils.getBindAddress(ServiceType.MASTER_RPC),
connectionTimeout);
mTServerSocket = new TServerSocket(NetworkAddressUtils.getBindAddress(ServiceType.MASTER_RPC),
Configuration.getInt(PropertyKey.MASTER_CONNECTION_TIMEOUT_MS));
mPort = NetworkAddressUtils.getThriftPort(mTServerSocket);
// reset master rpc port
Configuration.set(PropertyKey.MASTER_RPC_PORT, Integer.toString(mPort));
@@ -193,6 +194,7 @@ public void stop() throws Exception {
stopServing();
stopMasters();
mTServerSocket.close();
mTServerSocket = null;
mIsServing = false;
}
}
@@ -208,7 +210,7 @@ protected void startMasters(boolean isLeader) {
connectToUFS();
mRegistry.start(isLeader);
} catch (IOException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}

@@ -288,9 +290,18 @@ protected void startServingRPCServer() {
try {
transportFactory = mTransportProvider.getServerTransportFactory();
} catch (IOException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}

try {
if (mTServerSocket != null) {
mTServerSocket.close();
}
mTServerSocket = new TServerSocket(mRpcAddress,
Configuration.getInt(PropertyKey.MASTER_CONNECTION_TIMEOUT_MS));
} catch (TTransportException e) {
throw new RuntimeException(e);
}
// create master thrift service with the multiplexed processor.
Args args = new TThreadPoolServer.Args(mTServerSocket).maxWorkerThreads(mMaxWorkerThreads)
.minWorkerThreads(mMinWorkerThreads).processor(processor).transportFactory(transportFactory)
@@ -333,6 +344,6 @@ private void connectToUFS() throws IOException {

@Override
public String toString() {
return "Alluxio master";
return "Alluxio master @" + mRpcAddress;
}
}
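The hunks above make the Thrift server socket a mutable field: stop() closes it and nulls it out, and startServingRPCServer() closes any leftover socket and binds a fresh one, so a master that stopped serving (for example after losing leadership) can serve again later. Below is a minimal sketch of that rebind step, assuming only the libthrift TServerSocket API already used in the diff; the holder class and method names are illustrative, not Alluxio's:

import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;

import java.net.InetSocketAddress;

public final class RpcSocketHolder {
  private TServerSocket mServerSocket;

  /** Closes any previous listener and binds a fresh server socket before serving again. */
  public synchronized void rebind(InetSocketAddress rpcAddress, int clientTimeoutMs) {
    if (mServerSocket != null) {
      mServerSocket.close(); // release the old socket so the port can be re-bound
    }
    try {
      mServerSocket = new TServerSocket(rpcAddress, clientTimeoutMs);
    } catch (TTransportException e) {
      throw new RuntimeException(e); // mirrors the diff: surface bind failures as unchecked
    }
  }
}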
@@ -135,7 +135,7 @@ public void heartbeat() {
blockReport.getRemovedBlocks(), blockReport.getAddedBlocks());
handleMasterCommand(cmdFromMaster);
mLastSuccessfulHeartbeatMs = System.currentTimeMillis();
} catch (Exception e) {
} catch (IOException | ConnectionFailedException e) {
// An error occurred, log and ignore it or error if heartbeat timeout is reached
if (cmdFromMaster == null) {
LOG.error("Failed to receive master heartbeat command.", e);
@@ -160,9 +160,11 @@ public void close() {
* This call will block until the command is complete.
*
* @param cmd the command to execute
* @throws IOException if I/O errors occur
* @throws ConnectionFailedException if connection fails
*/
// TODO(calvin): Evaluate the necessity of each command.
private void handleMasterCommand(Command cmd) throws Exception {
private void handleMasterCommand(Command cmd) throws IOException, ConnectionFailedException {
if (cmd == null) {
return;
}
@@ -25,6 +25,7 @@
import alluxio.heartbeat.HeartbeatThread;
import alluxio.metrics.MetricsSystem;
import alluxio.thrift.BlockWorkerClientService;
import alluxio.util.CommonUtils;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.util.network.NetworkAddressUtils.ServiceType;
@@ -40,6 +41,7 @@
import alluxio.worker.file.FileSystemMasterClient;

import com.codahale.metrics.Gauge;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.thrift.TProcessor;
import org.slf4j.Logger;
@@ -51,6 +53,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.concurrent.NotThreadSafe;
@@ -224,11 +227,27 @@ public void start(WorkerNetAddress address) {
*/
@Override
public void stop() {
// Steps to shutdown:
// 1. Gracefully shut down the runnables running in the executors.
// 2. Shutdown the executors.
// 3. Shut down the clients. This needs to happen after the executors are shut down because
// runnables running in the executors might be using the clients.
mSessionCleaner.stop();
// The executor shutdown needs to be done in a loop with retry because the interrupt
// signal can sometimes be ignored.
CommonUtils.waitFor("block worker executor shutdown", new Function<Void, Boolean>() {
@Override
public Boolean apply(Void input) {
getExecutorService().shutdownNow();
try {
return getExecutorService().awaitTermination(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
mBlockMasterClient.close();
mFileSystemMasterClient.close();
// Use shutdownNow because HeartbeatThreads never finish until they are interrupted
getExecutorService().shutdownNow();
}

@Override
@@ -18,13 +18,15 @@
import alluxio.heartbeat.HeartbeatContext;
import alluxio.heartbeat.HeartbeatThread;
import alluxio.thrift.FileSystemWorkerClientService;
import alluxio.util.CommonUtils;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.util.network.NetworkAddressUtils.ServiceType;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.AbstractWorker;
import alluxio.worker.block.BlockWorker;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
@@ -35,6 +37,7 @@
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.concurrent.NotThreadSafe;
@@ -111,8 +114,19 @@ public void stop() {
if (mFilePersistenceService != null) {
mFilePersistenceService.cancel(true);
}
// The executor shutdown needs to be done in a loop with retry because the interrupt
// signal can sometimes be ignored.
CommonUtils.waitFor("file system worker executor shutdown", new Function<Void, Boolean>() {
@Override
public Boolean apply(Void input) {
getExecutorService().shutdownNow();
try {
return getExecutorService().awaitTermination(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
mFileSystemMasterWorkerClient.close();
// This needs to be shutdownNow because heartbeat threads will only stop when interrupted.
getExecutorService().shutdownNow();
}
}
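Both worker stop() methods in this commit replace a single shutdownNow() call with a retry loop, because a heartbeat runnable can swallow the first interrupt and keep running. Below is a self-contained sketch of the same idea using plain JDK executor APIs instead of Alluxio's CommonUtils.waitFor; the class name, method name, and timeout values are illustrative:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class ExecutorShutdownExample {

  /** Re-sends the interrupt and polls for termination until the executor actually stops. */
  static void shutdownWithRetry(ExecutorService executor, long timeoutMs)
      throws InterruptedException {
    long deadlineMs = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadlineMs) {
      executor.shutdownNow(); // re-deliver the interrupt on every attempt
      if (executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
        return; // all tasks terminated
      }
    }
    throw new IllegalStateException("executor did not terminate within " + timeoutMs + " ms");
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() -> {
      // Simulates a heartbeat-style loop that only exits once it observes an interrupt.
      while (!Thread.currentThread().isInterrupted()) {
        try {
          Thread.sleep(50);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore the flag so the loop can exit
        }
      }
    });
    shutdownWithRetry(executor, 5000);
  }
}

The diff wraps the same loop in CommonUtils.waitFor with a Guava Function, so the retry and timeout handling live in one shared helper.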
16 changes: 13 additions & 3 deletions minicluster/src/main/java/alluxio/master/LocalAlluxioMaster.java
@@ -98,6 +98,7 @@ public void start() {
@Override
public void run() {
try {
LOG.info("Starting Alluxio master {}.", mMasterProcess);
mMasterProcess.start();
} catch (Exception e) {
// Log the exception as the RuntimeException will be caught and handled silently by JUnit
@@ -108,6 +109,7 @@ public void run() {
};

mMasterThread = new Thread(runMaster);
mMasterThread.setName("MasterThread-" + System.identityHashCode(this));
mMasterThread.start();
mMasterProcess.waitForReady();
}
@@ -121,6 +123,7 @@ public void startSecondary() {
@Override
public void run() {
try {
LOG.info("Starting secondary master {}.", mSecondaryMaster);
mSecondaryMaster.start();
} catch (Exception e) {
// Log the exception as the RuntimeException will be caught and handled silently by JUnit
@@ -146,17 +149,24 @@ public boolean isServing() {
* Stops the master and cleans up client connections.
*/
public void stop() throws Exception {
clearClients();
// This shutdown needs to be done in a loop with retry because the interrupt signal can
// sometimes be ignored in the master implementation. For example, if the master is doing
// a hdfs listStatus RPC (hadoop version is 1.x), the interrupt signal is not properly handled.
while (mMasterThread.isAlive()) {
mMasterProcess.stop();
mMasterThread.interrupt();
LOG.info("Stopping master thread {}.", System.identityHashCode(this));
mMasterThread.join(1000);
}

mMasterProcess.stop();
mMasterThread.interrupt();
if (mSecondaryMaster != null) {
mSecondaryMaster.stop();
}
if (mSecondaryMasterThread != null) {
mSecondaryMasterThread.interrupt();
}

clearClients();
System.clearProperty("alluxio.web.resources");
System.clearProperty("alluxio.master.min.worker.threads");
}
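LocalAlluxioMaster.stop() above applies the same retry idea to the master thread itself: while the thread is alive, stop the process, interrupt the thread, and join with a bound, because the interrupt can be lost inside a blocking call (the comment cites Hadoop 1.x listStatus). A short, hedged sketch of such an interrupt-and-join loop, with an added deadline; class and method names are illustrative:

public final class ThreadStopExample {

  /** Interrupts a thread repeatedly until it exits or the deadline passes. */
  public static void stopWithRetry(Thread thread, long timeoutMs) throws InterruptedException {
    long deadlineMs = System.currentTimeMillis() + timeoutMs;
    while (thread.isAlive() && System.currentTimeMillis() < deadlineMs) {
      thread.interrupt(); // re-send the interrupt in case an earlier one was ignored
      thread.join(1000);  // bounded wait before retrying, as in LocalAlluxioMaster.stop()
    }
    if (thread.isAlive()) {
      throw new IllegalStateException(thread.getName() + " did not stop in time");
    }
  }
}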
@@ -232,7 +232,7 @@ protected void startWorkers() throws Exception {
public void stopFS() throws Exception {
super.stopFS();
LOG.info("Stopping testing zookeeper: {}", mCuratorServer.getConnectString());
mCuratorServer.stop();
mCuratorServer.close();
}

@Override
@@ -24,26 +24,26 @@
import alluxio.client.file.options.DeleteOptions;
import alluxio.collections.Pair;
import alluxio.exception.AlluxioException;
import alluxio.hadoop.HadoopClientTestUtils;
import alluxio.master.block.BlockMaster;
import alluxio.thrift.CommandType;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.util.io.PathUtils;

import com.google.common.base.Function;
import com.google.common.base.Throwables;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Ignore("TODO(peis): Fix this. ALLUXIO-2818")
public class MasterFaultToleranceIntegrationTest {
// Fail if the cluster doesn't come up after this amount of time.
private static final int CLUSTER_WAIT_TIMEOUT_MS = 120 * Constants.SECOND_MS;
@@ -54,6 +54,14 @@ public class MasterFaultToleranceIntegrationTest {
private MultiMasterLocalAlluxioCluster mMultiMasterLocalAlluxioCluster = null;
private FileSystem mFileSystem = null;

@BeforeClass
public static void beforeClass() {
// Skip hadoop 1 because hadoop 1's RPC cannot be interrupted properly which makes it
// hard to shutdown a cluster.
// TODO(peis): Figure out a better way to support hadoop 1.
Assume.assumeFalse(HadoopClientTestUtils.isHadoop1x());
}

@After
public final void after() throws Exception {
mMultiMasterLocalAlluxioCluster.stop();
@@ -127,9 +135,8 @@ public Boolean apply(Void aVoid) {
try {
return store.getWorkerInfoList().size() >= numWorkers;
} catch (Exception e) {
Throwables.propagate(e);
return false;
}
return false;
}
}, WaitForOptions.defaults().setTimeout(timeoutMs));
}
@@ -146,6 +153,7 @@ public void createFileFault() throws Exception {
for (int kills = 0; kills < MASTERS - 1; kills++) {
Assert.assertTrue(mMultiMasterLocalAlluxioCluster.stopLeader());
mMultiMasterLocalAlluxioCluster.waitForNewMaster(CLUSTER_WAIT_TIMEOUT_MS);
waitForWorkerRegistration(AlluxioBlockStore.create(), 1, CLUSTER_WAIT_TIMEOUT_MS);
faultTestDataCheck(answer);
faultTestDataCreation(new AlluxioURI("/data_kills_" + kills), answer);
}
@@ -158,6 +166,7 @@ public void deleteFileFault() throws Exception {
for (int kills = 0; kills < MASTERS - 1; kills++) {
Assert.assertTrue(mMultiMasterLocalAlluxioCluster.stopLeader());
mMultiMasterLocalAlluxioCluster.waitForNewMaster(CLUSTER_WAIT_TIMEOUT_MS);
waitForWorkerRegistration(AlluxioBlockStore.create(), 1, CLUSTER_WAIT_TIMEOUT_MS);

if (kills % 2 != 0) {
// Delete files.
@@ -235,9 +244,10 @@ public void workerReRegister() throws Exception {
for (int kills = 0; kills < MASTERS - 1; kills++) {
Assert.assertTrue(mMultiMasterLocalAlluxioCluster.stopLeader());
mMultiMasterLocalAlluxioCluster.waitForNewMaster(CLUSTER_WAIT_TIMEOUT_MS);
waitForWorkerRegistration(store, 1, 5 * Constants.SECOND_MS);
waitForWorkerRegistration(store, 1, 1 * Constants.MINUTE_MS);
// If worker is successfully re-registered, the capacity bytes should not change.
Assert.assertEquals(WORKER_CAPACITY_BYTES, store.getCapacityBytes());
long capacityFound = store.getCapacityBytes();
Assert.assertEquals(WORKER_CAPACITY_BYTES, capacityFound);
}
}
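The new @BeforeClass hook near the top of this test uses JUnit 4's Assume: when an assumption fails in @BeforeClass, the whole class is reported as skipped rather than failed, which is how Hadoop 1.x environments are excluded. A tiny sketch of that mechanism, with a stand-in condition instead of HadoopClientTestUtils.isHadoop1x():

import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;

public class SkipOnLegacyEnvTest {

  @BeforeClass
  public static void beforeClass() {
    // A failed assumption here skips every test in the class instead of failing them.
    Assume.assumeFalse(Boolean.getBoolean("legacy.env")); // stand-in for isHadoop1x()
  }

  @Test
  public void runsOnlyWhenAssumptionHolds() {
    // Test body runs only when the assumption above passes.
  }
}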

@@ -20,6 +20,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
@@ -30,6 +32,7 @@
* cluster.
*/
public class LocalMiniDFSCluster extends UnderFileSystemCluster {
private static final Logger LOG = LoggerFactory.getLogger(LocalMiniDFSCluster.class);
/**
* Tests the local minidfscluster only.
*/
@@ -197,6 +200,7 @@ public boolean isStarted() {
*/
@Override
public void shutdown() throws IOException {
LOG.info("Shutting down DFS cluster.");
if (mIsStarted) {
mDfsClient.close();
mDfsCluster.shutdown();
@@ -209,6 +213,7 @@ public void shutdown() throws IOException {
*/
@Override
public void start() throws IOException {
LOG.info("Starting DFS cluster.");
if (!mIsStarted) {

delete(mBaseDir, true);
