Skip to content

Commit

Permalink
[pulsar-broker] Clean up closed producer to avoid publish-time for pr…
Browse files Browse the repository at this point in the history
…oducer (apache#5988)

* [pulsar-broker] Clean up closed producer to avoid publish-time  for producer

* fix test cases
  • Loading branch information
rdhabalia authored Jan 9, 2020
1 parent 7ec17b2 commit 0bc54c5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1210,14 +1210,15 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) {
if (!producerFuture.isDone() && producerFuture
.completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) {
// We have received a request to close the producer before it was actually completed, we have marked the
// producer future as failed and we can tell the client the close operation was successful. When the actual
// create operation will complete, the new producer will be discarded.
// producer future as failed and we can tell the client the close operation was successful.
log.info("[{}] Closed producer {} before its creation was completed", remoteAddress, producerId);
ctx.writeAndFlush(Commands.newSuccess(requestId));
producers.remove(producerId, producerFuture);
return;
} else if (producerFuture.isCompletedExceptionally()) {
log.info("[{}] Closed producer {} that already failed to be created", remoteAddress, producerId);
ctx.writeAndFlush(Commands.newSuccess(requestId));
producers.remove(producerId, producerFuture);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
Expand Down Expand Up @@ -789,21 +790,19 @@ public void testCreateProducerTimeout() throws Exception {
producerName, Collections.emptyMap());
channel.writeInbound(createProducer2);

// Complete the topic opening
// Complete the topic opening: It will make 2nd producer creation successful
openTopicFuture.get().run();

// Close succeeds
Object response = getResponse();
assertEquals(response.getClass(), CommandSuccess.class);
assertEquals(((CommandSuccess) response).getRequestId(), 2);

// 2nd producer fails to create
// 2nd producer will be successfully created as topic is open by then
response = getResponse();
assertEquals(response.getClass(), CommandError.class);
assertEquals(((CommandError) response).getRequestId(), 3);
assertEquals(response.getClass(), CommandProducerSuccess.class);
assertEquals(((CommandProducerSuccess) response).getRequestId(), 3);

// We should not receive response for 1st producer, since it was cancelled by the close
assertTrue(channel.outboundMessages().isEmpty());
assertTrue(channel.isActive());

channel.finish();
Expand Down Expand Up @@ -927,29 +926,29 @@ public void testCreateProducerBookieTimeout() throws Exception {
producerName, Collections.emptyMap());
channel.writeInbound(createProducer2);

// Now the topic gets opened
// Now the topic gets opened.. It will make 2nd producer creation successful
openFailedTopic.get().run();

// Close succeeds
Object response = getResponse();
assertEquals(response.getClass(), CommandSuccess.class);
assertEquals(((CommandSuccess) response).getRequestId(), 2);

// 2nd producer fails
// 2nd producer success as topic is opened
response = getResponse();
assertEquals(response.getClass(), CommandError.class);
assertEquals(((CommandError) response).getRequestId(), 3);
assertEquals(response.getClass(), CommandProducerSuccess.class);
assertEquals(((CommandProducerSuccess) response).getRequestId(), 3);

// Wait till the failtopic timeout interval
Thread.sleep(500);
ByteBuf createProducer3 = Commands.newProducer(successTopicName, 1 /* producer id */, 4 /* request id */,
producerName, Collections.emptyMap());
channel.writeInbound(createProducer3);

// 3rd producer succeeds
// 3rd producer fails because 2nd is already connected
response = getResponse();
assertEquals(response.getClass(), CommandProducerSuccess.class);
assertEquals(((CommandProducerSuccess) response).getRequestId(), 4);
assertEquals(response.getClass(), CommandError.class);
assertEquals(((CommandError) response).getRequestId(), 4);

Thread.sleep(500);

Expand Down Expand Up @@ -1611,4 +1610,32 @@ public void testInvalidTopicOnSubscribe() throws Exception {

channel.finish();
}

@Test
public void testDelayedClosedProducer() throws Exception {
resetChannel();
setChannelConnected();

CompletableFuture<Topic> delayFuture = new CompletableFuture<>();
doReturn(delayFuture).when(brokerService).getOrCreateTopic(any(String.class));
// Create producer first time
int producerId = 1;
ByteBuf clientCommand = Commands.newProducer(successTopicName, producerId /* producer id */, 1 /* request id */,
"prod-name", Collections.emptyMap());
channel.writeInbound(clientCommand);

ByteBuf closeProducerCmd = Commands.newCloseProducer(producerId, 2);
channel.writeInbound(closeProducerCmd);

Topic topic = mock(Topic.class);
doReturn(CompletableFuture.completedFuture(topic)).when(brokerService).getOrCreateTopic(any(String.class));
doReturn(CompletableFuture.completedFuture(false)).when(topic).hasSchema();

clientCommand = Commands.newProducer(successTopicName, producerId /* producer id */, 1 /* request id */,
"prod-name", Collections.emptyMap());
channel.writeInbound(clientCommand);

Object response = getResponse();
assertTrue(response instanceof CommandSuccess);
}
}

0 comments on commit 0bc54c5

Please sign in to comment.