Skip to content

Commit

Permalink
[Java Client] Use common request timeout handling for lookup requests (
Browse files Browse the repository at this point in the history
…apache#10066)

- also fix possible memory leak since the lookup request would never get removed from
  pending requests if a timeout happens and a lookup response is never received
- use System.nanoTime() for calculating elapsed time
  • Loading branch information
lhotari authored Mar 30, 2021
1 parent 08df693 commit c9c1167
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
Expand All @@ -33,7 +31,6 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Promise;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
Expand All @@ -49,9 +46,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import javax.net.ssl.SSLSession;

import lombok.Getter;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -63,9 +58,9 @@
import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.tls.TlsHostnameVerifier;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.api.proto.CommandAckResponse;
import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
Expand All @@ -79,8 +74,8 @@
import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandGetLastMessageIdResponse;
import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse;
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse;
import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandMessage;
Expand All @@ -91,15 +86,14 @@
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
import org.apache.pulsar.common.api.proto.CommandSuccess;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -156,15 +150,20 @@ enum State {
}

private static class RequestTime {
final long creationTimeMs;
private final long creationTimeNanos;
final long requestId;
final RequestType requestType;

RequestTime(long creationTime, long requestId, RequestType requestType) {
this.creationTimeMs = creationTime;
RequestTime(long requestId, RequestType requestType) {
this.creationTimeNanos = System.nanoTime();
this.requestId = requestId;
this.requestType = requestType;
}

boolean isTimedOut(long timeoutMillis) {
long requestAgeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - creationTimeNanos);
return requestAgeMillis > timeoutMillis;
}
}

private enum RequestType {
Expand All @@ -173,7 +172,8 @@ private enum RequestType {
GetTopics,
GetSchema,
GetOrCreateSchema,
AckResponse;
AckResponse,
Lookup;

String getDescription() {
if (this == Command) {
Expand Down Expand Up @@ -582,9 +582,10 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l
if (!lookupResult.hasResponse()
|| CommandPartitionedTopicMetadataResponse.LookupType.Failed.equals(lookupResult.getResponse())) {
if (lookupResult.hasError()) {
checkServerError(lookupResult.getError(), lookupResult.getMessage());
String message = lookupResult.hasMessage() ? lookupResult.getMessage() : null;
checkServerError(lookupResult.getError(), message);
requestFuture.completeExceptionally(
getPulsarClientException(lookupResult.getError(), lookupResult.getMessage()));
getPulsarClientException(lookupResult.getError(), message));
} else {
requestFuture
.completeExceptionally(new PulsarClientException.LookupException("Empty lookup response"));
Expand Down Expand Up @@ -613,12 +614,7 @@ protected void handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn
// caller of this method needs to be protected under pendingLookupRequestSemaphore
private void addPendingLookupRequests(long requestId, TimedCompletableFuture<LookupDataResult> future) {
pendingRequests.put(requestId, future);
eventLoopGroup.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(new TimeoutException(
requestId + " lookup request timedout after ms " + operationTimeoutMs));
}
}, operationTimeoutMs, TimeUnit.MILLISECONDS);
requestTimeoutQueue.add(new RequestTime(requestId, RequestType.Lookup));
}

private CompletableFuture<LookupDataResult> getAndRemovePendingLookupRequest(long requestId) {
Expand Down Expand Up @@ -865,7 +861,7 @@ private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long reques
} else {
ctx.write(requestMessage, ctx().voidPromise());
}
requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId, requestType));
requestTimeoutQueue.add(new RequestTime(requestId, requestType));
}

private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId,
Expand Down Expand Up @@ -1133,19 +1129,21 @@ public void close() {
private void checkRequestTimeout() {
while (!requestTimeoutQueue.isEmpty()) {
RequestTime request = requestTimeoutQueue.peek();
if (request == null || (System.currentTimeMillis() - request.creationTimeMs) < operationTimeoutMs) {
if (request == null || !request.isTimedOut(operationTimeoutMs)) {
// if there is no request that is timed out then exit the loop
break;
}
request = requestTimeoutQueue.poll();
TimedCompletableFuture<?> requestFuture = pendingRequests.get(request.requestId);
if (requestFuture != null
&& !requestFuture.isDone()
&& !requestFuture.hasGotResponse()) {
pendingRequests.remove(request.requestId, requestFuture);
String timeoutMessage = String.format("%d %s timedout after ms %d", request.requestId, request.requestType.getDescription(), operationTimeoutMs);
if (requestFuture.completeExceptionally(new TimeoutException(timeoutMessage))) {
log.warn("{} {}", ctx.channel(), timeoutMessage);
if (!requestFuture.isDone()) {
String timeoutMessage = String.format("%d %s timedout after ms %d", request.requestId,
request.requestType.getDescription(), operationTimeoutMs);
if (requestFuture.completeExceptionally(new TimeoutException(timeoutMessage))) {
log.warn("{} {}", ctx.channel(), timeoutMessage);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
Expand All @@ -42,28 +44,25 @@
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.Test;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;

public class ClientCnxTest {

@Test
public void testClientCnxTimeout() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("testClientCnxTimeout"));
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10);
conf.setKeepAliveIntervalSeconds(0);
ClientCnx cnx = new ClientCnx(conf, eventLoop);

ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
when(ctx.channel()).thenReturn(channel);
ChannelFuture listenerFuture = mock(ChannelFuture.class);
when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);

Field ctxField = PulsarHandler.class.getDeclaredField("ctx");
ctxField.setAccessible(true);
ctxField.set(cnx, ctx);
cnx.channelActive(ctx);

try {
cnx.newLookup(null, 123).get();
} catch (Exception e) {
Expand Down

0 comments on commit c9c1167

Please sign in to comment.