Skip to content

Commit

Permalink
set functions config to me uniform (apache#3629)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and merlimat committed Feb 20, 2019
1 parent ce2d21c commit 2fd283e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -321,6 +324,16 @@ public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O>
try {
Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone())
.schema(schema)
.blockIfQueueFull(true)
.enableBatching(true)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.compressionType(CompressionType.LZ4)
.hashingScheme(HashingScheme.Murmur3_32Hash) //
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter(FunctionResultRouter.of())
// set send timeout to be infinity to prevent potential deadlock with consumer
// that might happen when consumer is blocked due to unacked messages
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topicName)
.properties(InstanceUtils.getProperties(componentType,
FunctionDetailsUtils.getFullyQualifiedName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public <T> Producer<T> createProducer(PulsarClient client, String topic, String
ProducerBuilder<T> builder = client.newProducer(schema)
.blockIfQueueFull(true)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.compressionType(CompressionType.LZ4)
.hashingScheme(HashingScheme.Murmur3_32Hash) //
.messageRoutingMode(MessageRoutingMode.CustomPartition)
Expand Down

0 comments on commit 2fd283e

Please sign in to comment.