Skip to content

Commit

Permalink
Fix partitionsAutoUpdateFuture never complete (apache#14625)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenbingshen authored Mar 17, 2022
1 parent 50a7e50 commit b06dac6
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Timeout;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -4351,4 +4352,52 @@ public void testShareConsumerWithMessageListener() throws Exception {
assertEquals(resultSet.size(), total);
});
}

@Test
public void testPartitionsAutoUpdate() throws Exception {
log.info("-- Starting {} test --", methodName);

int numPartitions = 3;
TopicName topicName = TopicName.get("persistent://my-property/my-ns/partitionsAutoUpdate-1");
admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);

int operationTimeout = 2000; // MILLISECONDS
@Cleanup final PulsarClient client = PulsarClient.builder()
.serviceUrl(lookupUrl.toString())
.operationTimeout(operationTimeout, TimeUnit.MILLISECONDS)
.build();

ProducerBuilder<byte[]> producerBuilder = client.newProducer()
.topic(topicName.toString()).sendTimeout(1, TimeUnit.SECONDS);

@Cleanup
PartitionedProducerImpl<byte[]> partitionedProducer =
(PartitionedProducerImpl<byte[]>) producerBuilder.autoUpdatePartitions(true).create();

// Trigger the Connection refused exception
stopBroker();

log.info("trigger partitionsAutoUpdateTimerTask run failed for producer");
Timeout timeout = partitionedProducer.getPartitionsAutoUpdateTimeout();
timeout.task().run(timeout);
Awaitility.await().untilAsserted(() -> {
assertNotNull(partitionedProducer.getPartitionsAutoUpdateFuture());
assertTrue(partitionedProducer.getPartitionsAutoUpdateFuture().isCompletedExceptionally());
assertTrue(FutureUtil.getException(partitionedProducer.getPartitionsAutoUpdateFuture()).get().getMessage()
.contains("Connection refused:"));
});

startBroker();

log.info("trigger partitionsAutoUpdateTimerTask run successful for producer");
timeout = partitionedProducer.getPartitionsAutoUpdateTimeout();
timeout.task().run(timeout);
Awaitility.await().untilAsserted(() -> {
assertNotNull(partitionedProducer.getPartitionsAutoUpdateFuture());
assertTrue(partitionedProducer.getPartitionsAutoUpdateFuture().isDone());
assertFalse(partitionedProducer.getPartitionsAutoUpdateFuture().isCompletedExceptionally());
});

log.info("-- Exiting {} test --", methodName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtende

if (log.isDebugEnabled()) {
log.debug("[{}] partitions number. old: {}, new: {}",
topic, oldPartitionNumber, currentPartitionNumber);
topic, oldPartitionNumber, currentPartitionNumber);
}

if (oldPartitionNumber == currentPartitionNumber) {
Expand Down Expand Up @@ -437,10 +437,14 @@ public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtende
}
} else {
log.error("[{}] not support shrink topic partitions. old: {}, new: {}",
topic, oldPartitionNumber, currentPartitionNumber);
topic, oldPartitionNumber, currentPartitionNumber);
future.completeExceptionally(new NotSupportedException("not support shrink topic partitions"));
}
return future;
}).exceptionally(throwable -> {
log.error("[{}] Auto getting partitions failed", topic, throwable);
future.completeExceptionally(throwable);
return null;
});

return future;
Expand Down Expand Up @@ -476,6 +480,11 @@ public void run(Timeout timeout) throws Exception {
}
};

@VisibleForTesting
public CompletableFuture<Void> getPartitionsAutoUpdateFuture() {
return partitionsAutoUpdateFuture;
}

@VisibleForTesting
public Timeout getPartitionsAutoUpdateTimeout() {
return partitionsAutoUpdateTimeout;
Expand Down

0 comments on commit b06dac6

Please sign in to comment.