Skip to content

Commit

Permalink
[ISSUE 8832] Avro custom schema not working in consumer (apache#8939)
Browse files Browse the repository at this point in the history
Fix apache#8832 

### Modifications
Make AvroSchema's clone method consider custom reader and writer.
  • Loading branch information
reswqa authored Dec 14, 2020
1 parent a9e1ac3 commit 3da2365
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -858,8 +859,42 @@ public void asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback
consumer.close();
}


@Test
public void testAvroSchemaProducerConsumerWithSpecifiedReaderAndWriter() throws PulsarClientException {
final String topicName = "persistent://my-property/my-ns/my-topic1";
TestMessageObject object = new TestMessageObject();
SchemaReader<TestMessageObject> reader = Mockito.mock(SchemaReader.class);
SchemaWriter<TestMessageObject> writer = Mockito.mock(SchemaWriter.class);
Mockito.when(reader.read(Mockito.any(byte[].class), Mockito.any(byte[].class))).thenReturn(object);
Mockito.when(writer.write(Mockito.any(TestMessageObject.class))).thenReturn("fake data".getBytes(StandardCharsets.UTF_8));
SchemaDefinition<TestMessageObject> schemaDefinition = new SchemaDefinitionBuilderImpl<TestMessageObject>()
.withPojo(TestMessageObject.class)
.withSchemaReader(reader)
.withSchemaWriter(writer)
.build();
Schema<TestMessageObject> schema = Schema.AVRO(schemaDefinition);
PulsarClient client = PulsarClient.builder()
.serviceUrl(lookupUrl.toString())
.build();

try(Producer<TestMessageObject> producer = client.newProducer(schema).topic(topicName).create();
Consumer<TestMessageObject> consumer =
client.newConsumer(schema).topic(topicName).subscriptionName("my-subscriber-name").subscribe()) {
assertNotNull(producer);
assertNotNull(consumer);
producer.newMessage().value(object).send();
TestMessageObject testObject = consumer.receive().getValue();
Assert.assertEquals(object.getValue(), testObject.getValue());
Mockito.verify(writer, Mockito.times(1)).write(Mockito.any());
Mockito.verify(reader, Mockito.times(1)).read(Mockito.any(byte[].class), Mockito.any(byte[].class));
} finally {
client.close();
}
}

@Test
public void testProducerConsumerWithSpecifiedReaderAndWriter() throws PulsarClientException {
public void testJsonSchemaProducerConsumerWithSpecifiedReaderAndWriter() throws PulsarClientException {
final String topicName = "persistent://my-property/my-ns/my-topic1";
ObjectMapper mapper = new ObjectMapper();
SchemaReader<TestMessageObject> reader = Mockito.spy(new JacksonJsonReader<>(mapper, TestMessageObject.class));
Expand Down Expand Up @@ -889,6 +924,8 @@ public void testProducerConsumerWithSpecifiedReaderAndWriter() throws PulsarClie

Mockito.verify(writer, Mockito.times(1)).write(Mockito.any());
Mockito.verify(reader, Mockito.times(1)).read(Mockito.any(byte[].class));
} finally {
client.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
@Slf4j
public class AvroSchema<T> extends AvroBaseStructSchema<T> {
private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);

private boolean isCustomReaderAndWriter;
private ClassLoader pojoClassLoader;

private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
Expand All @@ -61,6 +61,7 @@ private AvroSchema(SchemaReader<T> reader, SchemaWriter<T> writer, SchemaInfo sc
super(schemaInfo);
setReader(reader);
setWriter(writer);
isCustomReaderAndWriter = true;
}

@Override
Expand All @@ -70,6 +71,9 @@ public boolean supportSchemaVersioning() {

@Override
public Schema<T> clone() {
if (isCustomReaderAndWriter) {
return new AvroSchema<>(reader, writer, schemaInfo);
}
Schema<T> schema = new AvroSchema<>(schemaInfo, pojoClassLoader);
if (schemaInfoProvider != null) {
schema.setSchemaInfoProvider(schemaInfoProvider);
Expand Down

0 comments on commit 3da2365

Please sign in to comment.