Skip to content

Commit

Permalink
[Issue 5927][Pulsar Client] Change the time unit of `patternAutoDisco…
Browse files Browse the repository at this point in the history
…veryPeriod` to SECONDS (apache#5950)

Fixes apache#5927 

### Motivation
Per apache#5927, by adding one overload to configure patternAutoDiscoveryPeriod by time units, users may be able to configure the time duration to be less than 1 minute.

### Modifications

1. Add  an overload of `ConsumerBuilder.patternAutoDiscoveryPeriod`
2. Update original `ConsumerBuilderImpl.patternAutoDiscoveryPeriod` to call the new overload
3. Update the `PatternMultiTopicsConsumerImpl.recheckPatternTimeout` logic, this should be carefully reviewed, since I don't understand the original logic, it seems the old implementation will only allow 0 or 1 minute as the recheck interval, ignoring the user's periods which longer than 1 minute

### Verifying this change
- Add the unit test to verify param invalidness
- Update one unit test in `PatternTopicsConsumerImplTest.testAutoUnbubscribePatternConsumer` to use 10 seconds period
(if another new test is requested per review i will add it later)
  • Loading branch information
zhenglaizhang authored Feb 8, 2020
1 parent 98cf15f commit fed8c30
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ public void testAutoUnbubscribePatternConsumer() throws Exception {

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topicsPattern(pattern)
.patternAutoDiscoveryPeriod(2)
.patternAutoDiscoveryPeriod(10, TimeUnit.SECONDS)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes);


/**
* Set topics auto discovery period when using a pattern for topics consumer.
*
* @param interval
* the amount of delay between checks for
* new topics matching pattern set with {@link #topicsPattern(String)}
* @param unit
* the unit of the topics auto discovery period
*
* @return the consumer builder instance
*/
ConsumerBuilder<T> patternAutoDiscoveryPeriod(int interval, TimeUnit unit);


/**
* <b>Shared subscription</b>
* Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,15 @@ public ConsumerBuilder<T> readCompacted(boolean readCompacted) {
@Override
public ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes) {
checkArgument(periodInMinutes >= 0, "periodInMinutes needs to be >= 0");
conf.setPatternAutoDiscoveryPeriod(periodInMinutes);
patternAutoDiscoveryPeriod(periodInMinutes, TimeUnit.MINUTES);
return this;
}

@Override
public ConsumerBuilder<T> patternAutoDiscoveryPeriod(int interval, TimeUnit unit) {
checkArgument(interval >= 0, "interval needs to be >= 0");
int intervalSeconds = (int) unit.toSeconds(interval);
conf.setPatternAutoDiscoveryPeriod(intervalSeconds);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
private final Pattern topicsPattern;
private final TopicsChangedListener topicsChangeListener;
private final Mode subscriptionMode;
private volatile Timeout recheckPatternTimeout = null;
private volatile Timeout recheckPatternTimeout;

public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
PulsarClientImpl client,
Expand All @@ -65,7 +65,7 @@ public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
checkArgument(getNameSpaceFromPattern(topicsPattern).toString().equals(this.namespaceName.toString()));

this.topicsChangeListener = new PatternTopicsChangedListener();
recheckPatternTimeout = client.timer().newTimeout(this, Math.min(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.MINUTES);
this.recheckPatternTimeout = client.timer().newTimeout(this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
}

public static NamespaceName getNameSpaceFromPattern(Pattern pattern) {
Expand Down Expand Up @@ -104,8 +104,8 @@ public void run(Timeout timeout) throws Exception {
});

// schedule the next re-check task
recheckPatternTimeout = client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
Math.min(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.MINUTES);
this.recheckPatternTimeout = client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
}

public Pattern getPattern() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {

private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;

private int patternAutoDiscoveryPeriod = 1;
private int patternAutoDiscoveryPeriod = 60;

private RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,15 @@ public void testConsumerBuilderImplWhenMaxTotalReceiverQueueSizeAcrossPartitions
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testConsumerBuilderImplWhenPatternAutoDiscoveryPeriodPropertyIsNegative() {
public void testConsumerBuilderImplWhenPatternAutoDiscoveryPeriodPeriodInMinutesIsNegative() {
consumerBuilderImpl.patternAutoDiscoveryPeriod(-1);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testConsumerBuilderImplWhenPatternAutoDiscoveryPeriodPeriodIsNegative() {
consumerBuilderImpl.patternAutoDiscoveryPeriod(-1, TimeUnit.MINUTES);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testConsumerBuilderImplWhenBatchReceivePolicyIsNull() {
consumerBuilderImpl.batchReceivePolicy(null);
Expand All @@ -261,6 +266,7 @@ public void testConsumerBuilderImplWhenNumericPropertiesAreValid() {
consumerBuilderImpl.priorityLevel(1);
consumerBuilderImpl.maxTotalReceiverQueueSizeAcrossPartitions(1);
consumerBuilderImpl.patternAutoDiscoveryPeriod(1);
consumerBuilderImpl.patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS);
}

}

0 comments on commit fed8c30

Please sign in to comment.