Skip to content

Commit

Permalink
Allow to configure most client/producer/consumer options in Kafka API…
Browse files Browse the repository at this point in the history
… wrapper (apache#1207)
  • Loading branch information
merlimat authored Feb 10, 2018
1 parent 8d3ab43 commit 16a554b
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
Expand All @@ -80,6 +81,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene

private volatile boolean closed = false;

private final Properties properties;

private static class QueueItem {
final org.apache.pulsar.client.api.Consumer consumer;
final Message message;
Expand Down Expand Up @@ -141,9 +144,9 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializ

String serviceUrl = config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);

Properties properties = new Properties();
this.properties = new Properties();
config.originals().forEach((k, v) -> properties.put(k, v));
ClientConfiguration clientConf = PulsarKafkaConfig.getClientConfiguration(properties);
ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties);
// Since this client instance is going to be used just for the consumers, we can enable Nagle to group
// all the acknowledgments sent to broker within a short time frame
clientConf.setUseTcpNoDelay(false);
Expand Down Expand Up @@ -201,7 +204,7 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
// acknowledgeCumulative()
int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();

ConsumerConfiguration conf = new ConsumerConfiguration();
ConsumerConfiguration conf = PulsarConsumerKafkaConfig.getConsumerConfiguration(properties);
conf.setSubscriptionType(SubscriptionType.Failover);
conf.setMessageListener(this);
if (numberOfPartitions > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;

public class PulsarKafkaProducer<K, V> implements Producer<K, V> {

Expand Down Expand Up @@ -106,15 +107,14 @@ private PulsarKafkaProducer(Map<String, Object> conf, Properties properties, Ser
}

String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
ClientConfiguration clientConf = PulsarKafkaConfig.getClientConfiguration(properties);
ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties);
try {
client = PulsarClient.create(serviceUrl, clientConf);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}

pulsarProducerConf = new ProducerConfiguration();
pulsarProducerConf.setBatchingEnabled(true);
pulsarProducerConf = PulsarProducerKafkaConfig.getProducerConfiguration(properties);

// To mimic the same batching mode as Kafka, we need to wait a very little amount of
// time to batch if the client is trying to send messages fast enough
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,30 @@
package org.apache.pulsar.client.kafka.compat;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientConfiguration;

public class PulsarKafkaConfig {
public class PulsarClientKafkaConfig {

/// Config variables
public static final String AUTHENTICATION_CLASS = "pulsar.authentication.class";
public static final String USE_TLS = "pulsar.use.tls";
public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path";
public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection";

public static final String OPERATION_TIMEOUT_MS = "pulsar.operation.timeout.ms";
public static final String STATS_INTERVAL_SECONDS = "pulsar.stats.interval.seconds";
public static final String NUM_IO_THREADS = "pulsar.num.io.threads";

public static final String CONNECTIONS_PER_BROKER = "pulsar.connections.per.broker";

public static final String USE_TCP_NODELAY = "pulsar.use.tcp.nodelay";

public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests";
public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection";

public static ClientConfiguration getClientConfiguration(Properties properties) {
ClientConfiguration conf = new ClientConfiguration();

Expand All @@ -52,6 +64,36 @@ public static ClientConfiguration getClientConfiguration(Properties properties)
conf.setTlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
}

if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
conf.setOperationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
TimeUnit.MILLISECONDS);
}

if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
conf.setStatsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)), TimeUnit.SECONDS);
}

if (properties.containsKey(NUM_IO_THREADS)) {
conf.setIoThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
}

if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
conf.setConnectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
}

if (properties.containsKey(USE_TCP_NODELAY)) {
conf.setUseTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
}

if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
conf.setConcurrentLookupRequest(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
}

if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
conf.setMaxNumberOfRejectedRequestPerConnection(
Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
}

return conf;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.kafka.compat;

import java.util.Properties;

import org.apache.pulsar.client.api.ConsumerConfiguration;

/**
 * Maps the {@code pulsar.consumer.*} entries of a Kafka-style {@link Properties} object onto a Pulsar
 * {@link ConsumerConfiguration}.
 */
public class PulsarConsumerKafkaConfig {

    /// Config variables
    public static final String CONSUMER_NAME = "pulsar.consumer.name";
    public static final String RECEIVER_QUEUE_SIZE = "pulsar.consumer.receiver.queue.size";
    public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions";

