Skip to content

Commit

Permalink
Consumer subscribe with AutoConsumeSchema will throw npe (apache#4960)
Browse files Browse the repository at this point in the history
### Motivation
To fix apache#4838
  • Loading branch information
congbobo184 authored and sijie committed Aug 25, 2019
1 parent dd5b4d0 commit a3609ef
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public byte[] getSchemaVersion() {

@Override
public T getValue() {
if (SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
if (schema.supportSchemaVersioning()) {
return getKeyValueBySchemaVersion();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -742,10 +743,12 @@ protected CompletableFuture<Void> preProcessSchemaBeforeSubscribe(PulsarClientIm
if (schema.requireFetchingSchemaInfo()) {
return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> {
if (null == schemaInfo) {
// no schema info is found
return FutureUtil.failedFuture(
new PulsarClientException.NotFoundException(
"No latest schema found for topic " + topicName));
if (!(schema instanceof AutoConsumeSchema)) {
// no schema info is found
return FutureUtil.failedFuture(
new PulsarClientException.NotFoundException(
"No latest schema found for topic " + topicName));
}
}
try {
log.info("Configuring schema for topic {} : {}", topicName, schemaInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
Expand All @@ -30,6 +31,8 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.util.concurrent.ExecutionException;

/**
* Auto detect schema.
*/
Expand All @@ -38,6 +41,12 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {

private Schema<GenericRecord> schema;

private String topicName;

private String componentName;

private SchemaInfoProvider schemaInfoProvider;

public void setSchema(Schema<GenericRecord> schema) {
this.schema = schema;
}
Expand Down Expand Up @@ -67,20 +76,40 @@ public byte[] encode(GenericRecord message) {

@Override
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
if (schema == null) {
SchemaInfo schemaInfo = null;
try {
schemaInfo = schemaInfoProvider.getLatestSchema().get();
} catch (InterruptedException | ExecutionException e ) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
log.error("Con't get last schema for topic {} use AutoConsumeSchema", topicName);
throw new SchemaSerializationException(e.getCause());
}
schema = generateSchema(schemaInfo);
schema.setSchemaInfoProvider(schemaInfoProvider);
log.info("Configure {} schema for topic {} : {}",
componentName, topicName, schemaInfo.getSchemaDefinition());
}
ensureSchemaInitialized();

return schema.decode(bytes, schemaVersion);
}

@Override
public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
schema.setSchemaInfoProvider(schemaInfoProvider);
if (schema == null) {
this.schemaInfoProvider = schemaInfoProvider;
} else {
schema.setSchemaInfoProvider(schemaInfoProvider);
}
}

@Override
public SchemaInfo getSchemaInfo() {
ensureSchemaInitialized();

if (schema == null) {
return null;
}
return schema.getSchemaInfo();
}

Expand All @@ -93,16 +122,24 @@ public boolean requireFetchingSchemaInfo() {
public void configureSchemaInfo(String topicName,
String componentName,
SchemaInfo schemaInfo) {
this.topicName = topicName;
this.componentName = componentName;
if (schemaInfo != null) {
GenericSchema genericSchema = generateSchema(schemaInfo);
setSchema(genericSchema);
log.info("Configure {} schema for topic {} : {}",
componentName, topicName, schemaInfo.getSchemaDefinition());
}
}

private GenericSchema generateSchema(SchemaInfo schemaInfo) {
if (schemaInfo.getType() != SchemaType.AVRO
&& schemaInfo.getType() != SchemaType.JSON) {
throw new RuntimeException("Currently auto consume only works for topics with avro or json schemas");
}
// when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
// to decode the messages.
GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfo, false /*useProvidedSchemaAsReaderSchema*/);
setSchema(genericSchema);
log.info("Configure {} schema for topic {} : {}",
componentName, topicName, schemaInfo.getSchemaDefinition());
return GenericSchemaImpl.of(schemaInfo, false /*useProvidedSchemaAsReaderSchema*/);
}

public static Schema<?> getSchema(SchemaInfo schemaInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -131,37 +132,36 @@ public void testMultiVersionSchema() throws Exception {
Sets.newHashSet(pulsarCluster.getClusterName())
);

// Create a topic with `Person`
try (Producer<Person> producer = client.newProducer(Schema.AVRO(
Producer<Person> producer = client.newProducer(Schema.AVRO(
SchemaDefinition.<Person>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Person.class).build()))
.topic(fqtn)
.create()
) {
Person person = new Person();
person.setName("Tom Hanks");
person.setAge(60);

producer.send(person);
.create();

log.info("Successfully published person : {}", person);
}
Person person = new Person();
person.setName("Tom Hanks");
person.setAge(60);

//Create a consumer for MultiVersionSchema
try (Consumer<PersonConsumeSchema> consumer = client.newConsumer(Schema.AVRO(
Consumer<PersonConsumeSchema> consumer = client.newConsumer(Schema.AVRO(
SchemaDefinition.<PersonConsumeSchema>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(PersonConsumeSchema.class).build()))
.subscriptionName("test")
.topic(fqtn)
.subscribe();
) {
PersonConsumeSchema personConsumeSchema = consumer.receive().getValue();
assertEquals("Tom Hanks", personConsumeSchema.getName());
assertEquals(60, personConsumeSchema.getAge());
assertEquals("man", personConsumeSchema.getGender());
log.info("Successfully consumer personConsumeSchema : {}", personConsumeSchema);
}

producer.send(person);
log.info("Successfully published person : {}", person);

PersonConsumeSchema personConsumeSchema = consumer.receive().getValue();
assertEquals("Tom Hanks", personConsumeSchema.getName());
assertEquals(60, personConsumeSchema.getAge());
assertEquals("male", personConsumeSchema.getGender());

producer.close();
consumer.close();
log.info("Successfully consumer personConsumeSchema : {}", personConsumeSchema);
}

@Test
Expand Down Expand Up @@ -190,32 +190,75 @@ public void testAvroLogicalType() throws Exception {
.date(LocalDate.now())
.build();

try (Producer<AvroLogicalType> producer = client
Producer<AvroLogicalType> producer = client
.newProducer(Schema.AVRO(AvroLogicalType.class))
.topic(fqtn)
.create()
) {
producer.send(messageForSend);
log.info("Successfully published avro logical type message : {}", messageForSend);
}
.create();

try (Consumer<AvroLogicalType> consumer = client
Consumer<AvroLogicalType> consumer = client
.newConsumer(Schema.AVRO(AvroLogicalType.class))
.topic(fqtn)
.subscribe()
) {
AvroLogicalType received = consumer.receive().getValue();
assertEquals(messageForSend.getDecimal(), received.getDecimal());
assertEquals(messageForSend.getTimeMicros(), received.getTimeMicros());
assertEquals(messageForSend.getTimeMillis(), received.getTimeMillis());
assertEquals(messageForSend.getTimestampMicros(), received.getTimestampMicros());
assertEquals(messageForSend.getTimestampMillis(), received.getTimestampMillis());
assertEquals(messageForSend.getDate(), received.getDate());

log.info("Successfully consumer avro logical type message : {}", received);
}
.subscriptionName("test")
.subscribe();

producer.send(messageForSend);
log.info("Successfully published avro logical type message : {}", messageForSend);

AvroLogicalType received = consumer.receive().getValue();
assertEquals(messageForSend.getDecimal(), received.getDecimal());
assertEquals(messageForSend.getTimeMicros(), received.getTimeMicros());
assertEquals(messageForSend.getTimeMillis(), received.getTimeMillis());
assertEquals(messageForSend.getTimestampMicros(), received.getTimestampMicros());
assertEquals(messageForSend.getTimestampMillis(), received.getTimestampMillis());
assertEquals(messageForSend.getDate(), received.getDate());

producer.close();
consumer.close();

log.info("Successfully consumer avro logical type message : {}", received);
}

@Test
public void testAutoConsumeSchemaSubscribeFirst() throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
final String topic = "test-auto-consume-schema";
final String fqtn = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topic
).toString();

admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(pulsarCluster.getClusterName())
);

Consumer<GenericRecord> consumer = client
.newConsumer(Schema.AUTO_CONSUME())
.topic(fqtn)
.subscriptionName("test")
.subscribe();

Producer<Person> producer = client
.newProducer(Schema.AVRO(Person.class))
.topic(fqtn)
.create();

Person person = new Person();
person.setName("Tom Hanks");
person.setAge(60);
producer.send(person);

GenericRecord genericRecord = consumer.receive().getValue();

assertEquals(genericRecord.getField("name"), "Tom Hanks");
assertEquals(genericRecord.getField("age"), 60);

consumer.close();
producer.close();
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
*/
package org.apache.pulsar.tests.integration.schema;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.avro.reflect.AvroDefault;
Expand Down Expand Up @@ -99,7 +101,11 @@ public static class Student {
}

@Data
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@Builder
public static class AvroLogicalType{
Expand Down

0 comments on commit a3609ef

Please sign in to comment.