Skip to content

Commit

Permalink
PLSR-1456: Fix race condition on producer/consumer maps in ServerCnx (a…
Browse files Browse the repository at this point in the history
…pache#9256)

ServerCnx had a callback that was called from Producer/Consumer which
would remove the producer/consumer from its map using only the
ID. However, it is possible for this callback to run after the
producer/consumer has already been removed from the map and another
producer/consumer has been added in its place.

The solution is to use both the key and value when removing from the
map.

The change also updates the log messages to include the producerId and
consumerId in a consistent format, so that all log messages for an
individual producerId/consumerId can be found more easily.

A test has been changed because the test was depending on the broken
behaviour. What was happening was that the fail topic producer was
failing to create a producer, and when this happened it removed the
producer future for the successful producer. Then, when the third
producer tries to connect, it manages to create the producer on
the connection, but fails as there is already a producer with that
name on the topic. The correct behaviour is that it should see the
successful producer future for that ID and respond with success.

Co-authored-by: Ivan Kelly <[email protected]>
  • Loading branch information
ivankelly and Ivan Kelly authored Jan 21, 2021
1 parent 0770ae6 commit 1e4c3ec
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -901,8 +901,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
if (existingConsumerFuture != null) {
if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
Consumer consumer = existingConsumerFuture.getNow(null);
log.info("[{}] Consumer with the same id {} is already created: {}", remoteAddress,
consumerId, consumer);
log.info("[{}] Consumer with the same id is already created:"
+ " consumerId={}, consumer={}",
remoteAddress, consumerId, consumer);
commandSender.sendSuccessResponse(requestId);
return null;
} else {
Expand All @@ -911,14 +912,14 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
// client timeout is lower the broker timeouts. We need to wait until the previous
// consumer
// creation request either complete or fails.
log.warn("[{}][{}][{}] Consumer with id {} is already present on the connection",
remoteAddress, topicName, subscriptionName, consumerId);
log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
ServerError error = null;
if (!existingConsumerFuture.isDone()) {
error = ServerError.ServiceNotReady;
} else {
error = getErrorCode(existingConsumerFuture);
consumers.remove(consumerId);
consumers.remove(consumerId, existingConsumerFuture);
}
commandSender.sendErrorResponse(requestId, error,
"Consumer is already present on the connection");
Expand Down Expand Up @@ -995,11 +996,13 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
exception.getCause().getMessage());
}
} else if (exception.getCause() instanceof BrokerServiceException) {
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
subscriptionName, exception.getCause().getMessage());
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage());
} else {
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
subscriptionName, exception.getCause().getMessage(), exception);
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage(), exception);
}

// If client timed out, the future would have been completed by subsequent close.
Expand Down Expand Up @@ -1095,10 +1098,11 @@ protected void handleProducer(final CommandProducer cmdProducer) {
if (existingProducerFuture != null) {
if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
Producer producer = existingProducerFuture.getNow(null);
log.info("[{}] Producer with the same id {} is already created: {}", remoteAddress,
producerId, producer);
log.info("[{}] Producer with the same id is already created:"
+ " producerId={}, producer={}", remoteAddress, producerId, producer);
commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
producer.getSchemaVersion());

return null;
} else {
// There was an early request to create a producer with
Expand All @@ -1114,12 +1118,13 @@ protected void handleProducer(final CommandProducer cmdProducer) {
} else {
error = getErrorCode(existingProducerFuture);
// remove producer with producerId as it's already completed with exception
producers.remove(producerId);
producers.remove(producerId, existingProducerFuture);
}
log.warn("[{}][{}] Producer with id {} is already present on the connection",
remoteAddress, producerId, topicName);
log.warn("[{}][{}] Producer with id is already present on the connection,"
+ " producerId={}", remoteAddress, topicName, producerId);
commandSender.sendErrorResponse(requestId, error,
"Producer is already present on the connection");
"Producer is already present on the connection");

