Skip to content

Commit

Permalink
Use default queue size in function producer (apache#3619)
Browse files Browse the repository at this point in the history
* Use default queue size in function producer

* Fixed queue size
  • Loading branch information
merlimat authored and jerrypeng committed Feb 20, 2019
1 parent 84c68c3 commit 2e1e1ab
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
5 changes: 2 additions & 3 deletions pulsar-functions/instance/src/main/python/contextimpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def get_user_config_value(self, key):
return self.user_config[key]
else:
return None

def get_user_config_map(self):
return self.user_config

Expand Down Expand Up @@ -146,8 +146,7 @@ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", p
topic_name,
block_if_queue_full=True,
batching_enabled=True,
batching_max_publish_delay_ms=1,
max_pending_messages=100000,
batching_max_publish_delay_ms=10,
compression_type=pulsar_compression_type,
properties=util.get_properties(util.getFullyQualifiedFunctionName(
self.instance_config.function_details.tenant,
Expand Down
4 changes: 2 additions & 2 deletions pulsar-functions/instance/src/main/python/python_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,11 @@ def setup_producer(self):
str(self.instance_config.function_details.sink.topic),
block_if_queue_full=True,
batching_enabled=True,
batching_max_publish_delay_ms=1,
batching_max_publish_delay_ms=10,
compression_type=pulsar.CompressionType.LZ4,
# set send timeout to be infinity to prevent potential deadlock with consumer
# that might happen when consumer is blocked due to unacked messages
send_timeout_millis=0,
max_pending_messages=100000,
properties=util.get_properties(util.getFullyQualifiedFunctionName(
self.instance_config.function_details.tenant,
self.instance_config.function_details.namespace,
Expand Down

0 comments on commit 2e1e1ab

Please sign in to comment.