Skip to content

Commit

Permalink
[pulsar-broker] clean up producer/consumer result from connection-cac…
Browse files Browse the repository at this point in the history
…he (apache#4145)
  • Loading branch information
rdhabalia authored and merlimat committed Apr 26, 2019
1 parent 07de52d commit 479f067
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,13 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
// creation request either complete or fails.
log.warn("[{}][{}][{}] Consumer with id {} is already present on the connection", remoteAddress,
topicName, subscriptionName, consumerId);
ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
: getErrorCode(existingConsumerFuture);
ServerError error = null;
if(!existingConsumerFuture.isDone()) {
error = ServerError.ServiceNotReady;
}else {
error = getErrorCode(existingConsumerFuture);
consumers.remove(consumerId, consumerFuture);
}
ctx.writeAndFlush(Commands.newError(requestId, error,
"Consumer is already present on the connection"));
return null;
Expand Down Expand Up @@ -892,6 +897,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
String msg = String.format("Encryption is required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
producers.remove(producerId, producerFuture);
return;
}

Expand Down

0 comments on commit 479f067

Please sign in to comment.