From 4c352f0371445a1e696f7a096357675f701d6cba Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 18 Jun 2019 02:36:44 -0700 Subject: [PATCH] [pulsar-kafka] Fix KafkaProducerInterceptorWrapper handles LongSerializer (#4549) *Motivation* KafkaProducerInterceptorWrapper uses a LongDeserializer for retrieve deserializer *Modifications* Fix the bug *Verify this change* Add unit test to cover the convertion --- .../KafkaProducerInterceptorWrapper.java | 5 +- .../KafkaProducerInterceptorWrapperTest.java | 66 ++++++++++++++++--- 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java index 99195cec99991..5cedef90582c3 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -232,10 +233,10 @@ private String getPartitionID(PulsarApi.MessageMetadata.Builder messageMetadataB .getValue(); } - private Deserializer getDeserializer(Serializer serializer) { + static Deserializer getDeserializer(Serializer serializer) { if (serializer instanceof StringSerializer) { return new StringDeserializer(); - } else if (serializer instanceof LongDeserializer) { + } else if (serializer instanceof LongSerializer) { return new LongDeserializer(); } else if (serializer instanceof IntegerSerializer) { return new IntegerDeserializer(); diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java index 8da6339e8a5bc..aadfce8907362 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java @@ -19,7 +19,21 @@ package org.apache.pulsar.client.kafka.compat; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.ByteBufferDeserializer; +import org.apache.kafka.common.serialization.ByteBufferSerializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.DoubleSerializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.client.api.ProducerInterceptor; import org.apache.pulsar.client.impl.ProducerInterceptors; @@ -27,7 +41,7 @@ import org.apache.pulsar.client.impl.schema.BytesSchema; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.Arrays; @@ -38,6 +52,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; public class KafkaProducerInterceptorWrapperTest { @@ -59,10 +74,10 @@ public void testProducerInterceptorConvertRecordCorrectly() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { ProducerRecord record = (ProducerRecord) invocation.getArguments()[0]; - Assert.assertEquals(record.key(), "original key"); - Assert.assertEquals(record.value(), "original value".getBytes()); - Assert.assertEquals(record.timestamp().longValue(), timeStamp); - Assert.assertEquals(record.partition().intValue(), partitionID); + assertEquals(record.key(), "original key"); + assertEquals(record.value(), "original value".getBytes()); + assertEquals(record.timestamp().longValue(), timeStamp); + assertEquals(record.partition().intValue(), partitionID); return new ProducerRecord(topic, "processed key", "processed value".getBytes()); } }).when(mockInterceptor1).onSend(any(ProducerRecord.class)); @@ -74,10 +89,10 @@ public Object answer(InvocationOnMock invocation) throws Throwable { @Override public Object answer(InvocationOnMock invocation) throws Throwable { ProducerRecord record = (ProducerRecord) invocation.getArguments()[0]; - Assert.assertEquals(record.key(), "processed key"); - Assert.assertEquals(record.value(), "processed value".getBytes()); - Assert.assertEquals(record.timestamp().longValue(), timeStamp); - Assert.assertEquals(record.partition().intValue(), partitionID); + assertEquals(record.key(), "processed key"); + assertEquals(record.value(), "processed value".getBytes()); + assertEquals(record.timestamp().longValue(), timeStamp); + assertEquals(record.partition().intValue(), partitionID); return record; } }).when(mockInterceptor2).onSend(any(ProducerRecord.class)); @@ -99,4 +114,37 @@ public Object answer(InvocationOnMock invocation) throws Throwable { verify(mockInterceptor2, times(1)).onSend(any(ProducerRecord.class)); } + @DataProvider(name = "serializers") + public Object[][] serializers() { + return new Object[][] { + { + new StringSerializer(), StringDeserializer.class + }, + { + new LongSerializer(), LongDeserializer.class + }, + { + new IntegerSerializer(), IntegerDeserializer.class, + }, + { + new DoubleSerializer(), DoubleDeserializer.class, + }, + { + new BytesSerializer(), BytesDeserializer.class + }, + { + new ByteBufferSerializer(), ByteBufferDeserializer.class + }, + { + new ByteArraySerializer(), ByteArrayDeserializer.class + } + }; + } + + @Test(dataProvider = "serializers") + public void testGetDeserializer(Serializer serializer, Class deserializerClass) { + Deserializer deserializer = KafkaProducerInterceptorWrapper.getDeserializer(serializer); + assertEquals(deserializer.getClass(), deserializerClass); + } + }