Skip to content

Commit

Permalink
[Issue 11846][Java client] Create a consumer in the paused state (apa…
Browse files Browse the repository at this point in the history
…che#11974)

* Set paused() in ConsumerBuilder()

* Test that consumer is paused

* Remove todo

* Use present tense in paused Javadoc

Co-authored-by: Anonymitaet <[email protected]>

* Update method name and Javadoc

* ConsumerBuilderImpl test

Co-authored-by: Anonymitaet <[email protected]>
  • Loading branch information
zwalsh-toast and Anonymitaet authored Dec 10, 2021
1 parent 0ce155e commit f0da648
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -773,4 +773,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
* </pre>
*/
ConsumerBuilder<T> negativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff);

/**
* Start the consumer in a paused state. When enabled, the consumer does not immediately fetch messages when
* {@link #subscribe()} is called. Instead, the consumer waits to fetch messages until {@link Consumer#resume()} is
* called.
* <p/>
* See also {@link Consumer#pause()}.
* @default false
*/
ConsumerBuilder<T> startPaused(boolean paused);
}
Original file line number Diff line number Diff line change
Expand Up @@ -502,4 +502,10 @@ public ConsumerBuilder<T> negativeAckRedeliveryBackoff(NegativeAckRedeliveryBack
conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
return this;
}

@Override
public ConsumerBuilder<T> startPaused(boolean paused) {
conf.setStartPaused(paused);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
this.poolMessages = conf.isPoolMessages();
this.paused = conf.isStartPaused();

if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
this.allTopicPartitionsNumber = new AtomicInteger(0);
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId)) : null;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
this.paused = conf.isStartPaused();

if (conf.getAckTimeoutMillis() != 0) {
if (conf.getTickDurationMillis() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ public int getMaxPendingChuckedMessage() {
@JsonIgnore
private transient MessagePayloadProcessor payloadProcessor = null;

private boolean startPaused = false;

public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
checkArgument(interval > 0, "interval needs to be > 0");
this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import java.util.regex.Pattern;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

/**
* Unit tests of {@link ConsumerBuilderImpl}.
Expand Down Expand Up @@ -310,4 +312,10 @@ public void testNegativeAckRedeliveryBackoff() {
.maxNackTimeMs(10 * 1000)
.build());
}

@Test
public void testStartPaused() {
consumerBuilderImpl.startPaused(true);
verify(consumerBuilderImpl.getConf()).setStartPaused(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -200,4 +201,21 @@ public void testClose() {
}
Assert.assertNull(checkException);
}

@Test
public void testConsumerCreatedWhilePaused() throws InterruptedException {
PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock(executorProvider, internalExecutor);
ClientConfigurationData clientConf = client.getConfiguration();
clientConf.setOperationTimeoutMs(100);
clientConf.setStatsIntervalSeconds(0);
String topic = "non-persistent://tenant/ns1/my-topic";

consumerConf.setStartPaused(true);

consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
executorProvider, -1, false, new CompletableFuture<>(), null, null, null,
true);

Assert.assertTrue(consumer.paused);
}
}

0 comments on commit f0da648

Please sign in to comment.