Skip to content

Commit

Permalink
[Tests] Fix flaky test ConnectionTimeoutTest (apache#15865)
Browse files Browse the repository at this point in the history
- the original test assumed that 192.0.2.1 is a "black hole" and can be used for testing
  connection timeouts.
  This is an assumption which doesn't hold when a firewall blocks such connections.

- create a dummy TCP/IP server and fill the backlog so that it won't respond and becomes
  a "black hole"
  • Loading branch information
lhotari authored May 31, 2022
1 parent 96a3aa7 commit bc272fa
Showing 1 changed file with 48 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,65 @@
*/
package org.apache.pulsar.client.impl;

import io.netty.channel.ConnectTimeoutException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import io.netty.channel.ConnectTimeoutException;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.client.api.PulsarClient;
import org.testng.Assert;
import org.testng.annotations.Test;

public class ConnectionTimeoutTest {

// 192.0.2.0/24 is assigned for documentation, should be a deadend
static final String blackholeBroker = "pulsar://192.0.2.1:1234";

@Test
public void testLowTimeout() throws Exception {
try (PulsarClient clientLow = PulsarClient.builder().serviceUrl(blackholeBroker)
.connectionTimeout(1, TimeUnit.MILLISECONDS)
.operationTimeout(1000, TimeUnit.MILLISECONDS).build()) {
CompletableFuture<?> lowFuture = clientLow.newProducer().topic("foo").createAsync();
int backlogSize = 1;
// create a dummy server and fill the backlog of the server so that it won't respond
// so that the client timeout can be tested with this server
try (ServerSocket serverSocket = new ServerSocket(0, backlogSize, InetAddress.getByName("localhost"))) {
CountDownLatch latch = new CountDownLatch(backlogSize + 1);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < backlogSize + 1; i++) {
Thread connectThread = new Thread(() -> {
try (Socket socket = new Socket()) {
socket.connect(serverSocket.getLocalSocketAddress());
latch.countDown();
Thread.sleep(10000L);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
connectThread.start();
threads.add(connectThread);
}
latch.await();

String blackholeBroker =
"pulsar://" + serverSocket.getInetAddress().getHostAddress() + ":" + serverSocket.getLocalPort();

try {
lowFuture.get();
Assert.fail("Shouldn't be able to connect to anything");
} catch (Exception e) {
Assert.assertEquals(e.getCause().getCause().getCause().getClass(), ConnectTimeoutException.class);
try (PulsarClient clientLow = PulsarClient.builder().serviceUrl(blackholeBroker)
.connectionTimeout(1, TimeUnit.MILLISECONDS)
.operationTimeout(1000, TimeUnit.MILLISECONDS).build()) {
CompletableFuture<?> lowFuture = clientLow.newProducer().topic("foo").createAsync();
try {
lowFuture.get(10, TimeUnit.SECONDS);
Assert.fail("Shouldn't be able to connect to anything");
} catch (TimeoutException e) {
Assert.fail("Connection timeout didn't apply.");
} catch (Exception e) {
Assert.assertEquals(e.getCause().getCause().getCause().getClass(), ConnectTimeoutException.class);
}
} finally {
threads.stream().forEach(Thread::interrupt);
}
}
}
Expand Down

0 comments on commit bc272fa

Please sign in to comment.