Skip to content

Commit

Permalink
revise the schema default type not null (apache#3752)
Browse files Browse the repository at this point in the history
### Motivation
Fix apache#3741 

### Modifications
Support define not not allow null field in schema

### Verifying this change
Add not allow null field schema verify

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes

Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (yes)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)
  • Loading branch information
congbobo184 authored and sijie committed Mar 19, 2019
1 parent 6e0c11e commit 1a1c557
Show file tree
Hide file tree
Showing 26 changed files with 868 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -43,6 +44,8 @@ public class ClientGetSchemaTest extends ProducerConsumerBase {
private static final String topicString = "my-property/my-ns/topic-string";
private static final String topicJson = "my-property/my-ns/topic-json";
private static final String topicAvro = "my-property/my-ns/topic-avro";
private static final String topicJsonNotNull = "my-property/my-ns/topic-json-not-null";
private static final String topicAvroNotNull = "my-property/my-ns/topic-avro-not-null";

List<Producer<?>> producers = new ArrayList<>();

Expand All @@ -62,6 +65,11 @@ protected void setup() throws Exception {
producers.add(pulsarClient.newProducer(Schema.STRING).topic(topicString).create());
producers.add(pulsarClient.newProducer(Schema.AVRO(MyClass.class)).topic(topicAvro).create());
producers.add(pulsarClient.newProducer(Schema.JSON(MyClass.class)).topic(topicJson).create());
producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).build())).topic(topicAvro).create());
producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).build())).topic(topicJson).create());
producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicAvroNotNull).create());
producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicJsonNotNull).create());

}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand All @@ -51,11 +52,11 @@ public SchemaCompatibilityCheck getSchemaCheck() {
public void testJsonSchemaBackwardsCompatibility() throws JsonProcessingException {

SchemaData from = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
SchemaData to = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
SchemaData to = SchemaData.builder().data(JSONSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
JsonSchemaCompatibilityCheck jsonSchemaCompatibilityCheck = new JsonSchemaCompatibilityCheck();
Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));

from = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
from = SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
to = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void testJsonProducerAndConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);

JSONSchema<JsonEncodedPojo> jsonSchema =
JSONSchema.of(JsonEncodedPojo.class);
JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());

Consumer<JsonEncodedPojo> consumer = pulsarClient
.newConsumer(jsonSchema)
Expand Down Expand Up @@ -108,7 +109,7 @@ public void testJsonProducerAndConsumerWithPrestoredSchema() throws Exception {
log.info("-- Starting {} test --", methodName);

JSONSchema<JsonEncodedPojo> jsonSchema =
JSONSchema.of(JsonEncodedPojo.class);
JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());

