Skip to content

Commit

Permalink
[pulsar-kafka] Fix KafkaProducerInterceptorWrapper handles LongSerial…
Browse files Browse the repository at this point in the history
…izer (apache#4549)

*Motivation*

KafkaProducerInterceptorWrapper uses a LongDeserializer for retrieve deserializer

*Modifications*

Fix the bug

*Verify this change*

Add unit test to cover the convertion
  • Loading branch information
sijie authored Jun 18, 2019
1 parent 571b684 commit 4c352f0
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand Down Expand Up @@ -232,10 +233,10 @@ private String getPartitionID(PulsarApi.MessageMetadata.Builder messageMetadataB
.getValue();
}

private Deserializer getDeserializer(Serializer serializer) {
static Deserializer getDeserializer(Serializer serializer) {
if (serializer instanceof StringSerializer) {
return new StringDeserializer();
} else if (serializer instanceof LongDeserializer) {
} else if (serializer instanceof LongSerializer) {
return new LongDeserializer();
} else if (serializer instanceof IntegerSerializer) {
return new IntegerDeserializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,29 @@
package org.apache.pulsar.client.kafka.compat;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.ProducerInterceptor;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.Arrays;
Expand All @@ -38,6 +52,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;

public class KafkaProducerInterceptorWrapperTest {

Expand All @@ -59,10 +74,10 @@ public void testProducerInterceptorConvertRecordCorrectly() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
Assert.assertEquals(record.key(), "original key");
Assert.assertEquals(record.value(), "original value".getBytes());
Assert.assertEquals(record.timestamp().longValue(), timeStamp);
Assert.assertEquals(record.partition().intValue(), partitionID);
assertEquals(record.key(), "original key");
assertEquals(record.value(), "original value".getBytes());
assertEquals(record.timestamp().longValue(), timeStamp);
assertEquals(record.partition().intValue(), partitionID);
return new ProducerRecord<String, byte[]>(topic, "processed key", "processed value".getBytes());
}
}).when(mockInterceptor1).onSend(any(ProducerRecord.class));
Expand All @@ -74,10 +89,10 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
Assert.assertEquals(record.key(), "processed key");
Assert.assertEquals(record.value(), "processed value".getBytes());
Assert.assertEquals(record.timestamp().longValue(), timeStamp);
Assert.assertEquals(record.partition().intValue(), partitionID);
assertEquals(record.key(), "processed key");
assertEquals(record.value(), "processed value".getBytes());
assertEquals(record.timestamp().longValue(), timeStamp);
assertEquals(record.partition().intValue(), partitionID);
return record;
}
}).when(mockInterceptor2).onSend(any(ProducerRecord.class));
Expand All @@ -99,4 +114,37 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
verify(mockInterceptor2, times(1)).onSend(any(ProducerRecord.class));
}

@DataProvider(name = "serializers")
public Object[][] serializers() {
return new Object[][] {
{
new StringSerializer(), StringDeserializer.class
},
{
new LongSerializer(), LongDeserializer.class
},
{
new IntegerSerializer(), IntegerDeserializer.class,
},
{
new DoubleSerializer(), DoubleDeserializer.class,
},
{
new BytesSerializer(), BytesDeserializer.class
},
{
new ByteBufferSerializer(), ByteBufferDeserializer.class
},
{
new ByteArraySerializer(), ByteArrayDeserializer.class
}
};
}

@Test(dataProvider = "serializers")
public void testGetDeserializer(Serializer serializer, Class deserializerClass) {
Deserializer deserializer = KafkaProducerInterceptorWrapper.getDeserializer(serializer);
assertEquals(deserializer.getClass(), deserializerClass);
}

}

0 comments on commit 4c352f0

Please sign in to comment.