Skip to content

Commit

Permalink
Added ability to specify consumer queue size for function input topics (apache#3608)
Browse files Browse the repository at this point in the history

* Added ability to specify consumer queue size for function input topics

* fix and update PR
  • Loading branch information
srkukarni authored and merlimat committed Mar 29, 2019
1 parent 3418de9 commit e077374
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ public class ConsumerConfig {
private String schemaType;
private String serdeClassName;
private boolean isRegexPattern;
private Integer receiverQueueSize;
}
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,9 @@ public void setupInput(ContextImpl contextImpl) throws Exception {
} else if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
consumerConfig.setSerdeClassName(conf.getSerdeClassName());
}
if (conf.hasReceiverQueueSize()) {
consumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize().getValue());
}
pulsarSourceConfig.getTopicSchema().put(topic, consumerConfig);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
} else {
cb.topic(topic);
}
if (conf.getReceiverQueueSize() != null) {
cb.receiverQueueSize(conf.getReceiverQueueSize());
}
cb.properties(properties);

if (pulsarSourceConfig.getTimeoutMs() != null) {
Expand Down Expand Up @@ -159,7 +162,7 @@ Map<String, ConsumerConfig<T>> setupConsumerConfigs() throws ClassNotFoundExcept
schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType(), true);
}
configs.put(topic,
ConsumerConfig.<T> builder().schema(schema).isRegexPattern(conf.isRegexPattern()).build());
ConsumerConfig.<T> builder().schema(schema).isRegexPattern(conf.isRegexPattern()).receiverQueueSize(conf.getReceiverQueueSize()).build());
});

return configs;
Expand All @@ -174,6 +177,7 @@ public List<String> getInputTopics() {
private static class ConsumerConfig<T> {
// Schema for the topic; setupConsumerConfigs() passes the resolved schema to the builder.
private Schema<T> schema;
// Populated in setupConsumerConfigs() from the source-level conf.isRegexPattern().
private boolean isRegexPattern;
// Optional receiver queue size, copied from the source-level conf.getReceiverQueueSize().
// It is a boxed Integer so that null can stand for "not configured"; open() appears to apply it
// to the consumer builder only when it is non-null (NOTE(review): confirm against open()).
private Integer receiverQueueSize;
}

}
180 changes: 155 additions & 25 deletions pulsar-functions/instance/src/main/python/Function_pb2.py

Large diffs are not rendered by default.

25 changes: 16 additions & 9 deletions pulsar-functions/instance/src/main/python/python_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ def __init__(self,
state_storage_serviceurl):
self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
self.user_code = user_code
self.queue = queue.Queue(max_buffered_tuples)
# Set the queue size to one, since the consumers already have their own internal queues. This queue is only used to
# hand messages from the consumers over to the processing thread.
self.queue = queue.Queue(1)
self.log_topic_handler = None
if function_details.logTopic is not None and function_details.logTopic != "":
self.log_topic_handler = log.LogTopicHandler(str(function_details.logTopic), pulsar_client)
Expand Down Expand Up @@ -151,6 +153,7 @@ def run(self):
serde_kclass = util.import_class(os.path.dirname(self.user_code), serde)
self.input_serdes[topic] = serde_kclass()
Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))

self.consumers[topic] = self.pulsar_client.subscribe(
str(topic), subscription_name,
consumer_type=mode,
Expand All @@ -166,21 +169,25 @@ def run(self):
serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName)
self.input_serdes[topic] = serde_kclass()
Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))

consumer_args = {
"consumer_type": mode,
"message_listener": partial(self.message_listener, self.input_serdes[topic]),
"unacked_messages_timeout_ms": int(self.timeout_ms) if self.timeout_ms else None,
"properties": properties
}
if consumer_conf.HasField("receiverQueueSize"):
consumer_args["receiver_queue_size"] = consumer_conf.receiverQueueSize.value

if consumer_conf.isRegexPattern:
self.consumers[topic] = self.pulsar_client.subscribe(
re.compile(str(topic)), subscription_name,
consumer_type=mode,
message_listener=partial(self.message_listener, self.input_serdes[topic]),
unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None,
properties=properties
**consumer_args
)
else:
self.consumers[topic] = self.pulsar_client.subscribe(
str(topic), subscription_name,
consumer_type=mode,
message_listener=partial(self.message_listener, self.input_serdes[topic]),
unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None,
properties=properties
**consumer_args
)

function_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.className)
Expand Down
4 changes: 4 additions & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ message ConsumerSpec {
string schemaType = 1;
string serdeClassName = 2;
bool isRegexPattern = 3;
// Wrapper message for the optional receiver queue size of an input topic's consumer.
// Callers in this change test for its presence (hasReceiverQueueSize() in Java,
// HasField("receiverQueueSize") in Python) before reading `value`.
message ReceiverQueueSize {
// The configured receiver queue size.
int32 value = 1;
}
ReceiverQueueSize receiverQueueSize = 4;
}

message SourceSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
} else if (!StringUtils.isBlank(consumerConf.getSerdeClassName())) {
bldr.setSerdeClassName(consumerConf.getSerdeClassName());
}
if (consumerConf.getReceiverQueueSize() != null) {
bldr.setReceiverQueueSize(Function.ConsumerSpec.ReceiverQueueSize.newBuilder()
.setValue(consumerConf.getReceiverQueueSize()).build());
}
sourceSpecBuilder.putInputSpecs(topicName, bldr.build());
});
}
Expand Down Expand Up @@ -240,6 +244,9 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)
if (!isEmpty(input.getValue().getSchemaType())) {
consumerConfig.setSchemaType(input.getValue().getSchemaType());
}
if (input.getValue().hasReceiverQueueSize()) {
consumerConfig.setReceiverQueueSize(input.getValue().getReceiverQueueSize().getValue());
}
consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
consumerConfigMap.put(input.getKey(), consumerConfig);
}
Expand Down Expand Up @@ -521,6 +528,16 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
throw new IllegalArgumentException("The supplied python file does not exist");
}
}

if (functionConfig.getInputSpecs() != null) {
functionConfig.getInputSpecs().forEach((topicName, conf) -> {
// receiver queue size should be >= 0
if (conf.getReceiverQueueSize() != null && conf.getReceiverQueueSize() < 0) {
throw new IllegalArgumentException(
String.format("Receiver queue size should be >= zero"));
}
});
}
}

private static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) {
Expand Down

0 comments on commit e077374

Please sign in to comment.