Skip to content

Commit 9e96ac6

Browse files
committed
HADOOP-10219. ipc.Client.setupIOstreams() needs to check for ClientCache.stopClient requested shutdowns.
Contributed by Kihwal Lee and Lukas Majercak.
1 parent 6e5ffb7 commit 9e96ac6

File tree

2 files changed

+59
-0
lines changed
  • hadoop-common-project/hadoop-common/src

2 files changed

+59
-0
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

+14
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import java.util.concurrent.atomic.AtomicBoolean;
7171
import java.util.concurrent.atomic.AtomicInteger;
7272
import java.util.concurrent.atomic.AtomicLong;
73+
import java.util.concurrent.atomic.AtomicReference;
7374

7475
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
7576
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
@@ -439,6 +440,8 @@ private class Connection extends Thread {
439440

440441
private final Object sendRpcRequestLock = new Object();
441442

443+
private AtomicReference<Thread> connectingThread = new AtomicReference<>();
444+
442445
public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
443446
this.remoteId = remoteId;
444447
this.server = remoteId.getAddress();
@@ -777,6 +780,7 @@ private synchronized void setupIOstreams(
777780
}
778781
}
779782
try {
783+
connectingThread.set(Thread.currentThread());
780784
if (LOG.isDebugEnabled()) {
781785
LOG.debug("Connecting to "+server);
782786
}
@@ -862,6 +866,8 @@ public AuthMethod run()
862866
markClosed(new IOException("Couldn't set up IO streams: " + t, t));
863867
}
864868
close();
869+
} finally {
870+
connectingThread.set(null);
865871
}
866872
}
867873

@@ -1215,6 +1221,13 @@ private synchronized void markClosed(IOException e) {
12151221
notifyAll();
12161222
}
12171223
}
1224+
1225+
private void interruptConnectingThread() {
1226+
Thread connThread = connectingThread.get();
1227+
if (connThread != null) {
1228+
connThread.interrupt();
1229+
}
1230+
}
12181231

12191232
/** Close the connection. */
12201233
private synchronized void close() {
@@ -1321,6 +1334,7 @@ public void stop() {
13211334
// wake up all connections
13221335
for (Connection conn : connections.values()) {
13231336
conn.interrupt();
1337+
conn.interruptConnectingThread();
13241338
}
13251339

13261340
// wait until all connections are closed

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

+45
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.junit.Assert.assertTrue;
2525
import static org.junit.Assert.fail;
2626
import static org.mockito.Matchers.anyInt;
27+
import static org.mockito.Mockito.doAnswer;
2728
import static org.mockito.Mockito.doThrow;
2829
import static org.mockito.Mockito.mock;
2930
import static org.mockito.Mockito.spy;
@@ -1398,6 +1399,50 @@ public void testClientGetTimeout() throws IOException {
13981399
assertEquals(Client.getTimeout(config), -1);
13991400
}
14001401

1402+
@Test(timeout=60000)
1403+
public void testSetupConnectionShouldNotBlockShutdown() throws Exception {
1404+
// Start server
1405+
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
1406+
Server server = new TestServer(1, true);
1407+
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
1408+
1409+
// Track how many times we retried to set up the connection
1410+
final AtomicInteger createSocketCalled = new AtomicInteger();
1411+
1412+
doAnswer(new Answer() {
1413+
@Override
1414+
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
1415+
createSocketCalled.addAndGet(1);
1416+
Thread.sleep(MIN_SLEEP_TIME * 5);
1417+
throw new ConnectTimeoutException("fake");
1418+
}
1419+
}).when(mockFactory).createSocket();
1420+
final Client client = new Client(LongWritable.class, conf, mockFactory);
1421+
1422+
final AtomicBoolean callStarted = new AtomicBoolean(false);
1423+
1424+
// Call a random function asynchronously so that we can call stop()
1425+
new Thread(new Runnable() {
1426+
public void run() {
1427+
try {
1428+
callStarted.set(true);
1429+
call(client, RANDOM.nextLong(), addr, conf);
1430+
} catch (IOException ignored) {}
1431+
}
1432+
}).start();
1433+
1434+
GenericTestUtils.waitFor(new Supplier<Boolean>() {
1435+
@Override
1436+
public Boolean get() {
1437+
return callStarted.get() && createSocketCalled.get() == 1;
1438+
}
1439+
}, 50, 60000);
1440+
1441+
// stop() should stop the client immediately without any more retries
1442+
client.stop();
1443+
assertEquals(1, createSocketCalled.get());
1444+
}
1445+
14011446
private void assertRetriesOnSocketTimeouts(Configuration conf,
14021447
int maxTimeoutRetries) throws IOException {
14031448
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);

0 commit comments

Comments
 (0)