[client] Consider a timed-out lookup equivalent to TooManyRequests (apache#11688)

Lookups and PartitionMetadataRequests (PMR) get a TooManyRequests
response if the broker they are talking to is making many async
requests to ZK and is therefore accepting and parking many requests.
This happens when a herd of clients requests many different topics.
However, if there are few topics, the result is cached and served
synchronously. If a herd occurs under these conditions, clients get a
timeout instead, because the I/O handler threads can be saturated and
unable to serve the request in time. In certain scenarios, such as
when the service URL is a VIP or load balancer, the retry request hits
the exact same endpoint. If the serving broker is overloaded, or just
plain broken, the client gets the same result. Lookups and PMR can be
served by any broker, so it makes sense to try another broker after a
certain number of failures. Closing the connection achieves this,
because a new connection through the VIP or load balancer may be
routed to a different broker.
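
The effect of closing the connection can be sketched with a toy
model. Everything here is hypothetical and not Pulsar code: the
RoundRobinVip class, the broker names, and the assumption that the VIP
hands each new connection to the next broker in turn. Real load
balancers may pick brokers differently.

```java
import java.util.List;

/** Hypothetical model of a VIP that assigns each new connection to the next broker. */
final class RoundRobinVip {
    private final List<String> brokers;
    private int next = 0;

    RoundRobinVip(List<String> brokers) {
        this.brokers = brokers;
    }

    /** Opening a connection is the only point at which a broker gets chosen. */
    String connect() {
        String broker = brokers.get(next % brokers.size());
        next++;
        return broker;
    }
}

final class ReconnectDemo {
    /** Returns the broker that finally served the lookup. */
    static String lookup(RoundRobinVip vip, String overloaded, int maxRejects) {
        String broker = vip.connect();
        int rejects = 0;
        while (broker.equals(overloaded)) {
            // A retry on the open connection reaches the same overloaded broker.
            rejects++;
            if (rejects > maxRejects) {
                // Closing and reconnecting lets the VIP choose a broker again.
                broker = vip.connect();
                rejects = 0;
            }
        }
        return broker;
    }

    public static void main(String[] args) {
        RoundRobinVip vip = new RoundRobinVip(List.of("broker-a", "broker-b"));
        System.out.println(lookup(vip, "broker-a", 1)); // broker-b
    }
}
```

Without the reconnect branch the loop would never leave broker-a,
which is the situation the paragraph above describes.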

There is a setting,
ClientBuilder#maxNumberOfRejectedRequestPerConnection, that closes the
connection when a threshold of rejected requests is reached. Thus far,
only requests that received a TooManyRequests response counted as
rejected. This change also counts a timed-out lookup as rejected.
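
As a rough illustration of how such a threshold behaves, here is a
minimal standalone sketch. RejectWindow and resetWindow are
hypothetical names used only for this example. The sketch mirrors the
count-before-increment check visible in the diff, but it replaces the
scheduled reset timer with an explicit method call.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a per-connection reject counter; not Pulsar code. */
final class RejectWindow {
    private final AtomicLong rejected = new AtomicLong();
    private final long maxRejected;

    RejectWindow(long maxRejected) {
        this.maxRejected = maxRejected;
    }

    /**
     * Records one rejected request (TooManyRequests or a timed-out lookup).
     * Returns true when the caller should close the connection.
     */
    boolean onReject() {
        // getAndIncrement returns the count from before this reject was added.
        long before = rejected.getAndIncrement();
        if (before == 0) {
            // First reject in the window. The real client schedules a reset
            // timer at this point; this sketch leaves the reset to the caller.
            return false;
        }
        return before >= maxRejected;
    }

    /** Stands in for the scheduled reset that ends the counting window. */
    void resetWindow() {
        rejected.set(0);
    }
}

final class RejectWindowDemo {
    public static void main(String[] args) {
        RejectWindow window = new RejectWindow(1);
        System.out.println(window.onReject()); // false
        System.out.println(window.onReject()); // true
        window.resetWindow();
        System.out.println(window.onReject()); // false
    }
}
```

Because the count is read before it is incremented, a threshold of 1
closes the connection on the second reject in a window, which is why
the test in this commit needs two timeouts.
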
ivankelly authored Aug 17, 2021
1 parent f401b10 commit fdd7170
Showing 2 changed files with 49 additions and 10 deletions.
@@ -68,6 +68,8 @@ protected void setup() throws Exception {
         admin.tenants().createTenant("public",
                 new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
         admin.namespaces().createNamespace("public/default", Sets.newHashSet("test"));
+
+        connectionsCreated.set(0);
     }
 
     @Override
@@ -223,6 +225,36 @@ public void testCloseConnectionOnBrokerRejectedRequest() throws Exception {
         }
     }
 
+    @Test
+    public void testCloseConnectionOnBrokerTimeout() throws Exception {
+        String lookupUrl = pulsar.getBrokerServiceUrl();
+        try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl)
+                .maxNumberOfRejectedRequestPerConnection(1)
+                .connectionTimeout(2, TimeUnit.SECONDS)
+                .operationTimeout(1, TimeUnit.SECONDS)
+                .lookupTimeout(10, TimeUnit.SECONDS)
+                .build()) {
+
+            // need 2 Timeouts because it takes the count before incrementing
+            pulsarClient.newProducer().topic("TIMEOUT:2").create().close();
+
+            Assert.assertEquals(connectionsCreated.get(), 2);
+        }
+
+        try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl)
+                .maxNumberOfRejectedRequestPerConnection(100)
+                .maxNumberOfRejectedRequestPerConnection(1)
+                .connectionTimeout(2, TimeUnit.SECONDS)
+                .operationTimeout(1, TimeUnit.SECONDS)
+                .lookupTimeout(10, TimeUnit.SECONDS)
+                .build()) {
+
+            pulsarClient.newProducer().topic("TIMEOUT:2").create().close();
+            pulsarClient.newProducer().topic("TIMEOUT:2").create().close();
+            Assert.assertEquals(connectionsCreated.get(), 3);
+        }
+    }
+
     enum LookupError {
         UNKNOWN,
         TOO_MANY,
@@ -1014,16 +1014,20 @@ private void checkServerError(ServerError error, String errMsg) {
             log.error("{} Close connection because received internal-server error {}", ctx.channel(), errMsg);
             ctx.close();
         } else if (ServerError.TooManyRequests.equals(error)) {
-            long rejectedRequests = NUMBER_OF_REJECTED_REQUESTS_UPDATER.getAndIncrement(this);
-            if (rejectedRequests == 0) {
-                // schedule timer
-                eventLoopGroup.schedule(() -> NUMBER_OF_REJECTED_REQUESTS_UPDATER.set(ClientCnx.this, 0),
-                        rejectedRequestResetTimeSec, TimeUnit.SECONDS);
-            } else if (rejectedRequests >= maxNumberOfRejectedRequestPerConnection) {
-                log.error("{} Close connection because received {} rejected request in {} seconds ", ctx.channel(),
-                        NUMBER_OF_REJECTED_REQUESTS_UPDATER.get(ClientCnx.this), rejectedRequestResetTimeSec);
-                ctx.close();
-            }
+            incrementRejectsAndMaybeClose();
+        }
+    }
+
+    private void incrementRejectsAndMaybeClose() {
+        long rejectedRequests = NUMBER_OF_REJECTED_REQUESTS_UPDATER.getAndIncrement(this);
+        if (rejectedRequests == 0) {
+            // schedule timer
+            eventLoopGroup.schedule(() -> NUMBER_OF_REJECTED_REQUESTS_UPDATER.set(ClientCnx.this, 0),
+                    rejectedRequestResetTimeSec, TimeUnit.SECONDS);
+        } else if (rejectedRequests >= maxNumberOfRejectedRequestPerConnection) {
+            log.error("{} Close connection because received {} rejected request in {} seconds ", ctx.channel(),
+                    NUMBER_OF_REJECTED_REQUESTS_UPDATER.get(ClientCnx.this), rejectedRequestResetTimeSec);
+            ctx.close();
         }
     }
 
@@ -1168,6 +1172,9 @@ private void checkRequestTimeout() {
                             request.requestType.getDescription(), operationTimeoutMs,
                             request.requestId, remoteAddress, localAddress);
                     if (requestFuture.completeExceptionally(new TimeoutException(timeoutMessage))) {
+                        if (request.requestType == RequestType.Lookup) {
+                            incrementRejectsAndMaybeClose();
+                        }
                         log.warn("{} {}", ctx.channel(), timeoutMessage);
                     }
                 }