diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index a5b20f3586929..ff51d74cef7b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -691,7 +691,7 @@ public void testAutoUnbubscribePatternConsumer() throws Exception { Consumer consumer = pulsarClient.newConsumer() .topicsPattern(pattern) - .patternAutoDiscoveryPeriod(2) + .patternAutoDiscoveryPeriod(10, TimeUnit.SECONDS) .subscriptionName(subscriptionName) .subscriptionType(SubscriptionType.Shared) .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index f63e95fb0f0bf..50328ff04a392 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -374,6 +374,21 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes); + + /** + * Set topics auto discovery period when using a pattern for topics consumer. + * + * @param interval + * the amount of delay between checks for + * new topics matching pattern set with {@link #topicsPattern(String)} + * @param unit + * the unit of the topics auto discovery period + * + * @return the consumer builder instance + */ + ConsumerBuilder patternAutoDiscoveryPeriod(int interval, TimeUnit unit); + + /** * Shared subscription * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index f7848b6688cb9..872abb7b24f43 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -278,7 +278,15 @@ public ConsumerBuilder readCompacted(boolean readCompacted) { @Override public ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes) { checkArgument(periodInMinutes >= 0, "periodInMinutes needs to be >= 0"); - conf.setPatternAutoDiscoveryPeriod(periodInMinutes); + patternAutoDiscoveryPeriod(periodInMinutes, TimeUnit.MINUTES); + return this; + } + + @Override + public ConsumerBuilder patternAutoDiscoveryPeriod(int interval, TimeUnit unit) { + checkArgument(interval >= 0, "interval needs to be >= 0"); + int intervalSeconds = (int) unit.toSeconds(interval); + conf.setPatternAutoDiscoveryPeriod(intervalSeconds); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 0941a5804721c..347583088f097 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -46,7 +46,7 @@ public class PatternMultiTopicsConsumerImpl extends MultiTopicsConsumerImpl implements Serializable, Cloneable { private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest; - private int patternAutoDiscoveryPeriod = 1; + private int patternAutoDiscoveryPeriod = 60; private RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index 3476c5a11ea2a..5620eb7f63a83 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -237,10 +237,15 @@ public void testConsumerBuilderImplWhenMaxTotalReceiverQueueSizeAcrossPartitions } @Test(expectedExceptions = IllegalArgumentException.class) - public void testConsumerBuilderImplWhenPatternAutoDiscoveryPeriodPropertyIsNegative() { + public void testConsumerBuilderImplWhenPatternAutoDiscoveryPeriodPeriodInMinutesIsNegative() { consumerBuilderImpl.patternAutoDiscoveryPeriod(-1); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testConsumerBuilderImplWhenPatternAutoDiscoveryPeriodPeriodIsNegative() { + consumerBuilderImpl.patternAutoDiscoveryPeriod(-1, TimeUnit.MINUTES); + } + @Test(expectedExceptions = IllegalArgumentException.class) public void testConsumerBuilderImplWhenBatchReceivePolicyIsNull() { consumerBuilderImpl.batchReceivePolicy(null); @@ -261,6 +266,7 @@ public void testConsumerBuilderImplWhenNumericPropertiesAreValid() { consumerBuilderImpl.priorityLevel(1); consumerBuilderImpl.maxTotalReceiverQueueSizeAcrossPartitions(1); consumerBuilderImpl.patternAutoDiscoveryPeriod(1); + consumerBuilderImpl.patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS); } }