Skip to content

Commit

Permalink
Add connection timeout client configuration option (apache#2852)
Browse files Browse the repository at this point in the history
Allows the client to specify how long to wait for brokers to respond.
  • Loading branch information
ivankelly authored and sijie committed Nov 16, 2018
1 parent aefbaac commit efa040f
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,4 +309,14 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
* @param unit time unit for {@code statsInterval}
*/
ClientBuilder keepAliveInterval(int keepAliveIntervalSeconds, TimeUnit unit);

/**
* Set the duration of time to wait for a connection to a broker to be established. If the duration
* passes without a response from the broker, the connection attempt is dropped.
*
* @since 2.3.0
* @param duration the duration to wait
* @param unit the time unit in which the duration is defined
*/
ClientBuilder connectionTimeout(int duration, TimeUnit unit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,27 @@ public ClientConfiguration setServiceUrl(String serviceUrl) {
return this;
}

/**
* Set the duration of time to wait for a connection to a broker to be established. If the duration
* passes without a response from the broker, the connection attempt is dropped.
*
* @param duration the duration to wait
* @param unit the time unit in which the duration is defined
*/
public void setConnectionTimeout(int duration, TimeUnit unit) {
confData.setConnectionTimeoutMs((int)unit.toMillis(duration));
}

/**
* Get the duration of time for which the client will wait for a connection to a broker to be
* established before giving up.
*
* @return the duration, in milliseconds
*/
public long getConnectionTimeoutMs() {
return confData.getConnectionTimeoutMs();
}

public ClientConfigurationData getConfigurationData() {
return confData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ public ClientBuilder keepAliveInterval(int keepAliveIntervalSeconds, TimeUnit un
return this;
}

@Override
public ClientBuilder connectionTimeout(int duration, TimeUnit unit) {
conf.setConnectionTimeoutMs((int)unit.toMillis(duration));
return this;
}

public ClientConfigurationData getClientConfigurationData() {
return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
bootstrap.group(eventLoopGroup);
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private int maxLookupRequest = 50000;
private int maxNumberOfRejectedRequestPerConnection = 50;
private int keepAliveIntervalSeconds = 30;

private int connectionTimeoutMs = 10000;

public ClientConfigurationData clone() {
try {
return (ClientConfigurationData) super.clone();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import io.netty.channel.ConnectTimeoutException;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

import org.testng.Assert;
import org.testng.annotations.Test;

public class ConnectionTimeoutTest {

// 192.0.2.0/24 is assigned for documentation, should be a deadend
final static String blackholeBroker = "pulsar://192.0.2.1:1234";

@Test
public void testLowTimeout() throws Exception {
long startNanos = System.nanoTime();

try (PulsarClient clientLow = PulsarClient.builder().serviceUrl(blackholeBroker)
.connectionTimeout(1, TimeUnit.MILLISECONDS).build();
PulsarClient clientDefault = PulsarClient.builder().serviceUrl(blackholeBroker).build()) {
CompletableFuture<?> lowFuture = clientLow.newProducer().topic("foo").createAsync();
CompletableFuture<?> defaultFuture = clientDefault.newProducer().topic("foo").createAsync();

try {
lowFuture.get();
Assert.fail("Shouldn't be able to connect to anything");
} catch (Exception e) {
Assert.assertFalse(defaultFuture.isDone());
Assert.assertEquals(e.getCause().getCause().getCause().getClass(),
ConnectTimeoutException.class);
Assert.assertTrue((System.nanoTime() - startNanos) < TimeUnit.SECONDS.toNanos(3));
}
}
}
}

0 comments on commit efa040f

Please sign in to comment.