Skip to content

Commit

Permalink
KAFKA-13933: Fix stuck SSL unit tests in case of authentication failu…
Browse files Browse the repository at this point in the history
…re (apache#12159)

When there is an authentication error after the initial TCP connection, the selector never becomes READY, and these tests wait forever waiting for this state.

This will happen while using an JDK like OpenJDK build that does not support the required cipher suites.

Reviewers: Luke Chen <[email protected]>,  Tom Bentley <[email protected]>, Divij Vaidya <[email protected]>
  • Loading branch information
fvaleri authored Jun 5, 2022
1 parent 6ff2bf0 commit af71375
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.kafka.common.config.AbstractConfig;
Expand Down Expand Up @@ -87,13 +88,23 @@ public static void checkClientConnection(Selector selector, String node, int min
}
}

public static void waitForChannelConnected(Selector selector, String node) throws IOException {
int secondsLeft = 30;
while (selector.channel(node) != null
&& !selector.channel(node).isConnected() && secondsLeft-- > 0) {
selector.poll(1000L);
}
assertNotNull(selector.channel(node));
assertTrue(selector.channel(node).isConnected(), String.format("Channel %s was not connected after 30 seconds", node));
}

public static void waitForChannelReady(Selector selector, String node) throws IOException {
// wait for handshake to finish
int secondsLeft = 30;
while (!selector.isChannelReady(node) && secondsLeft-- > 0) {
selector.poll(1000L);
}
assertTrue(selector.isChannelReady(node));
assertTrue(selector.isChannelReady(node), String.format("Channel %s was not ready after 30 seconds", node));
}

public static ChannelState waitForChannelClose(Selector selector, String node, ChannelState.State channelState) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


