Skip to content

Commit

Permalink
Pulsar Client: restore SchemaInfo.builder() API (apache#12673)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Nov 9, 2021
1 parent 9499562 commit 849e4dc
Show file tree
Hide file tree
Showing 14 changed files with 82 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -107,7 +105,7 @@ public void testDisableSchemaValidationEnforcedHasSchema() throws Exception {
assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
}
Map<String, String> properties = Maps.newHashMap();
SchemaInfo schemaInfo = SchemaInfoImpl.builder()
SchemaInfo schemaInfo = SchemaInfo.builder()
.type(SchemaType.STRING)
.properties(properties)
.name("test")
Expand Down Expand Up @@ -156,7 +154,7 @@ public void testEnableSchemaValidationEnforcedHasSchemaMismatch() throws Excepti
}
Map<String, String> properties = Maps.newHashMap();
properties.put("key1", "value1");
SchemaInfo schemaInfo = SchemaInfoImpl.builder()
SchemaInfo schemaInfo = SchemaInfo.builder()
.type(SchemaType.STRING)
.properties(properties)
.name("test")
Expand Down Expand Up @@ -186,7 +184,7 @@ public void testEnableSchemaValidationEnforcedHasSchemaMatch() throws Exception
}
admin.namespaces().setSchemaValidationEnforced(namespace,true);
Map<String, String> properties = Maps.newHashMap();
SchemaInfo schemaInfo = SchemaInfoImpl.builder()
SchemaInfo schemaInfo = SchemaInfo.builder()
.type(SchemaType.STRING)
.properties(properties)
.name("test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -119,7 +118,7 @@ public static <T> OldJSONSchema<T> of(Class<T> pojo, Map<String, String> propert
JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper);
JsonSchema schema = schemaGen.generateSchema(pojo);

SchemaInfo info = SchemaInfoImpl.builder()
SchemaInfo info = SchemaInfo.builder()
.name("")
.properties(properties)
.type(SchemaType.JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,12 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.schema.Schemas;
Expand Down Expand Up @@ -324,7 +321,7 @@ public void testSchemaComparison() throws Exception {
SchemaCompatibilityStrategy.FULL);
byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class)
.getSchemaInfo().getSchema(), UTF_8) + "/n /n /n").getBytes();
SchemaInfo schemaInfo = SchemaInfoImpl.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
admin.schemas().createSchema(fqtn, schemaInfo);

admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
Expand Down Expand Up @@ -449,7 +448,7 @@ static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
schema = response.getData().getBytes(UTF_8);
}

return SchemaInfoImpl.builder()
return SchemaInfo.builder()
.schema(schema)
.type(response.getType())
.properties(response.getProperties())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,6 @@ static byte[] getBytes(ByteBuffer byteBuffer) {
byteBuffer.get(array);
return array;
}

SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, Map<String, String> propertiesValue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.pulsar.common.schema;

import java.util.Collections;
import java.util.Map;

