Skip to content

Commit

Permalink
[pulsar-kafka] support block-producer on queue-full with sendTimeOut …
Browse files Browse the repository at this point in the history
…configuration (apache#6139)

### Motivation
Right now, pulsar-kafka producer block the publish when queue is full if `sendTimeOut > 0`. However, we have multiple users who want to configure `sendTimeOut` but doesn't want to block the thread and need immediate failure.
So, add option to configure `BLOCK_IF_PRODUCER_QUEUE_FULL` and it will not impact existing behavior because if the `BLOCK_IF_PRODUCER_QUEUE_FULL` doesn't exist then it will fallback to existing behavior.
  • Loading branch information
rdhabalia authored Feb 10, 2020
1 parent 4fd17d4 commit 5fe096d
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ private PulsarKafkaProducer(ProducerConfig producerConfig, Schema<K> keySchema,
// Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client
// Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
// Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0;
Boolean sendTimeOutConfigured = sendTimeoutMillis > 0;
boolean shouldBlockPulsarProducer = Boolean.getBoolean(properties
.getProperty(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL, sendTimeOutConfigured.toString()));
pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);

interceptors = (List) producerConfig.getConfiguredInstances(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;

public class PulsarProducerKafkaConfig {

Expand All @@ -33,6 +34,11 @@ public class PulsarProducerKafkaConfig {
public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions";
public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
/**
* send operations will immediately fail with {@link ProducerQueueIsFullError} when there is no space left in
* pending queue.
**/
public static final String BLOCK_IF_PRODUCER_QUEUE_FULL = "pulsar.block.if.producer.queue.full";

public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
ProducerBuilder<byte[]> producerBuilder = client.newProducer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,13 @@ public void testPulsarKafkaProducer() {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
properties.put(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL, Boolean.FALSE.toString());

new PulsarKafkaProducer<>(properties);

verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
verify(mockProducerBuilder, times(1)).blockIfQueueFull(false);
}

@Test
Expand Down
1 change: 1 addition & 0 deletions site2/docs/adaptors-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ You can configure Pulsar authentication provider directly from the Kafka propert
| [`pulsar.producer.max.pending.messages.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessagesAcrossPartitions-int-) | `50000` | Set the maximum number of pending messages across all the partitions. |
| [`pulsar.producer.batching.enabled`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingEnabled-boolean-) | `true` | Control whether automatic batching of messages is enabled for the producer. |
| [`pulsar.producer.batching.max.messages`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingMaxMessages-int-) | `1000` | The maximum number of messages in a batch. |
| [`pulsar.block.if.producer.queue.full`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBlockIfQueueFull-boolean-) | | Specify the block producer if queue is full. |


### Pulsar consumer Properties
Expand Down

0 comments on commit 5fe096d

Please sign in to comment.