Skip to content

Commit

Permalink
Fix brokean protobuf-native schema (apache#8621)
Browse files Browse the repository at this point in the history
Motivation

The protobuf-native schema was broken due to two concurrent merges
  • Loading branch information
sijie authored Nov 19, 2020
1 parent 0e1d0d8 commit ab000ec
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,6 @@ public Descriptors.Descriptor getProtobufNativeSchema() {
return ProtobufNativeSchemaUtils.deserialize(this.schemaInfo.getSchema());
}

@Override
protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
throw new RuntimeException("ProtobufNativeSchema don't support schema versioning");
}

public static <T extends GeneratedMessageV3> ProtobufNativeSchema<T> of(Class<T> pojo) {
return of(pojo, new HashMap<>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,16 @@
*/
package org.apache.pulsar.client.impl.schema.generic;

import static org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericProtobufNativeReader.parseProtobufSchema;

import com.google.protobuf.Descriptors;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaUtils;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;

import java.util.List;
import java.util.stream.Collectors;

/**
* Generic ProtobufNative schema.
*/
Expand All @@ -53,46 +48,17 @@ public GenericProtobufNativeSchema(SchemaInfo schemaInfo,
.stream()
.map(f -> new Field(f.getName(), f.getIndex()))
.collect(Collectors.toList());
setReader(new GenericProtobufNativeReader(descriptor));
setReader(new MultiVersionGenericProtobufNativeReader(useProvidedSchemaAsReaderSchema, schemaInfo));
setWriter(new GenericProtobufNativeWriter());
}

@Override
public List<Field> getFields() {
return fields;
}

@Override
public GenericRecordBuilder newRecordBuilder() {
return new ProtobufNativeRecordBuilderImpl(this);
}

@Override
protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo);
Descriptors.Descriptor recordDescriptor = parseProtobufSchema(schemaInfo);
Descriptors.Descriptor readerSchemaDescriptor = useProvidedSchemaAsReaderSchema ? descriptor : recordDescriptor;
return new GenericProtobufNativeReader(
readerSchemaDescriptor,
schemaVersion.get());
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
this.schemaInfo);
return reader;
}
}

protected static Descriptors.Descriptor parseProtobufSchema(SchemaInfo schemaInfo) {
return ProtobufNativeSchemaUtils.deserialize(schemaInfo.getSchema());
}

public static GenericSchema of(SchemaInfo schemaInfo) {
return new GenericProtobufNativeSchema(schemaInfo, true);
return new GenericProtobufNativeSchema(schemaInfo);
}

public static GenericSchema of(SchemaInfo schemaInfo, boolean useProvidedSchemaAsReaderSchema) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema.generic;

import com.google.protobuf.Descriptors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaUtils;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
* A multi version generic protobuf-native reader.
*/
@Slf4j
public class MultiVersionGenericProtobufNativeReader
extends AbstractMultiVersionReader<GenericRecord>
implements SchemaReader<GenericRecord> {

// the flag controls whether to use the provided schema as reader schema
// to decode the messages. In `AUTO_CONSUME` mode, setting this flag to `false`
// allows decoding the messages using the schema associated with the messages.
private final boolean useProvidedSchemaAsReaderSchema;
private final SchemaInfo schemaInfo;
private final Descriptors.Descriptor descriptor;

public MultiVersionGenericProtobufNativeReader(boolean useProvidedSchemaAsReaderSchema,
SchemaInfo schemaInfo) {
super(new GenericProtobufNativeReader(parseProtobufSchema(schemaInfo)));
this.useProvidedSchemaAsReaderSchema = useProvidedSchemaAsReaderSchema;
this.schemaInfo = schemaInfo;
this.descriptor = parseProtobufSchema(schemaInfo);
}

@Override
protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo);
Descriptors.Descriptor recordDescriptor = parseProtobufSchema(schemaInfo);
Descriptors.Descriptor readerSchemaDescriptor =
useProvidedSchemaAsReaderSchema ? descriptor : recordDescriptor;
return new GenericProtobufNativeReader(
readerSchemaDescriptor,
schemaVersion.get());
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
this.schemaInfo);
return providerSchemaReader;
}
}

protected static Descriptors.Descriptor parseProtobufSchema(SchemaInfo schemaInfo) {
return ProtobufNativeSchemaUtils.deserialize(schemaInfo.getSchema());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public SchemaReader<T> load(BytesSchemaVersion schemaVersion) {
}
});

AbstractMultiVersionReader(SchemaReader<T> providerSchemaReader) {
public AbstractMultiVersionReader(SchemaReader<T> providerSchemaReader) {
this.providerSchemaReader = providerSchemaReader;
}

Expand Down

0 comments on commit ab000ec

Please sign in to comment.