Skip to content

Commit

Permalink
Fixed avro schema decode error in functions (apache#6662)
Browse files Browse the repository at this point in the history
Fixes apache#5503 
From apache#6445 

# Motivation
In functions, it will encounter ```ClassCastException``` when using the Avro schema for topics.

```
Exception in thread "main" java.lang.ClassCastException: org.apache.pulsar.shade.org.apache.avro.generic.GenericData$Record cannot be cast to io.streamnative.KeyValueSchemaTest$Foo2
	at io.streamnative.KeyValueSchemaTest.testConsumerByPythonProduce(KeyValueSchemaTest.java:412)
	at io.streamnative.KeyValueSchemaTest.main(KeyValueSchemaTest.java:305)
```

# Modifications
In functions, when using Avro schema specific the ClassLoader for ReflectDatumReader.

Add integration test ```testAvroSchemaFunction``` in class ```PulsarFunctionsTest```.
  • Loading branch information
gaoran10 authored Apr 5, 2020
1 parent 98344a7 commit 52ae182
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 19 deletions.
32 changes: 31 additions & 1 deletion .github/workflows/ci-unit-broker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ jobs:
sudo apt clean
docker rmi $(docker images -q) -f
df -h
free -h
- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
Expand Down Expand Up @@ -96,13 +97,42 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=ReplicatorRateLimiterTest' -pl pulsar-broker

- name: run unit tests pulsar broker persistent dispatcher failover consumer test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=PersistentDispatcherFailoverConsumerTest' -pl pulsar-broker

- name: run unit tests pulsar broker admin api test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=AdminApiTest' -pl pulsar-broker

- name: run unit tests pulsar broker v1 admin api test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=V1_AdminApiTest' -pl pulsar-broker

- name: run unit tests pulsar broker compaction test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=CompactionTest' -pl pulsar-broker

- name: run unit tests pulsar broker batch message test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=BatchMessageTest' -pl pulsar-broker

- name: run unit tests pulsar broker partitioned topics schema test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=PartitionedTopicsSchemaTest' -pl pulsar-broker

- name: run unit test pulsar-broker
if: steps.docs.outputs.changed_only == 'no'
run: mvn test '-Dtest=!PersistentTransactionBufferTest,!PulsarFunctionE2ESecurityTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailoverE2ETest,!BrokerClientIntegrationTest,!ReplicatorRateLimiterTest' -DfailIfNoTests=false -pl pulsar-broker
run: |
df -h
free -h
mvn test -e '-Dtest=!PersistentTransactionBufferTest,!PulsarFunctionE2ESecurityTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailoverE2ETest,!BrokerClientIntegrationTest,!ReplicatorRateLimiterTest,!PersistentDispatcherFailoverConsumerTest,!AdminApiTest,!V1_AdminApiTest,!CompactionTest,!BatchMessageTest,!PartitionedTopicsSchemaTest' -DfailIfNoTests=false -pl pulsar-broker
- name: package surefire artifacts
if: failure()
run: |
df -h
free -h
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,9 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
@Override
protected void handleGetSchema(CommandGetSchema commandGetSchema) {
if (log.isDebugEnabled()) {
log.debug("Received CommandGetSchema call from {}", remoteAddress);
log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
remoteAddress, new String(commandGetSchema.getSchemaVersion().toByteArray()),
commandGetSchema.getTopic(), commandGetSchema.getRequestId());
}

long requestId = commandGetSchema.getRequestId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,12 @@ public class AvroSchema<T> extends StructSchema<T> {
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
}

private AvroSchema(SchemaInfo schemaInfo) {
private ClassLoader pojoClassLoader;

private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
super(schemaInfo);
setReader(new AvroReader<>(schema));
this.pojoClassLoader = pojoClassLoader;
setReader(new AvroReader<>(schema, pojoClassLoader));
setWriter(new AvroWriter<>(schema));
}

Expand All @@ -78,34 +81,42 @@ public boolean supportSchemaVersioning() {

@Override
public Schema<T> clone() {
Schema<T> schema = new AvroSchema<>(schemaInfo);
Schema<T> schema = new AvroSchema<>(schemaInfo, pojoClassLoader);
if (schemaInfoProvider != null) {
schema.setSchemaInfoProvider(schemaInfoProvider);
}
return schema;
}

public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
ClassLoader pojoClassLoader = null;
if (schemaDefinition.getPojo() != null) {
pojoClassLoader = schemaDefinition.getPojo().getClassLoader();
}
return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
}

public static <T> AvroSchema<T> of(Class<T> pojo) {
return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(pojo).build());
}

