diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 85b8080bd865f..7271c870a1347 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -25,6 +25,9 @@ import lombok.Getter; import lombok.Setter; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.HashingScheme; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -321,6 +324,16 @@ public CompletableFuture publish(String topicName, O object, Schema try { Producer newProducer = ((ProducerBuilderImpl) producerBuilder.clone()) .schema(schema) + .blockIfQueueFull(true) + .enableBatching(true) + .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) + .compressionType(CompressionType.LZ4) + .hashingScheme(HashingScheme.Murmur3_32Hash) // + .messageRoutingMode(MessageRoutingMode.CustomPartition) + .messageRouter(FunctionResultRouter.of()) + // set send timeout to be infinity to prevent potential deadlock with consumer + // that might happen when consumer is blocked due to unacked messages + .sendTimeout(0, TimeUnit.SECONDS) .topic(topicName) .properties(InstanceUtils.getProperties(componentType, FunctionDetailsUtils.getFullyQualifiedName( diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 3195a82b742ea..717ac1051f547 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -85,7 +85,7 @@ public Producer createProducer(PulsarClient client, String topic, String ProducerBuilder builder = client.newProducer(schema) .blockIfQueueFull(true) .enableBatching(true) - .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) + .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) .compressionType(CompressionType.LZ4) .hashingScheme(HashingScheme.Murmur3_32Hash) // .messageRoutingMode(MessageRoutingMode.CustomPartition)