import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Expand Down Expand Up @@ -48,4 +50,50 @@ public interface SchemaInfo {
Map<String, String> getProperties();

String getSchemaDefinition();

static SchemaInfoBuilder builder() {
return new SchemaInfoBuilder();
}

class SchemaInfoBuilder {
private String name;
private byte[] schema;
private SchemaType type;
private Map<String, String> properties;
private boolean propertiesSet;

SchemaInfoBuilder() {
}

public SchemaInfoBuilder name(String name) {
this.name = name;
return this;
}

public SchemaInfoBuilder schema(byte[] schema) {
this.schema = schema;
return this;
}

public SchemaInfoBuilder type(SchemaType type) {
this.type = type;
return this;
}

public SchemaInfoBuilder properties(Map<String, String> properties) {
this.properties = properties;
this.propertiesSet = true;
return this;
}

public SchemaInfo build() {
Map<String, String> propertiesValue = this.properties;
if (!this.propertiesSet) {
propertiesValue = Collections.emptyMap();
}
return DefaultImplementation
.getDefaultImplementation()
.newSchemaInfoImpl(name, schema, type, propertiesValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,7 @@
import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.BooleanSchema;
import org.apache.pulsar.client.impl.schema.ByteBufferSchema;
import org.apache.pulsar.client.impl.schema.ByteSchema;
import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.apache.pulsar.client.impl.schema.DateSchema;
import org.apache.pulsar.client.impl.schema.DoubleSchema;
import org.apache.pulsar.client.impl.schema.FloatSchema;
import org.apache.pulsar.client.impl.schema.InstantSchema;
import org.apache.pulsar.client.impl.schema.IntSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.client.impl.schema.LocalDateSchema;
import org.apache.pulsar.client.impl.schema.LocalDateTimeSchema;
import org.apache.pulsar.client.impl.schema.LocalTimeSchema;
import org.apache.pulsar.client.impl.schema.LongSchema;
import org.apache.pulsar.client.impl.schema.NativeAvroBytesSchema;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.client.impl.schema.RecordSchemaBuilderImpl;
import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.client.impl.schema.ShortSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.impl.schema.TimeSchema;
import org.apache.pulsar.client.impl.schema.TimestampSchema;
import org.apache.pulsar.client.impl.schema.*;
import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.client.internal.PulsarClientImplementationBinding;
Expand Down Expand Up @@ -383,4 +355,8 @@ public BatcherBuilder newKeyBasedBatcherBuilder() {
public MessagePayloadFactory newDefaultMessagePayloadFactory() {
return new MessagePayloadFactoryImpl();
}

public SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, Map<String, String> propertiesValue) {
return new SchemaInfoImpl(name, schema, type, propertiesValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public static class SchemaInfoBuilderTest {

@Test
public void testUnsetProperties() {
final SchemaInfo schemaInfo = SchemaInfoImpl.builder()
final SchemaInfo schemaInfo = SchemaInfo.builder()
.type(SchemaType.STRING)
.schema(new byte[0])
.name("string")
Expand All @@ -305,7 +305,7 @@ public void testUnsetProperties() {
public void testSetProperties() {
final Map<String, String> map = Maps.newHashMap();
map.put("test", "value");
final SchemaInfo schemaInfo = SchemaInfoImpl.builder()
final SchemaInfo schemaInfo = SchemaInfo.builder()
.type(SchemaType.STRING)
.schema(new byte[0])
.name("string")
Expand All @@ -323,7 +323,7 @@ public void testNullPropertyValue() {
final Map<String, String> map = new HashMap<>();
map.put("key", null);

SchemaInfo si = SchemaInfoImpl.builder()
SchemaInfo si = SchemaInfo.builder()
.name("INT32")
.schema(new byte[0])
.type(SchemaType.INT32)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;
import lombok.Builder;
import lombok.Data;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

Expand All @@ -46,7 +45,7 @@ public class SchemaData {
* @return the converted schema info.
*/
public SchemaInfo toSchemaInfo() {
return SchemaInfoImpl.builder()
return SchemaInfo.builder()
.name("")
.type(type)
.schema(data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.Converter;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
Expand All @@ -45,7 +44,7 @@ public KafkaSchemaWrappedSchema(org.apache.pulsar.kafka.shade.avro.Schema schema
Map<String, String> props = new HashMap<>();
boolean isJsonConverter = converter instanceof JsonConverter;
props.put(GenericAvroSchema.OFFSET_PROP, isJsonConverter ? "0" : "5");
this.schemaInfo = SchemaInfoImpl.builder()
this.schemaInfo = SchemaInfo.builder()
.name(isJsonConverter? "KafKaJson" : "KafkaAvro")
.type(isJsonConverter ? SchemaType.JSON : SchemaType.AVRO)
.schema(schema.toString().getBytes(UTF_8))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

Expand Down Expand Up @@ -66,7 +65,7 @@ private Schema<ByteBuffer> fetchSchema(int schemaId) {
org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId);
String definition = schema.toString(false);
log.info("Schema {} definition {}", schemaId, definition);
SchemaInfo schemaInfo = SchemaInfoImpl.builder()
SchemaInfo schemaInfo = SchemaInfo.builder()
.type(SchemaType.AVRO)
.name(schema.getName())
.properties(Collections.emptyMap())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -102,7 +101,7 @@ public PulsarSplit(
this.offloadPolicies = offloadPolicies;

ObjectMapper objectMapper = new ObjectMapper();
this.schemaInfo = SchemaInfoImpl.builder()
this.schemaInfo = SchemaInfo.builder()
.name(originSchemaName)
.type(schemaType)
.schema(schema.getBytes("ISO8859-1"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void testGetTableMetadataTableNoSchema(String delimiter) throws PulsarAdm
@Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaInfo badSchemaInfo = SchemaInfoImpl.builder()
SchemaInfo badSchemaInfo = SchemaInfo.builder()
.schema(new byte[0])
.type(SchemaType.AVRO)
.build();
Expand All @@ -216,7 +216,7 @@ public void testGetTableMetadataTableBlankSchema(String delimiter) throws Pulsar
@Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaInfo badSchemaInfo = SchemaInfoImpl.builder()
SchemaInfo badSchemaInfo = SchemaInfo.builder()
.schema("foo".getBytes())
.type(SchemaType.AVRO)
.build();
Expand Down
Loading

0 comments on commit 849e4dc

Please sign in to comment.