pulsar.getSchemaRegistryService()
.putSchemaIfAbsent("my-property/my-ns/my-topic1",
Expand Down Expand Up @@ -166,7 +167,7 @@ public void testJsonConsumerWithWrongCorruptedSchema() throws Exception {
).get();

Consumer<JsonEncodedPojo> consumer = pulsarClient
.newConsumer(JSONSchema.of(JsonEncodedPojo.class))
.newConsumer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscribe();
Expand Down Expand Up @@ -194,7 +195,7 @@ public void testJsonProducerWithWrongCorruptedSchema() throws Exception {
).get();

Producer<JsonEncodedPojo> producer = pulsarClient
.newProducer(JSONSchema.of(JsonEncodedPojo.class))
.newProducer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.create();

Expand Down Expand Up @@ -273,7 +274,9 @@ public void testProtobufConsumerWithWrongPrestoredSchema() throws Exception {
).get();

Consumer<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong> consumer = pulsarClient
.newConsumer(AvroSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class))
.newConsumer(AvroSchema.of
(SchemaDefinition.<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong>builder().
withPojo(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscribe();
Expand All @@ -286,7 +289,8 @@ public void testAvroProducerAndConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);

AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(AvroEncodedPojo.class);
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
withPojo(AvroEncodedPojo.class).build());

Consumer<AvroEncodedPojo> consumer = pulsarClient
.newConsumer(avroSchema)
Expand Down Expand Up @@ -355,7 +359,8 @@ public void testAvroConsumerWithWrongPrestoredSchema() throws Exception {
).get();

Consumer<AvroEncodedPojo> consumer = pulsarClient
.newConsumer(AvroSchema.of(AvroEncodedPojo.class))
.newConsumer(AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
withPojo(AvroEncodedPojo.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscribe();
Expand Down Expand Up @@ -454,7 +459,8 @@ public void testAvroProducerAndAutoSchemaConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);

AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(AvroEncodedPojo.class);
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
withPojo(AvroEncodedPojo.class).build());

Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
Expand Down Expand Up @@ -502,7 +508,8 @@ public void testAvroProducerAndAutoSchemaReader() throws Exception {
log.info("-- Starting {} test --", methodName);

AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(AvroEncodedPojo.class);
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
withPojo(AvroEncodedPojo.class).build());

Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
Expand Down Expand Up @@ -548,7 +555,8 @@ public void testAutoBytesProducer() throws Exception {
log.info("-- Starting {} test --", methodName);

AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(AvroEncodedPojo.class);
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
withPojo(AvroEncodedPojo.class).build());

try (Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.pulsar.client.api;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -169,58 +169,43 @@ static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Cla
}

/**
* Create a Avro schema type by extracting the fields of the specified class.
*
* @param clazz the POJO class to be used to extract the Avro schema
* @return a Schema instance
*/
static <T> Schema<T> AVRO(Class<T> clazz) {
return DefaultImplementation.newAvroSchema(clazz);
}

/**
* Create a Avro schema type using the provided avro schema definition.
* Create a Avro schema type by default configuration of the class
*
* @param schemaDefinition avro schema definition
* @param pojo the POJO class to be used to extract the Avro schema
* @return a Schema instance
*/
static <T> Schema<T> AVRO(String schemaDefinition) {
return AVRO(schemaDefinition, Collections.emptyMap());
static <T> Schema<T> AVRO(Class<T> pojo) {
return DefaultImplementation.newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build());
}

/**
* Create a Avro schema type using the provided avro schema definition.
* Create a Avro schema type with schema definition
*
* @param schemaDefinition avro schema definition
* @param properties pulsar schema properties
* @param schemaDefinition the definition of the schema
* @return a Schema instance
*/
static <T> Schema<T> AVRO(String schemaDefinition, Map<String, String> properties) {
return DefaultImplementation.newAvroSchema(schemaDefinition, properties);
static <T> Schema<T> AVRO(SchemaDefinition<T> schemaDefinition) {
return DefaultImplementation.newAvroSchema(schemaDefinition);
}

/**
* Create a JSON schema type by extracting the fields of the specified class.
*
* @param clazz the POJO class to be used to extract the JSON schema
* @param pojo the POJO class to be used to extract the JSON schema
* @return a Schema instance
*/
static <T> Schema<T> JSON(Class<T> clazz) {
return DefaultImplementation.newJSONSchema(clazz);
static <T> Schema<T> JSON(Class<T> pojo) {
return DefaultImplementation.newJSONSchema(SchemaDefinition.builder().withPojo(pojo).build());
}

/**
* Create a JSON schema type by extracting the fields of the specified class.
* Create a JSON schema type with schema definition
*
* @param clazz the POJO class to be used to extract the JSON schema
* @param schemaDefinition schema definition json string (using avro schema syntax)
* @param properties pulsar schema properties
* @param schemaDefinition the definition of the schema
* @return a Schema instance
*/
static <T> Schema<T> JSON(Class<T> clazz,
String schemaDefinition,
Map<String, String> properties) {
return DefaultImplementation.newJSONSchema(clazz, schemaDefinition, properties);
static <T> Schema<T> JSON(SchemaDefinition schemaDefinition) {
return DefaultImplementation.newJSONSchema(schemaDefinition);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;

import org.apache.pulsar.client.internal.DefaultImplementation;

import java.util.Map;


public interface SchemaDefinition<T> {

/**
* Get a new builder instance that can used to configure and build a {@link SchemaDefinition} instance.
*
* @return the {@link SchemaDefinition}
*/
static <T> SchemaDefinitionBuilder<T> builder() {
return DefaultImplementation.newSchemaDefinitionBuilder();
}

/**
* get schema whether always allow null or not
*
* @return schema always null or not
*/
public boolean getAlwaysAllowNull();

/**
* Get schema class
*
* @return schema class
*/
public Map<String, String> getProperties();

/**
* Get json schema definition
*
* @return schema class
*/
public String getJsonDef();

/**
* Get pojo schema definition
*
* @return pojo schema
*/
public Class<T> getPojo();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;



import java.util.Map;

/**
* Builder to build schema definition {@link SchemaDefinition}.
*/
public interface SchemaDefinitionBuilder<T> {

/**
* Set schema whether always allow null or not
*
* @param alwaysAllowNull definition null or not
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);

/**
* Set schema info properties
*
* @param properties schema info properties
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withProperties(Map<String, String> properties);

/**
* Set schema info properties
*
* @param key property key
* @param value property value
*
* @return record schema definition
*/
SchemaDefinitionBuilder<T> addProperty(String key, String value);

/**
* Set schema of pojo definition
*
* @param pojo pojo schema definition
*
* @return record schema definition
*/
SchemaDefinitionBuilder<T> withPojo(Class pojo);

/**
* Set schema of json definition
*
* @param jsonDefinition json schema definition
*
* @return record schema definition
*/
SchemaDefinitionBuilder<T> withJsonDef(String jsonDefinition);

/**
* Build the schema definition.
*
* @return the schema definition.
*/
SchemaDefinition<T> build();

}
Loading

0 comments on commit 1a1c557

Please sign in to comment.