Skip to content

Commit

Permalink
Add support for Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG (
Browse files Browse the repository at this point in the history
…apache#1090) (apache#3753)

Add support for Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG in PulsarKafkaProducer by utilizing pulsar's ClientBuilder.keepAliveInterval
  • Loading branch information
MarvinCai authored and sijie committed Mar 9, 2019
1 parent 85afd6e commit 18e3ab1
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarKafkaProducer<K, V> implements Producer<K, V> {

Expand Down Expand Up @@ -103,7 +105,7 @@ private PulsarKafkaProducer(Map<String, Object> conf, Properties properties, Ser
if (valueSerializer == null) {
this.valueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(producerConfig.originals(), true);
this.valueSerializer.configure(producerConfig.originals(), false);
} else {
this.valueSerializer = valueSerializer;
producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
Expand All @@ -112,9 +114,18 @@ private PulsarKafkaProducer(Map<String, Object> conf, Properties properties, Ser
partitioner = producerConfig.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
partitioner.configure(producerConfig.originals());

long keepAliveIntervalMs = Long.parseLong(properties.getProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "30000"));

String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
try {
client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
// Support Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG in ms.
// If passed in value is greater than Integer.MAX_VALUE in second will throw ArithmeticException.
int keepAliveInterval = Math.toIntExact(keepAliveIntervalMs / 1000);
client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS).build();
} catch (ArithmeticException e) {
String errorMessage = String.format("Invalid value %d for 'connections.max.idle.ms'. Please use a value smaller than %d000 milliseconds.", keepAliveIntervalMs, Integer.MAX_VALUE);
logger.error(errorMessage);
throw new IllegalArgumentException(errorMessage);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -169,7 +180,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callbac
}

TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
int messageSize = buildMessage(messageBuilder, record);;
int messageSize = buildMessage(messageBuilder, record);

CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
messageBuilder.sendAsync().thenAccept((messageId) -> {
Expand Down Expand Up @@ -263,7 +274,7 @@ private int buildMessage(TypedMessageBuilder<byte[]> builder, ProducerRecord<K,
builder.value(value);

if (record.partition() != null) {
// Partition was explicitely set on the record
// Partition was explicitly set on the record
builder.property(KafkaMessageRouter.PARTITION_ID, record.partition().toString());
} else {
// Get the partition id from the partitioner
Expand Down Expand Up @@ -296,4 +307,6 @@ private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder<byte[
TypedMessageBuilderImpl<byte[]> mb = (TypedMessageBuilderImpl<byte[]>) msgBuilder;
return new RecordMetadata(tp, offset, 0, mb.getPublishTime(), 0, mb.hasKey() ? mb.getKey().length() : 0, size);
}

private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.kafka.clients.producer;

import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.IObjectFactory;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
@PowerMockIgnore({"org.apache.logging.log4j.*"})
public class PulsarKafkaProducerTest {

@ObjectFactory
// Necessary to make PowerMockito.mockStatic work with TestNG.
public IObjectFactory getObjectFactory() {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}

@Test
public void testPulsarKafkaProducer() {
ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
return mockClientBuilder;
}
}).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));

PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);

Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");

PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);

verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, Long.toString((Integer.MAX_VALUE + 1L) * 1000));

PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
}
}
2 changes: 1 addition & 1 deletion site2/docs/adaptors-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Properties:
| `buffer.memory` | Ignored | |
| `client.id` | Ignored | |
| `compression.type` | Yes | Allows `gzip` and `lz4`. No `snappy`. |
| `connections.max.idle.ms` | Ignored | |
| `connections.max.idle.ms` | Yes | Only support up to 2,147,483,647,000(Integer.MAX_VALUE * 1000) ms of idle time|
| `interceptor.classes` | Ignored | |
| `key.serializer` | Yes | |
| `linger.ms` | Yes | Controls the group commit time when batching messages |
Expand Down

0 comments on commit 18e3ab1

Please sign in to comment.