Skip to content

Commit

Permalink
Fixed wait-for-exclusive producer triggering of operation timeout (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Feb 18, 2021
1 parent 73d0aac commit 0f28a68
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,12 @@ public void waitForExclusiveWithClientTimeout(String type, boolean partitioned)
.operationTimeout(1, TimeUnit.SECONDS)
.build();

Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
Producer<String> p1 = client.newProducer(Schema.STRING)
.topic(topic)
.accessMode(ProducerAccessMode.WaitForExclusive)
.create();

CompletableFuture<Producer<String>> fp2 = pulsarClient.newProducer(Schema.STRING)
CompletableFuture<Producer<String>> fp2 = client.newProducer(Schema.STRING)
.topic(topic)
.accessMode(ProducerAccessMode.WaitForExclusive)
.createAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ protected void handleProducerSuccess(CommandProducerSuccess success) {
// We got a success operation but the producer is not ready. This means that the producer has been queued up
// in broker. We need to leave the future pending until we get the final confirmation. We just mark that
// we have received a response, in order to avoid the timeout.
TimedCompletableFuture<?> requestFuture = (TimedCompletableFuture<?>) pendingRequests.get(requestId);
TimedCompletableFuture<?> requestFuture = pendingRequests.get(requestId);
if (requestFuture != null) {
log.info("{} Producer {} has been queued up at broker. request: {}", ctx.channel(),
success.getProducerName(), requestId);
Expand Down Expand Up @@ -1137,10 +1137,11 @@ private void checkRequestTimeout() {
break;
}
request = requestTimeoutQueue.poll();
TimedCompletableFuture<?> requestFuture = pendingRequests.remove(request.requestId);
TimedCompletableFuture<?> requestFuture = pendingRequests.get(request.requestId);
if (requestFuture != null
&& !requestFuture.isDone()
&& !requestFuture.hasGotResponse()) {
pendingRequests.remove(request.requestId, requestFuture);
String timeoutMessage = String.format("%d %s timedout after ms %d", request.requestId, request.requestType.getDescription(), operationTimeoutMs);
if (requestFuture.completeExceptionally(new TimeoutException(timeoutMessage))) {
log.warn("{} {}", ctx.channel(), timeoutMessage);
Expand Down

0 comments on commit 0f28a68

Please sign in to comment.