/**
* A set of tests for the selector. These use a test harness that runs a simple socket server that echos back responses.
*/
@Timeout(240)
public class SelectorTest {
protected static final int BUFFER_SIZE = 4 * 1024;
private static final String METRIC_GROUP = "MetricGroup";
private static final long CONNECTION_MAX_IDLE_MS = 5_000;

protected EchoServer server;
protected Time time;
Expand All @@ -96,7 +96,7 @@ public void setUp() throws Exception {
this.channelBuilder = new PlaintextChannelBuilder(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
this.channelBuilder.configure(clientConfigs());
this.metrics = new Metrics();
this.selector = new Selector(5000, this.metrics, time, METRIC_GROUP, channelBuilder, new LogContext());
this.selector = new Selector(CONNECTION_MAX_IDLE_MS, this.metrics, time, METRIC_GROUP, channelBuilder, new LogContext());
}

@AfterEach
Expand Down Expand Up @@ -418,7 +418,7 @@ public void close() throws IOException {
}
};
channelBuilder.configure(clientConfigs());
Selector selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
Selector selector = new Selector(CONNECTION_MAX_IDLE_MS, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
selector.connect("0", new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
selector.connect("1", new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
assertThrows(RuntimeException.class, selector::close);
Expand All @@ -437,7 +437,7 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize
public void close() {
}
};
Selector selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
Selector selector = new Selector(CONNECTION_MAX_IDLE_MS, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
IOException e = assertThrows(IOException.class, () -> selector.register("1", socketChannel));
Expand All @@ -449,9 +449,9 @@ public void close() {
@Test
public void testCloseOldestConnection() throws Exception {
String id = "0";
blockingConnect(id);

time.sleep(6000); // The max idle time is 5000ms
selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelConnected(selector, id);
time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
selector.poll(0);

assertTrue(selector.disconnected().containsKey(id), "The idle connection should have been closed");
Expand All @@ -465,7 +465,7 @@ public void testIdleExpiryWithoutReadyKeys() throws IOException {
KafkaChannel channel = selector.channel(id);
channel.selectionKey().interestOps(0);

time.sleep(6000); // The max idle time is 5000ms
time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
selector.poll(0);
assertTrue(selector.disconnected().containsKey(id), "The idle connection should have been closed");
assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
Expand All @@ -474,7 +474,7 @@ public void testIdleExpiryWithoutReadyKeys() throws IOException {
@Test
public void testImmediatelyConnectedCleaned() throws Exception {
Metrics metrics = new Metrics(); // new metrics object to avoid metric registration conflicts
Selector selector = new ImmediatelyConnectingSelector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext());
Selector selector = new ImmediatelyConnectingSelector(CONNECTION_MAX_IDLE_MS, metrics, time, "MetricGroup", channelBuilder, new LogContext());

try {
testImmediatelyConnectedCleaned(selector, true);
Expand Down Expand Up @@ -525,7 +525,7 @@ private void testImmediatelyConnectedCleaned(Selector selector, boolean closeAft
public void testConnectException() throws Exception {
Metrics metrics = new Metrics();
AtomicBoolean throwIOException = new AtomicBoolean();
Selector selector = new ImmediatelyConnectingSelector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {
Selector selector = new ImmediatelyConnectingSelector(CONNECTION_MAX_IDLE_MS, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {
@Override
protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
SelectionKey key = super.registerChannel(id, socketChannel, interestedOps);
Expand Down Expand Up @@ -581,7 +581,7 @@ public void testExpireClosedConnectionWithPendingReceives() throws Exception {
private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
String id = channel.id();
selector.mute(id); // Mute to allow channel to be expired even if more data is available for read
time.sleep(6000); // The max idle time is 5000ms
time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
selector.poll(0);
assertNull(selector.channel(id), "Channel not expired");
assertNull(selector.closingChannel(id), "Channel not removed from closingChannels");
Expand All @@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
int expectedReceives = 5;
KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
String id = channel.id();
int completedReceives = 0;
int completedReceives = selector.completedReceives().size();

while (selector.disconnected().isEmpty()) {
time.sleep(6000); // The max idle time is 5000ms
selector.poll(completedReceives == expectedReceives ? 0 : 1000);
time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
completedReceives += selector.completedReceives().size();
if (!selector.completedReceives().isEmpty()) {
assertEquals(1, selector.completedReceives().size());
assertNotNull(selector.channel(id), "Channel should not have been expired");
assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
}
}

assertEquals(expectedReceives, completedReceives);
assertNull(selector.channel(id), "Channel not removed");
assertNull(selector.closingChannel(id), "Channel not removed");
assertTrue(selector.disconnected().containsKey(id), "Disconnect not notified");
assertNull(selector.channel(channel.id()), "Channel not expired");
assertNull(selector.closingChannel(channel.id()), "Channel not expired");
assertTrue(selector.disconnected().containsKey(channel.id()), "Disconnect not notified");
assertTrue(selector.completedReceives().isEmpty(), "Unexpected receive");
}

Expand Down Expand Up @@ -685,7 +680,7 @@ public void testMuteOnOOM() throws Exception {
//clean up default selector, replace it with one that uses a finite mem pool
selector.close();
MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
selector = new Selector(NetworkReceive.UNLIMITED, CONNECTION_MAX_IDLE_MS, metrics, time, "MetricGroup",
new HashMap<String, String>(), true, false, channelBuilder, pool, new LogContext());

try (ServerSocketChannel ss = ServerSocketChannel.open()) {
Expand Down Expand Up @@ -927,7 +922,7 @@ public void testLowestPriorityChannel() throws Exception {
@Test
public void testMetricsCleanupOnSelectorClose() throws Exception {
Metrics metrics = new Metrics();
Selector selector = new ImmediatelyConnectingSelector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {
Selector selector = new ImmediatelyConnectingSelector(CONNECTION_MAX_IDLE_MS, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {
@Override
public void close(String id) {
throw new RuntimeException();
Expand Down Expand Up @@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {

protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
while (!selector.connected().contains(node))
selector.poll(10000L);
while (!selector.isChannelReady(node))
selector.poll(10000L);
NetworkTestUtils.waitForChannelReady(selector, node);
}

protected final NetworkSend createSend(String node, String payload) {
Expand Down Expand Up @@ -1127,7 +1119,7 @@ private KafkaMetric findUntaggedMetricByName(String name) {
* Creates a connection, sends the specified number of requests and returns without reading
* any incoming data. Some of the incoming data may be in the socket buffers when this method
* returns, but there is no guarantee that all the data from the server will be available
* immediately.
* immediately.
*/
private KafkaChannel createConnectionWithPendingReceives(int pendingReceives) throws Exception {
String id = "0";
Expand All @@ -1138,20 +1130,18 @@ private KafkaChannel createConnectionWithPendingReceives(int pendingReceives) th
}

/**
* Sends the specified number of requests and waits for the requests to be sent. The channel
* is muted during polling to ensure that incoming data is not received.
* Sends the specified number of requests and waits for the requests to be sent.
* The channel is muted during polling to ensure that incoming data is not received.
*/
private KafkaChannel sendNoReceive(KafkaChannel channel, int numRequests) throws Exception {
channel.mute();
private void sendNoReceive(KafkaChannel channel, int numRequests) throws Exception {
selector.mute(channel.id());
for (int i = 0; i < numRequests; i++) {
selector.send(createSend(channel.id(), String.valueOf(i)));
do {
selector.poll(10);
} while (selector.completedSends().isEmpty());
}
channel.maybeUnmute();

return channel;
selector.unmute(channel.id());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ public void testConnectionWithCustomKeyManager() throws Exception {
Selector selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext());

selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
while (!selector.connected().contains(node))
selector.poll(10000L);
while (!selector.isChannelReady(node))
selector.poll(10000L);
NetworkTestUtils.waitForChannelReady(selector, node);

selector.send(createSend(node, request));

Expand Down Expand Up @@ -365,7 +362,7 @@ static class TestSslTransportLayer extends SslTransportLayer {
boolean muteSocket = false;

public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine,
ChannelMetadataRegistry metadataRegistry) throws IOException {
ChannelMetadataRegistry metadataRegistry) {
super(channelId, key, sslEngine, metadataRegistry);
transportLayers.put(channelId, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@ public void testRenegotiationFails() throws Exception {
// create connections
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelReady(selector, node);

// send echo requests and receive responses
while (!selector.isChannelReady(node)) {
selector.poll(1000L);
}
selector.send(createSend(node, node + "-" + 0));
selector.poll(0L);
server.renegotiate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,9 @@ public void testKeyUpdate() throws Exception {
// create connections
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelReady(selector, node);

// send echo requests and receive responses
while (!selector.isChannelReady(node)) {
selector.poll(1000L);
}
selector.send(createSend(node, node + "-" + 0));
selector.poll(0L);
server.renegotiate();
Expand Down

0 comments on commit af71375

Please sign in to comment.