diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java index 5156b50300452..bbfe5cde266a7 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java @@ -93,6 +93,7 @@ public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clien this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic())); this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName()); this.consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType()); + this.consumerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize()); this.pulsarSpoutConf = pulsarSpoutConf; this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS); diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java index bb8d8c88bf305..0f27ddc589990 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java @@ -46,6 +46,7 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration { private SubscriptionType subscriptionType = SubscriptionType.Shared; private boolean autoUnsubscribe = false; + private int consumerReceiverQueueSize = 1000; /** * @return the subscription name for the consumer in the spout @@ -71,6 +72,19 @@ public void setSubscriptionType(SubscriptionType subscriptionType) { this.subscriptionType = subscriptionType; } + public int getConsumerReceiverQueueSize() { + return consumerReceiverQueueSize; + } + + /** + * Receiver queue size of pulsar-spout consumer. + * + * @param consumerReceiverQueueSize + */ + public void setConsumerReceiverQueueSize(int consumerReceiverQueueSize) { + this.consumerReceiverQueueSize = consumerReceiverQueueSize; + } + /** * @return the mapper to convert pulsar message to a storm tuple */