    /**
     * Creates a new {@link ConsumerConfiguration} and applies every consumer option that is present in
     * {@code properties}. Options that are absent are left untouched on the new configuration object.
     *
     * @param properties
     *            the properties to read the {@code pulsar.consumer.*} options from
     * @return a newly created consumer configuration
     * @throws NumberFormatException
     *             if a numeric option is present but its value cannot be parsed as an integer
     */
    public static ConsumerConfiguration getConsumerConfiguration(Properties properties) {
        ConsumerConfiguration conf = new ConsumerConfiguration();

        if (properties.containsKey(CONSUMER_NAME)) {
            conf.setConsumerName(valueAsString(properties, CONSUMER_NAME));
        }

        if (properties.containsKey(RECEIVER_QUEUE_SIZE)) {
            conf.setReceiverQueueSize(Integer.parseInt(valueAsString(properties, RECEIVER_QUEUE_SIZE)));
        }

        if (properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) {
            conf.setMaxTotalReceiverQueueSizeAcrossPartitions(
                    Integer.parseInt(valueAsString(properties, TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)));
        }

        return conf;
    }

    /**
     * Returns the string form of the value stored under {@code key}.
     *
     * <p>
     * {@link Properties#getProperty(String)} yields {@code null} when the stored value is not a
     * {@link String} (for example an {@code Integer} inserted with {@code put}), which would make the numeric
     * parsing above fail. Reading the raw value and converting it avoids that.
     */
    private static String valueAsString(Properties properties, String key) {
        return String.valueOf(properties.get(key));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.kafka.compat;

import java.util.Properties;

import org.apache.pulsar.client.api.ProducerConfiguration;

/**
 * Maps the {@code pulsar.producer.*} entries of a Kafka-style {@link Properties} object onto a Pulsar
 * {@link ProducerConfiguration}.
 */
public class PulsarProducerKafkaConfig {

    /// Config variables
    public static final String PRODUCER_NAME = "pulsar.producer.name";
    public static final String INITIAL_SEQUENCE_ID = "pulsar.producer.initial.sequence.id";

    public static final String MAX_PENDING_MESSAGES = "pulsar.producer.max.pending.messages";
    public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions";
    public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
    public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";

    /**
     * Creates a new {@link ProducerConfiguration} and applies every producer option that is present in
     * {@code properties}. Batching is always set explicitly and is enabled unless
     * {@link #BATCHING_ENABLED} says otherwise; all other options are only applied when present.
     *
     * @param properties
     *            the properties to read the {@code pulsar.producer.*} options from
     * @return a newly created producer configuration
     * @throws NumberFormatException
     *             if a numeric option is present but its value cannot be parsed as a number
     */
    public static ProducerConfiguration getProducerConfiguration(Properties properties) {
        ProducerConfiguration conf = new ProducerConfiguration();

        if (properties.containsKey(PRODUCER_NAME)) {
            conf.setProducerName(valueAsString(properties, PRODUCER_NAME));
        }

        if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
            conf.setInitialSequenceId(Long.parseLong(valueAsString(properties, INITIAL_SEQUENCE_ID)));
        }

        if (properties.containsKey(MAX_PENDING_MESSAGES)) {
            conf.setMaxPendingMessages(Integer.parseInt(valueAsString(properties, MAX_PENDING_MESSAGES)));
        }

        if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
            conf.setMaxPendingMessagesAcrossPartitions(
                    Integer.parseInt(valueAsString(properties, MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)));
        }

        // Look at the raw value first: getProperty() ignores non-String values, so a Boolean.FALSE stored
        // with put() would otherwise silently fall back to the "true" default.
        Object rawBatchingEnabled = properties.get(BATCHING_ENABLED);
        String batchingEnabled = rawBatchingEnabled != null
                ? rawBatchingEnabled.toString()
                : properties.getProperty(BATCHING_ENABLED, "true");
        conf.setBatchingEnabled(Boolean.parseBoolean(batchingEnabled));

        if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
            conf.setBatchingMaxMessages(Integer.parseInt(valueAsString(properties, BATCHING_MAX_MESSAGES)));
        }

