From 1e4c3ec4f55ad0b0729f2849915f6b4d9e426bb1 Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Thu, 21 Jan 2021 22:52:45 +0100 Subject: [PATCH] PLSR-1456: Fix race condition on producer/consumer maps in ServerCnx (#9256) ServerCnx had a callback that was called from Producer/Consumer which would remove the producer/consumer from its map using only the ID. However, it is possible that this callback runs when the producer/consumer had already been removed from the map and another producer/consumer added in its place. The solution is to use both the key and value when removing from the map. The change also updates the log messages to include the producerId and consumerId in a format that all log messages for an individual producerId/consumerId can be easier found. A test has been changed because the test was depending on the broken behaviour. What was happening was that the fail topic producer was failing to create a producer, and when this happened it removed the producer future for the successful producer. Then, when the third producer tries to connect, it sees manages to create the producer on the connection, but fails as there is already a producer with that name on the topic. The correct behaviour is that it should see the successful producer future for that ID and respond with success. Co-authored-by: Ivan Kelly --- .../pulsar/broker/service/ServerCnx.java | 124 +++++++++++------- .../pulsar/broker/service/ServerCnxTest.java | 6 +- 2 files changed, 79 insertions(+), 51 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index aea1f6b190ada..8a4d5a94947a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -901,8 +901,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { if (existingConsumerFuture != null) { if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) { Consumer consumer = existingConsumerFuture.getNow(null); - log.info("[{}] Consumer with the same id {} is already created: {}", remoteAddress, - consumerId, consumer); + log.info("[{}] Consumer with the same id is already created:" + + " consumerId={}, consumer={}", + remoteAddress, consumerId, consumer); commandSender.sendSuccessResponse(requestId); return null; } else { @@ -911,14 +912,14 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { // client timeout is lower the broker timeouts. We need to wait until the previous // consumer // creation request either complete or fails. - log.warn("[{}][{}][{}] Consumer with id {} is already present on the connection", - remoteAddress, topicName, subscriptionName, consumerId); + log.warn("[{}][{}][{}] Consumer with id is already present on the connection," + + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId); ServerError error = null; if (!existingConsumerFuture.isDone()) { error = ServerError.ServiceNotReady; } else { error = getErrorCode(existingConsumerFuture); - consumers.remove(consumerId); + consumers.remove(consumerId, existingConsumerFuture); } commandSender.sendErrorResponse(requestId, error, "Consumer is already present on the connection"); @@ -995,11 +996,13 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { exception.getCause().getMessage()); } } else if (exception.getCause() instanceof BrokerServiceException) { - log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName, - subscriptionName, exception.getCause().getMessage()); + log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", + remoteAddress, topicName, subscriptionName, + consumerId, exception.getCause().getMessage()); } else { - log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName, - subscriptionName, exception.getCause().getMessage(), exception); + log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", + remoteAddress, topicName, subscriptionName, + consumerId, exception.getCause().getMessage(), exception); } // If client timed out, the future would have been completed by subsequent close. @@ -1095,10 +1098,11 @@ protected void handleProducer(final CommandProducer cmdProducer) { if (existingProducerFuture != null) { if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) { Producer producer = existingProducerFuture.getNow(null); - log.info("[{}] Producer with the same id {} is already created: {}", remoteAddress, - producerId, producer); + log.info("[{}] Producer with the same id is already created:" + + " producerId={}, producer={}", remoteAddress, producerId, producer); commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(), producer.getSchemaVersion()); + return null; } else { // There was an early request to create a producer with @@ -1114,12 +1118,13 @@ protected void handleProducer(final CommandProducer cmdProducer) { } else { error = getErrorCode(existingProducerFuture); // remove producer with producerId as it's already completed with exception - producers.remove(producerId); + producers.remove(producerId, existingProducerFuture); } - log.warn("[{}][{}] Producer with id {} is already present on the connection", - remoteAddress, producerId, topicName); + log.warn("[{}][{}] Producer with id is already present on the connection," + + " producerId={}", remoteAddress, topicName, producerId); commandSender.sendErrorResponse(requestId, error, - "Producer is already present on the connection"); + "Producer is already present on the connection"); + return null; } } @@ -1201,8 +1206,9 @@ protected void handleProducer(final CommandProducer cmdProducer) { producers.remove(producerId, producerFuture); }).exceptionally(ex -> { - log.warn("[{}] Failed to add producer {}: {}", - remoteAddress, producer, ex.getMessage()); + log.error("[{}] Failed to add producer to topic {}: producerId={}, {}", + remoteAddress, topicName, producerId, ex.getMessage()); + producer.closeNow(true); if (producerFuture.completeExceptionally(ex)) { commandSender.sendErrorResponse(requestId, @@ -1231,7 +1237,8 @@ protected void handleProducer(final CommandProducer cmdProducer) { if (!(cause instanceof ServiceUnitNotReadyException)) { // Do not print stack traces for expected exceptions - log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception); + log.error("[{}] Failed to create topic {}, producerId={}", + remoteAddress, topicName, producerId, exception); } // If client timed out, the future would have been completed @@ -1474,7 +1481,7 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) { CompletableFuture producerFuture = producers.get(producerId); if (producerFuture == null) { - log.warn("[{}] Producer {} was not registered on the connection", remoteAddress, producerId); + log.warn("[{}] Producer was not registered on the connection. producerId={}", remoteAddress, producerId); commandSender.sendErrorResponse(requestId, ServerError.UnknownError, "Producer was not registered on the connection"); return; @@ -1484,12 +1491,14 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) { .completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) { // We have received a request to close the producer before it was actually completed, we have marked the // producer future as failed and we can tell the client the close operation was successful. - log.info("[{}] Closed producer {} before its creation was completed", remoteAddress, producerId); + log.info("[{}] Closed producer before its creation was completed. producerId={}", + remoteAddress, producerId); commandSender.sendSuccessResponse(requestId); producers.remove(producerId, producerFuture); return; } else if (producerFuture.isCompletedExceptionally()) { - log.info("[{}] Closed producer {} that already failed to be created", remoteAddress, producerId); + log.info("[{}] Closed producer that already failed to be created. producerId={}", + remoteAddress, producerId); commandSender.sendSuccessResponse(requestId); producers.remove(producerId, producerFuture); return; @@ -1497,11 +1506,13 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) { // Proceed with normal close, the producer Producer producer = producerFuture.getNow(null); - log.info("[{}][{}] Closing producer on cnx {}", producer.getTopic(), producer.getProducerName(), remoteAddress); + log.info("[{}][{}] Closing producer on cnx {}. producerId={}", + producer.getTopic(), producer.getProducerName(), remoteAddress, producerId); producer.close(true).thenAccept(v -> { - log.info("[{}][{}] Closed producer on cnx {}", producer.getTopic(), producer.getProducerName(), - remoteAddress); + log.info("[{}][{}] Closed producer on cnx {}. producerId={}", + producer.getTopic(), producer.getProducerName(), + remoteAddress, producerId); commandSender.sendSuccessResponse(requestId); producers.remove(producerId, producerFuture); }); @@ -1510,14 +1521,14 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) { @Override protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { checkArgument(state == State.Connected); - log.info("[{}] Closing consumer: {}", remoteAddress, closeConsumer.getConsumerId()); + log.info("[{}] Closing consumer: consumerId={}", remoteAddress, closeConsumer.getConsumerId()); long requestId = closeConsumer.getRequestId(); long consumerId = closeConsumer.getConsumerId(); CompletableFuture consumerFuture = consumers.get(consumerId); if (consumerFuture == null) { - log.warn("[{}] Consumer was not registered on the connection: {}", consumerId, remoteAddress); + log.warn("[{}] Consumer was not registered on the connection: consumerId={}", remoteAddress, consumerId); commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found"); return; } @@ -1527,13 +1538,15 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { // We have received a request to close the consumer before it was actually completed, we have marked the // consumer future as failed and we can tell the client the close operation was successful. When the actual // create operation will complete, the new consumer will be discarded. - log.info("[{}] Closed consumer {} before its creation was completed", remoteAddress, consumerId); + log.info("[{}] Closed consumer before its creation was completed. consumerId={}", + remoteAddress, consumerId); commandSender.sendSuccessResponse(requestId); return; } if (consumerFuture.isCompletedExceptionally()) { - log.info("[{}] Closed consumer {} that already failed to be created", remoteAddress, consumerId); + log.info("[{}] Closed consumer that already failed to be created. consumerId={}", + remoteAddress, consumerId); commandSender.sendSuccessResponse(requestId); return; } @@ -1544,7 +1557,7 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { consumer.close(); consumers.remove(consumerId, consumerFuture); commandSender.sendSuccessResponse(requestId); - log.info("[{}] Closed consumer {}", remoteAddress, consumer); + log.info("[{}] Closed consumer, consumerId={}", remoteAddress, consumerId); } catch (BrokerServiceException e) { log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e); commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage()); @@ -1943,13 +1956,9 @@ protected void interceptCommand(BaseCommand command) throws InterceptException { public void closeProducer(Producer producer) { // removes producer-connection from map and send close command to producer - if (log.isDebugEnabled()) { - log.debug("[{}] Removed producer: {}", remoteAddress, producer); - } - long producerId = producer.getProducerId(); - producers.remove(producerId); + safelyRemoveProducer(producer); if (remoteEndpointProtocolVersion >= v5.getValue()) { - ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L)); + ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L)); } else { close(); } @@ -1959,13 +1968,9 @@ public void closeProducer(Producer producer) { @Override public void closeConsumer(Consumer consumer) { // removes consumer-connection from map and send close command to consumer - if (log.isDebugEnabled()) { - log.debug("[{}] Removed consumer: {}", remoteAddress, consumer); - } - long consumerId = consumer.consumerId(); - consumers.remove(consumerId); + safelyRemoveConsumer(consumer); if (remoteEndpointProtocolVersion >= v5.getValue()) { - ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L)); + ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), -1L)); } else { close(); } @@ -1986,19 +1991,42 @@ public SocketAddress clientAddress() { @Override public void removedConsumer(Consumer consumer) { - if (log.isDebugEnabled()) { - log.debug("[{}] Removed consumer: {}", remoteAddress, consumer); - } - - consumers.remove(consumer.consumerId()); + safelyRemoveConsumer(consumer); } @Override public void removedProducer(Producer producer) { + safelyRemoveProducer(producer); + } + + private void safelyRemoveProducer(Producer producer) { + long producerId = producer.getProducerId(); if (log.isDebugEnabled()) { - log.debug("[{}] Removed producer: {}", remoteAddress, producer); + log.debug("[{}] Removed producer: producerId={}, producer={}", remoteAddress, producerId, producer); + } + CompletableFuture future = producers.get(producerId); + if (future != null) { + future.whenComplete((producer2, exception) -> { + if (exception != null || producer2 == producer) { + producers.remove(producerId, future); + } + }); + } + } + + private void safelyRemoveConsumer(Consumer consumer) { + long consumerId = consumer.consumerId(); + if (log.isDebugEnabled()) { + log.debug("[{}] Removed consumer: consumerId={}, consumer={}", remoteAddress, consumerId, consumer); + } + CompletableFuture future = consumers.get(consumerId); + if (future != null) { + future.whenComplete((consumer2, exception) -> { + if (exception != null || consumer2 == consumer) { + consumers.remove(consumerId, future); + } + }); } - producers.remove(producer.getProducerId()); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 7ff0c146d4f11..4bdbacb4c648b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -954,10 +954,10 @@ public void testCreateProducerBookieTimeout() throws Exception { producerName, Collections.emptyMap()); channel.writeInbound(createProducer3); - // 3rd producer fails because 2nd is already connected + // 3rd producer succeeds because 2nd is already connected response = getResponse(); - assertEquals(response.getClass(), CommandError.class); - assertEquals(((CommandError) response).getRequestId(), 4); + assertEquals(response.getClass(), CommandProducerSuccess.class); + assertEquals(((CommandProducerSuccess) response).getRequestId(), 4); Thread.sleep(500);