Skip to content

Commit

Permalink
[schema] Introduce schema builder to build schema. (apache#3682)
Browse files Browse the repository at this point in the history
*Motivation*

Currently we are supporting POJO based schema in java clients.
POJO schema is only useful when the POJO is predefined. However
in applications like a CDC pipeline, POJO is no predefined, there
is no other way to define a schema.

Since we are using avro schema for schema management, this PR
is proposing a simple schema builder wrapper on avro schema builder.

*Modifications*

Introduce schema builder to build a record schema.

*NOTES*

Currently we only support primitives in defining fields in a record schema in this PR.
We will add nested types in future PRs.
  • Loading branch information
sijie authored Feb 26, 2019
1 parent 3c36705 commit d46474b
Show file tree
Hide file tree
Showing 9 changed files with 762 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,5 @@ static Schema<byte[]> AUTO_PRODUCE_BYTES() {
static Schema<?> getSchema(SchemaInfo schemaInfo) {
return DefaultImplementation.getSchema(schemaInfo);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;

import org.apache.pulsar.common.schema.SchemaType;

/**
* Build a field for a record.
*/
public interface FieldSchemaBuilder<S extends FieldSchemaBuilder<S>> {

/**
* Set name-value pair properties for this field.
*
* @param name name of the property
* @param val value of the property
* @return field schema builder
*/
S property(String name, String val);

/**
* The documentation of this field.
*
* @param doc documentation
* @return field schema builder
*/
S doc(String doc);

/**
* The optional name aliases of this field.
*
* @param aliases the name aliases of this field
* @return field schema builder
*/
S aliases(String... aliases);

/**
* The type of this field.
*
* <p>Currently only primitive types are supported.
*
* @param type schema type of this field
* @return field schema builder
*/
S type(SchemaType type);

/**
* Make this field optional.
*
* @return field schema builder
*/
S optional();

/**
* Make this field required.
*
* @return field schema builder
*/
S required();

/**
* Set the default value of this field.
*
* <p>The value is validated against the schema type.
*
* @return value
*/
S defaultValue(Object value);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;

import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

/**
* Building the schema for a {@link GenericRecord}.
*/
public interface RecordSchemaBuilder {

/**
* Attach val-name property pair to the record schema.
*
* @param name property name
* @param val property value
* @return record schema builder
*/
RecordSchemaBuilder property(String name, String val);

/**
* Add a field with the given name to the record.
*
* @param fieldName name of the field
* @return field schema builder to build the field.
*/
FieldSchemaBuilder field(String fieldName);

/**
* Add doc to the record schema.
*
* @param doc documentation
* @return field schema builder
*/
RecordSchemaBuilder doc(String doc);

/**
* Build the schema info.
*
* @return the schema info.
*/
SchemaInfo build(SchemaType schemaType);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;

import org.apache.pulsar.client.internal.DefaultImplementation;

/**
* Builder to build schema.
*/
public interface SchemaBuilder {

/**
* Build the schema for a record.
*
* @param name name of the record.
* @return builder to build the schema for a record.
*/
static RecordSchemaBuilder record(String name) {
return DefaultImplementation.newRecordSchemaBuilder(name);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -171,4 +172,10 @@ public static Schema<?> getSchema(SchemaInfo schemaInfo) {
() -> (Schema<?>) getStaticMethod("org.apache.pulsar.client.impl.schema.AutoConsumeSchema",
"getSchema", SchemaInfo.class).invoke(null, schemaInfo));
}

public static RecordSchemaBuilder newRecordSchemaBuilder(String name) {
return catchExceptions(
() -> (RecordSchemaBuilder) getConstructor("org.apache.pulsar.client.impl.schema.RecordSchemaBuilderImpl",
String.class).newInstance(name));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;

import static java.util.Objects.requireNonNull;

import java.util.HashMap;
import java.util.Map;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.pulsar.client.api.schema.FieldSchemaBuilder;
import org.apache.pulsar.common.schema.SchemaType;

/**
* The default implementation of {@link FieldSchemaBuilder}.
*/
class FieldSchemaBuilderImpl implements FieldSchemaBuilder<FieldSchemaBuilderImpl> {

private final String fieldName;

private SchemaType type;
private boolean optional = false;
private Object defaultVal = null;
private final Map<String, String> properties = new HashMap<>();
private String doc;
private String[] aliases;

FieldSchemaBuilderImpl(String fieldName) {
this.fieldName = fieldName;
}

@Override
public FieldSchemaBuilderImpl property(String name, String val) {
properties.put(name, val);
return this;
}

@Override
public FieldSchemaBuilderImpl doc(String doc) {
this.doc = doc;
return this;
}

@Override
public FieldSchemaBuilderImpl aliases(String... aliases) {
this.aliases = aliases;
return this;
}

@Override
public FieldSchemaBuilderImpl type(SchemaType type) {
this.type = type;
return this;
}

@Override
public FieldSchemaBuilderImpl optional() {
optional = true;
return this;
}

@Override
public FieldSchemaBuilderImpl required() {
optional = false;
return this;
}

@Override
public FieldSchemaBuilderImpl defaultValue(Object value) {
defaultVal = value;
return this;
}

Field build() {
requireNonNull(type, "Schema type is not provided");
// verify the default value and object
SchemaUtils.validateFieldSchema(
fieldName,
type,
defaultVal
);

final Schema baseSchema;
switch (type) {
case INT32:
baseSchema = SchemaBuilder.builder().intType();
break;
case INT64:
baseSchema = SchemaBuilder.builder().longType();
break;
case STRING:
baseSchema = SchemaBuilder.builder().stringType();
break;
case FLOAT:
baseSchema = SchemaBuilder.builder().floatType();
break;
case DOUBLE:
baseSchema = SchemaBuilder.builder().doubleType();
break;
case BOOLEAN:
baseSchema = SchemaBuilder.builder().booleanType();
break;
case BYTES:
baseSchema = SchemaBuilder.builder().bytesType();
break;
default:
throw new RuntimeException("Schema `" + type + "` is not supported to be used as a field for now");
}

for (Map.Entry<String, String> entry : properties.entrySet()) {
baseSchema.addProp(entry.getKey(), entry.getValue());
}

if (null != aliases) {
for (String alias : aliases) {
baseSchema.addAlias(alias);
}
}

final Schema finalSchema;
if (optional) {
if (defaultVal != null) {
finalSchema = SchemaBuilder.builder().unionOf()
.type(baseSchema)
.and()
.nullType()
.endUnion();
} else {
finalSchema = SchemaBuilder.builder().unionOf()
.nullType()
.and()
.type(baseSchema)
.endUnion();
}
} else {
finalSchema = baseSchema;
}

final Object finalDefaultValue;
if (defaultVal != null) {
finalDefaultValue = SchemaUtils.toAvroObject(defaultVal);
} else {
if (optional) {
finalDefaultValue = JsonProperties.NULL_VALUE;
} else {
finalDefaultValue = null;
}
}

return new Field(
fieldName,
finalSchema,
doc,
finalDefaultValue
);
}

}
Loading

0 comments on commit d46474b

Please sign in to comment.