Skip to content

Commit

Permalink
[pulsar-clients]Save name, type, properties for KeyValueSchema (apach…
Browse files Browse the repository at this point in the history
…e#4108)


### Motivation

The current keySchemaInfo and valueSchenamaInfo of KeyValueSchemahas no saved name, type and properties.

### Modifications

Save name, type, properties for keySchemaInfo and valueSchemaInfo of KeyValueSchema

### Verifying this change

Test pass
  • Loading branch information
tuteng authored and sijie committed Apr 25, 2019
1 parent 49b3772 commit 824a654
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
import static com.google.common.base.Preconditions.checkArgument;

import java.nio.ByteBuffer;
import java.util.Map;

import com.google.common.collect.Maps;
import com.google.gson.Gson;
import lombok.Getter;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;


/**
* [Key, Value] pair schema definition
*/
Expand Down Expand Up @@ -82,8 +86,19 @@ private KeyValueSchema(Schema<K> keySchema,

ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keySchemaInfo.length + 4 + valueSchemaInfo.length);
byteBuffer.putInt(keySchemaInfo.length).put(keySchemaInfo)
.putInt(valueSchemaInfo.length).put(valueSchemaInfo);
this.schemaInfo.setSchema(byteBuffer.array());
.putInt(valueSchemaInfo.length).put(valueSchemaInfo);

Map<String, String> properties = Maps.newHashMap();
properties.put("key.schema.name", keySchema.getSchemaInfo().getName());
properties.put("key.schema.type", String.valueOf(keySchema.getSchemaInfo().getType()));
Gson keySchemaGson = new Gson();
properties.put("key.schema.properties", keySchemaGson.toJson(keySchema.getSchemaInfo().getProperties()));
properties.put("value.schema.name", valueSchema.getSchemaInfo().getName());
properties.put("value.schema.type", String.valueOf(valueSchema.getSchemaInfo().getType()));
Gson valueSchemaGson = new Gson();
properties.put("value.schema.properties", valueSchemaGson.toJson(valueSchema.getSchemaInfo().getProperties()));

this.schemaInfo.setSchema(byteBuffer.array()).setProperties(properties);
}

// encode as bytes: [key.length][key.bytes][value.length][value.bytes]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.testng.Assert.assertEquals;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
Expand All @@ -31,6 +32,8 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.Map;

@Slf4j
public class KeyValueSchemaTest {

Expand Down Expand Up @@ -59,6 +62,32 @@ public void testAllowNullAvroSchemaCreate() {
assertEquals(schemaInfo1, schemaInfo2);
}

@Test
public void testFillParametersToSchemainfo() {
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());

fooSchema.getSchemaInfo().setName("foo");
fooSchema.getSchemaInfo().setType(SchemaType.AVRO);
Map<String, String> keyProperties = Maps.newTreeMap();
keyProperties.put("foo.key1", "value");
keyProperties.put("foo.key2", "value");
fooSchema.getSchemaInfo().setProperties(keyProperties);
barSchema.getSchemaInfo().setName("bar");
barSchema.getSchemaInfo().setType(SchemaType.AVRO);
Map<String, String> valueProperties = Maps.newTreeMap();
valueProperties.put("bar.key", "key");
barSchema.getSchemaInfo().setProperties(valueProperties);
Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);

assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.name"), "foo");
assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.type"), String.valueOf(SchemaType.AVRO));
assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.properties"), "{\"foo.key1\":\"value\",\"foo.key2\":\"value\"}");
assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.name"), "bar");
assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.type"), String.valueOf(SchemaType.AVRO));
assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.properties"), "{\"bar.key\":\"key\"}");
}

@Test
public void testNotAllowNullAvroSchemaCreate() {
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
Expand Down

0 comments on commit 824a654

Please sign in to comment.