public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) {
ClassLoader pojoClassLoader = null;
if (pojo != null) {
pojoClassLoader = pojo.getClassLoader();
}
SchemaDefinition<T> schemaDefinition = SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build();
return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
}

@Override
protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
log.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo.getSchemaDefinition());
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema);
schemaInfo.getSchemaDefinition(), schemaInfo.toString());
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader);
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
*/
package org.apache.pulsar.client.impl.schema.reader;

import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaReader;
Expand All @@ -41,8 +44,34 @@ public AvroReader(Schema schema) {
this.reader = new ReflectDatumReader<>(schema);
}

public AvroReader(Schema writerSchema, Schema readerSchema) {
this.reader = new ReflectDatumReader<>(writerSchema, readerSchema);
public AvroReader(Schema schema, ClassLoader classLoader) {
if (classLoader != null) {
ReflectData reflectData = new ReflectData(classLoader);
reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
} else {
this.reader = new ReflectDatumReader<>(schema);
}
}

public AvroReader(Schema writerSchema, Schema readerSchema, ClassLoader classLoader) {
if (classLoader != null) {
ReflectData reflectData = new ReflectData(classLoader);
reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
this.reader = new ReflectDatumReader<>(writerSchema, readerSchema, reflectData);
} else {
this.reader = new ReflectDatumReader<>(writerSchema, readerSchema);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ private ClassLoader loadJars() throws Exception {
Collections.emptyList());
}

log.info("Initialize function class loader for function {} at function cache manager",
instanceConfig.getFunctionDetails().getName());
log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(instanceConfig.getFunctionId()));

fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
if (null == fnClassLoader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
inputConsumers = configs.entrySet().stream().map(e -> {
String topic = e.getKey();
ConsumerConfig<T> conf = e.getValue();
log.info("Creating consumers for topic : {}, schema : {}", topic, conf.getSchema());
log.info("Creating consumers for topic : {}, schema : {}, schemaInfo: {}",
topic, conf.getSchema(), conf.getSchema().getSchemaInfo());

ConsumerBuilder<T> cb = pulsarClient.newConsumer(conf.getSchema())
// consume message even if can't decrypt and deliver it along with encryption-ctx
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api.examples;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;


@Slf4j
public class AvroSchemaTestFunction implements Function<AvroTestObject, AvroTestObject> {

@Override
public AvroTestObject process(AvroTestObject input, Context context) throws Exception {
log.info("AvroTestObject - baseValue: {}", input.getBaseValue());
input.setBaseValue(input.getBaseValue() + 10);
return input;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Collection;


/**
* This functions collects the timestamp during the window operation.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api.examples.pojo;

import lombok.Data;


/**
* Avro test object.
*/
@Data
public class AvroTestObject {

private int baseValue;

}
2 changes: 1 addition & 1 deletion tests/docker-images/latest-version-image/conf/bookie.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/bookie.log
directory=/pulsar
environment=PULSAR_MEM="-Xmx128M",PULSAR_GC="-XX:+UseG1GC",dbStorage_writeCacheMaxSizeMb="16",dbStorage_readAheadCacheMaxSizeMb="16"
environment=PULSAR_MEM="-Xmx128M -XX:MaxDirectMemorySize=512M",PULSAR_GC="-XX:+UseG1GC",dbStorage_writeCacheMaxSizeMb="16",dbStorage_readAheadCacheMaxSizeMb="16"
command=/pulsar/bin/pulsar bookie
Loading

0 comments on commit 52ae182

Please sign in to comment.