return null;
}
}
Expand Down Expand Up @@ -1201,8 +1206,9 @@ protected void handleProducer(final CommandProducer cmdProducer) {

producers.remove(producerId, producerFuture);
}).exceptionally(ex -> {
log.warn("[{}] Failed to add producer {}: {}",
remoteAddress, producer, ex.getMessage());
log.error("[{}] Failed to add producer to topic {}: producerId={}, {}",
remoteAddress, topicName, producerId, ex.getMessage());

producer.closeNow(true);
if (producerFuture.completeExceptionally(ex)) {
commandSender.sendErrorResponse(requestId,
Expand Down Expand Up @@ -1231,7 +1237,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {

if (!(cause instanceof ServiceUnitNotReadyException)) {
// Do not print stack traces for expected exceptions
log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
log.error("[{}] Failed to create topic {}, producerId={}",
remoteAddress, topicName, producerId, exception);
}

// If client timed out, the future would have been completed
Expand Down Expand Up @@ -1474,7 +1481,7 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) {

CompletableFuture<Producer> producerFuture = producers.get(producerId);
if (producerFuture == null) {
log.warn("[{}] Producer {} was not registered on the connection", remoteAddress, producerId);
log.warn("[{}] Producer was not registered on the connection. producerId={}", remoteAddress, producerId);
commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
"Producer was not registered on the connection");
return;
Expand All @@ -1484,24 +1491,28 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) {
.completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) {
// We have received a request to close the producer before it was actually completed, we have marked the
// producer future as failed and we can tell the client the close operation was successful.
log.info("[{}] Closed producer {} before its creation was completed", remoteAddress, producerId);
log.info("[{}] Closed producer before its creation was completed. producerId={}",
remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
return;
} else if (producerFuture.isCompletedExceptionally()) {
log.info("[{}] Closed producer {} that already failed to be created", remoteAddress, producerId);
log.info("[{}] Closed producer that already failed to be created. producerId={}",
remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
return;
}

// Proceed with normal close, the producer
Producer producer = producerFuture.getNow(null);
log.info("[{}][{}] Closing producer on cnx {}", producer.getTopic(), producer.getProducerName(), remoteAddress);
log.info("[{}][{}] Closing producer on cnx {}. producerId={}",
producer.getTopic(), producer.getProducerName(), remoteAddress, producerId);

producer.close(true).thenAccept(v -> {
log.info("[{}][{}] Closed producer on cnx {}", producer.getTopic(), producer.getProducerName(),
remoteAddress);
log.info("[{}][{}] Closed producer on cnx {}. producerId={}",
producer.getTopic(), producer.getProducerName(),
remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
});
Expand All @@ -1510,14 +1521,14 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) {
@Override
protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
checkArgument(state == State.Connected);
log.info("[{}] Closing consumer: {}", remoteAddress, closeConsumer.getConsumerId());
log.info("[{}] Closing consumer: consumerId={}", remoteAddress, closeConsumer.getConsumerId());

long requestId = closeConsumer.getRequestId();
long consumerId = closeConsumer.getConsumerId();

CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture == null) {
log.warn("[{}] Consumer was not registered on the connection: {}", consumerId, remoteAddress);
log.warn("[{}] Consumer was not registered on the connection: consumerId={}", remoteAddress, consumerId);
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
return;
}
Expand All @@ -1527,13 +1538,15 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
// We have received a request to close the consumer before it was actually completed, we have marked the
// consumer future as failed and we can tell the client the close operation was successful. When the actual
// create operation will complete, the new consumer will be discarded.
log.info("[{}] Closed consumer {} before its creation was completed", remoteAddress, consumerId);
log.info("[{}] Closed consumer before its creation was completed. consumerId={}",
remoteAddress, consumerId);
commandSender.sendSuccessResponse(requestId);
return;
}

if (consumerFuture.isCompletedExceptionally()) {
log.info("[{}] Closed consumer {} that already failed to be created", remoteAddress, consumerId);
log.info("[{}] Closed consumer that already failed to be created. consumerId={}",
remoteAddress, consumerId);
commandSender.sendSuccessResponse(requestId);
return;
}
Expand All @@ -1544,7 +1557,7 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
consumer.close();
consumers.remove(consumerId, consumerFuture);
commandSender.sendSuccessResponse(requestId);
log.info("[{}] Closed consumer {}", remoteAddress, consumer);
log.info("[{}] Closed consumer, consumerId={}", remoteAddress, consumerId);
} catch (BrokerServiceException e) {
log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e);
commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
Expand Down Expand Up @@ -1943,13 +1956,9 @@ protected void interceptCommand(BaseCommand command) throws InterceptException {

public void closeProducer(Producer producer) {
// removes producer-connection from map and send close command to producer
if (log.isDebugEnabled()) {
log.debug("[{}] Removed producer: {}", remoteAddress, producer);
}
long producerId = producer.getProducerId();
producers.remove(producerId);
safelyRemoveProducer(producer);
if (remoteEndpointProtocolVersion >= v5.getValue()) {
ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L));
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
} else {
close();
}
Expand All @@ -1959,13 +1968,9 @@ public void closeProducer(Producer producer) {
@Override
public void closeConsumer(Consumer consumer) {
// removes consumer-connection from map and send close command to consumer
if (log.isDebugEnabled()) {
log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
}
long consumerId = consumer.consumerId();
consumers.remove(consumerId);
safelyRemoveConsumer(consumer);
if (remoteEndpointProtocolVersion >= v5.getValue()) {
ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L));
ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), -1L));
} else {
close();
}
Expand All @@ -1986,19 +1991,42 @@ public SocketAddress clientAddress() {

@Override
public void removedConsumer(Consumer consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
}

consumers.remove(consumer.consumerId());
safelyRemoveConsumer(consumer);
}

@Override
public void removedProducer(Producer producer) {
safelyRemoveProducer(producer);
}

private void safelyRemoveProducer(Producer producer) {
long producerId = producer.getProducerId();
if (log.isDebugEnabled()) {
log.debug("[{}] Removed producer: {}", remoteAddress, producer);
log.debug("[{}] Removed producer: producerId={}, producer={}", remoteAddress, producerId, producer);
}
CompletableFuture<Producer> future = producers.get(producerId);
if (future != null) {
future.whenComplete((producer2, exception) -> {
if (exception != null || producer2 == producer) {
producers.remove(producerId, future);
}
});
}
}

private void safelyRemoveConsumer(Consumer consumer) {
long consumerId = consumer.consumerId();
if (log.isDebugEnabled()) {
log.debug("[{}] Removed consumer: consumerId={}, consumer={}", remoteAddress, consumerId, consumer);
}
CompletableFuture<Consumer> future = consumers.get(consumerId);
if (future != null) {
future.whenComplete((consumer2, exception) -> {
if (exception != null || consumer2 == consumer) {
consumers.remove(consumerId, future);
}
});
}
producers.remove(producer.getProducerId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -954,10 +954,10 @@ public void testCreateProducerBookieTimeout() throws Exception {
producerName, Collections.emptyMap());
channel.writeInbound(createProducer3);

// 3rd producer fails because 2nd is already connected
// 3rd producer succeeds because 2nd is already connected
response = getResponse();
assertEquals(response.getClass(), CommandError.class);
assertEquals(((CommandError) response).getRequestId(), 4);
assertEquals(response.getClass(), CommandProducerSuccess.class);
assertEquals(((CommandProducerSuccess) response).getRequestId(), 4);

Thread.sleep(500);

Expand Down

0 comments on commit 1e4c3ec

Please sign in to comment.