        return conf;
    }

    /**
     * Returns the string form of the value stored under {@code key}.
     *
     * <p>
     * {@link Properties#getProperty(String)} yields {@code null} when the stored value is not a
     * {@link String} (for example an {@code Integer} inserted with {@code put}), which would make the numeric
     * parsing above fail. Reading the raw value and converting it avoids that.
     */
    private static String valueAsString(Properties properties, String key) {
        return String.valueOf(properties.get(key));
    }
}
42 changes: 35 additions & 7 deletions site/docs/latest/adaptors/KafkaWrapper.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ APIs:
| `void commitAsync()` | Yes | |
| `void commitAsync(OffsetCommitCallback callback)` | Yes | |
| `void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)` | Yes | |
| `void seek(TopicPartition partition, long offset)` | Yes | |
| `void seekToBeginning(Collection<TopicPartition> partitions)` | Yes | |
| `void seekToEnd(Collection<TopicPartition> partitions)` | Yes | |
| `void seek(TopicPartition partition, long offset)` | Yes | |
| `void seekToBeginning(Collection<TopicPartition> partitions)` | Yes | |
| `void seekToEnd(Collection<TopicPartition> partitions)` | Yes | |
| `long position(TopicPartition partition)` | Yes | |
| `OffsetAndMetadata committed(TopicPartition partition)` | Yes | |
| `Map<MetricName, ? extends Metric> metrics()` | No | |
Expand Down Expand Up @@ -229,11 +229,39 @@ Properties:

You can configure Pulsar authentication provider directly from the Kafka properties.

Properties:
### Pulsar client properties

| Config property | Default | Notes |
|:---------------------------------------|:--------|:---------------------------------------------------------------------------------------|
| `pulsar.authentication.class` | | Authentication provider class to use, e.g. `org.apache.pulsar.client.impl.auth.AuthenticationTls` |
| `pulsar.use.tls` | `false` | Enable TLS transport encryption |
| `pulsar.tls.trust.certs.file.path` | | Path for the TLS trust certificate store |
| `pulsar.tls.allow.insecure.connection` | `false` | Accept self-signed certificates from brokers |
| [`pulsar.use.tls`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTls-boolean-) | `false` | Enable TLS transport encryption |
| [`pulsar.tls.trust.certs.file.path`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsTrustCertsFilePath-java.lang.String-) | | Path for the TLS trust certificate store |
| [`pulsar.tls.allow.insecure.connection`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsAllowInsecureConnection-boolean-) | `false` | Accept self-signed certificates from brokers |
| [`pulsar.operation.timeout.ms`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setOperationTimeout-int-java.util.concurrent.TimeUnit-) | `30000` | General operations timeout |
| [`pulsar.stats.interval.seconds`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setStatsInterval-long-java.util.concurrent.TimeUnit-) | `60` | Pulsar client lib stats printing interval |
| [`pulsar.num.io.threads`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setIoThreads-int-) | `1` | Number of Netty IO threads to use |
| [`pulsar.connections.per.broker`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setConnectionsPerBroker-int-) | `1` | Max number of connections to open to each broker |
| [`pulsar.use.tcp.nodelay`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTcpNoDelay-boolean-) | `true` | TCP no-delay |
| [`pulsar.concurrent.lookup.requests`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setConcurrentLookupRequest-int-) | `50000` | Max number of concurrent topic lookups |
| [`pulsar.max.number.rejected.request.per.connection`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setMaxNumberOfRejectedRequestPerConnection-int-) | `50` | Threshold of errors to forcefully close a connection |


### Pulsar producer properties

| Config property | Default | Notes |
|:---------------------------------------|:--------|:---------------------------------------------------------------------------------------|
| [`pulsar.producer.name`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setProducerName-java.lang.String-) | | Specify producer name |
| [`pulsar.producer.initial.sequence.id`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setInitialSequenceId-long-) | | Specify baseline for sequence id for this producer |
| [`pulsar.producer.max.pending.messages`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessages-int-) | `1000` | Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. |
| [`pulsar.producer.max.pending.messages.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessagesAcrossPartitions-int-) | `50000` | Set the number of max pending messages across all the partitions |
| [`pulsar.producer.batching.enabled`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingEnabled-boolean-) | `true` | Control whether automatic batching of messages is enabled for the producer |
| [`pulsar.producer.batching.max.messages`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingMaxMessages-int-) | `1000` | The maximum number of messages permitted in a batch |


### Pulsar consumer properties

| Config property | Default | Notes |
|:---------------------------------------|:--------|:---------------------------------------------------------------------------------------|
| [`pulsar.consumer.name`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setConsumerName-java.lang.String-) | | Set the consumer name |
| [`pulsar.consumer.receiver.queue.size`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setReceiverQueueSize-int-) | `1000` | Sets the size of the consumer receive queue |
| [`pulsar.consumer.total.receiver.queue.size.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setMaxTotalReceiverQueueSizeAcrossPartitions-int-) | `50000` | Set the max total receiver queue size across partitions |

0 comments on commit 16a554b

Please sign in to comment.