Skip to content

Commit

Permalink
[pulsar-clients]Store key part of a KeyValue schema into pulsar messa…
Browse files Browse the repository at this point in the history
…ge keys (apache#4117)

### Motivation

The current implementation of KeyValue schema stores key and value together as part of message payload. Ideally the key should be stored as part of message key.

It can be done by introducing a property in KeyValue schema to indicate whether store key in payload or as message key.

### Modifications

* Add keyIsStoredToMessage for encode and decode of KeyValueSchema

### Verifying this change
Unit test pass
  • Loading branch information
tuteng authored and sijie committed Apr 30, 2019
1 parent 3fd9168 commit 7f21501
Show file tree
Hide file tree
Showing 9 changed files with 540 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

Expand Down Expand Up @@ -268,6 +269,13 @@ static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> value) {
return DefaultImplementation.newKeyValueSchema(key, value);
}

/**
* Key Value Schema using passed in key, value and encoding type schemas.
*/
static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> value, KeyValueEncodingType keyValueEncodingType) {
return DefaultImplementation.newKeyValueSchema(key, value, keyValueEncodingType);
}

@Deprecated
static Schema<GenericRecord> AUTO() {
return AUTO_CONSUME();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.schema.*;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

Expand Down Expand Up @@ -250,6 +251,14 @@ public static <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchem
"of", Schema.class, Schema.class).invoke(null, keySchema, valueSchema));
}

public static <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema, Schema<V> valueSchema,
KeyValueEncodingType keyValueEncodingType) {
return catchExceptions(
() -> (Schema<KeyValue<K, V>>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchema",
"of", Schema.class, Schema.class, KeyValueEncodingType.class)
.invoke(null, keySchema, valueSchema, keyValueEncodingType));
}

public static <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Class<K> key, Class<V> value, SchemaType type) {
return catchExceptions(
() -> (Schema<KeyValue<K, V>>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchema",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.schema;

/**
* Encoding types of supported KeyValueSchema for Pulsar messages
*/
public enum KeyValueEncodingType {
// key is stored as message key, while value is stored as message payload
SEPARATED,
// key and value are stored as message payload
INLINE
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;

public class MessageImpl<T> implements Message<T> {

Expand Down Expand Up @@ -244,6 +247,13 @@ public T getValue() {
byte [] schemaVersion = getSchemaVersion();
if (schema.supportSchemaVersioning() && schemaVersion != null) {
return schema.decode(getData(), schemaVersion);
} else if (SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
return schema.decode(getKeyBytes(), getData());
} else {
return schema.decode(getData());
}
} else {
return schema.decode(getData());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;

public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
Expand Down Expand Up @@ -64,13 +67,23 @@ public CompletableFuture<MessageId> sendAsync() {

@Override
public TypedMessageBuilder<T> key(String key) {
if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
"This method is not allowed to set keys when in encoding type is SEPARATED");
}
msgMetadataBuilder.setPartitionKey(key);
msgMetadataBuilder.setPartitionKeyB64Encoded(false);
return this;
}

@Override
public TypedMessageBuilder<T> keyBytes(byte[] key) {
if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
"This method is not allowed to set keys when in encoding type is SEPARATED");
}
msgMetadataBuilder.setPartitionKey(Base64.getEncoder().encodeToString(key));
msgMetadataBuilder.setPartitionKeyB64Encoded(true);
return this;
Expand All @@ -84,7 +97,21 @@ public TypedMessageBuilder<T> orderingKey(byte[] orderingKey) {

@Override
public TypedMessageBuilder<T> value(T value) {

checkArgument(value != null, "Need Non-Null content value");
if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
// set key as the message key
msgMetadataBuilder.setPartitionKey(
Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
msgMetadataBuilder.setPartitionKeyB64Encoded(true);
// set value as the payload
this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
return this;
}
}
this.content = ByteBuffer.wrap(schema.encode(value));
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl.schema;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import java.nio.ByteBuffer;
import java.util.Map;
Expand All @@ -28,7 +29,9 @@
import lombok.Getter;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

Expand All @@ -37,6 +40,7 @@
* [Key, Value] pair schema definition
*/
public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {

@Getter
private final Schema<K> keySchema;
@Getter
Expand All @@ -46,21 +50,31 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
// [keyInfo.length][keyInfo][valueInfo.length][ValueInfo]
private final SchemaInfo schemaInfo;

@Getter
private final KeyValueEncodingType keyValueEncodingType;

/**
* Key Value Schema using passed in schema type, support JSON and AVRO currently.
*/
public static <K, V> Schema<KeyValue<K, V>> of(Class<K> key, Class<V> value, SchemaType type) {
checkArgument(SchemaType.JSON == type || SchemaType.AVRO == type);
if (SchemaType.JSON == type) {
return new KeyValueSchema<>(JSONSchema.of(key), JSONSchema.of(value));
return new KeyValueSchema<>(JSONSchema.of(key), JSONSchema.of(value), KeyValueEncodingType.INLINE);
} else {
// AVRO
return new KeyValueSchema<>(AvroSchema.of(key), AvroSchema.of(value));
return new KeyValueSchema<>(AvroSchema.of(key), AvroSchema.of(value), KeyValueEncodingType.INLINE);
}
}


public static <K, V> Schema<KeyValue<K, V>> of(Schema<K> keySchema, Schema<V> valueSchema) {
return new KeyValueSchema<>(keySchema, valueSchema);
return new KeyValueSchema<>(keySchema, valueSchema, KeyValueEncodingType.INLINE);
}

public static <K, V> Schema<KeyValue<K, V>> of(Schema<K> keySchema,
Schema<V> valueSchema,
KeyValueEncodingType keyValueEncodingType) {
return new KeyValueSchema<>(keySchema, valueSchema, keyValueEncodingType);
}

private static final Schema<KeyValue<byte[], byte[]>> KV_BYTES = new KeyValueSchema<>(
Expand All @@ -71,15 +85,22 @@ public static Schema<KeyValue<byte[], byte[]>> kvBytes() {
return KV_BYTES;
}


private KeyValueSchema(Schema<K> keySchema,
Schema<V> valueSchema) {
this(keySchema, valueSchema, KeyValueEncodingType.INLINE);
}

private KeyValueSchema(Schema<K> keySchema,
Schema<V> valueSchema,
KeyValueEncodingType keyValueEncodingType) {
this.keySchema = keySchema;
this.valueSchema = valueSchema;

// set schemaInfo
this.schemaInfo = new SchemaInfo()
.setName("KeyValue")
.setType(SchemaType.KEY_VALUE);
.setName("KeyValue")
.setType(SchemaType.KEY_VALUE);

byte[] keySchemaInfo = keySchema.getSchemaInfo().getSchema();
byte[] valueSchemaInfo = valueSchema.getSchemaInfo().getSchema();
Expand All @@ -89,6 +110,7 @@ private KeyValueSchema(Schema<K> keySchema,
.putInt(valueSchemaInfo.length).put(valueSchemaInfo);

Map<String, String> properties = Maps.newHashMap();

properties.put("key.schema.name", keySchema.getSchemaInfo().getName());
properties.put("key.schema.type", String.valueOf(keySchema.getSchemaInfo().getType()));
Gson keySchemaGson = new Gson();
Expand All @@ -98,20 +120,31 @@ private KeyValueSchema(Schema<K> keySchema,
Gson valueSchemaGson = new Gson();
properties.put("value.schema.properties", valueSchemaGson.toJson(valueSchema.getSchemaInfo().getProperties()));

checkNotNull(keyValueEncodingType, "Null encoding type is provided");
this.keyValueEncodingType = keyValueEncodingType;
properties.put("kv.encoding.type", String.valueOf(keyValueEncodingType));

this.schemaInfo.setSchema(byteBuffer.array()).setProperties(properties);
}

// encode as bytes: [key.length][key.bytes][value.length][value.bytes]
// encode as bytes: [key.length][key.bytes][value.length][value.bytes] or [value.bytes]
public byte[] encode(KeyValue<K, V> message) {
byte[] keyBytes = keySchema.encode(message.getKey());
byte[] valueBytes = valueSchema.encode(message.getValue());
if (keyValueEncodingType != null && keyValueEncodingType == KeyValueEncodingType.INLINE) {
byte [] keyBytes = keySchema.encode(message.getKey());
byte [] valueBytes = valueSchema.encode(message.getValue());
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
return byteBuffer.array();
} else {
return valueSchema.encode(message.getValue());
}

ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
return byteBuffer.array();
}

public KeyValue<K, V> decode(byte[] bytes) {
if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type");
}
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
int keyLength = byteBuffer.getInt();
byte[] keyBytes = new byte[keyLength];
Expand All @@ -121,6 +154,10 @@ public KeyValue<K, V> decode(byte[] bytes) {
byte[] valueBytes = new byte[valueLength];
byteBuffer.get(valueBytes);

return decode(keyBytes, valueBytes);
}

public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes) {
return new KeyValue<>(keySchema.decode(keyBytes), valueSchema.decode(valueBytes));
}

Expand Down
Loading

0 comments on commit 7f21501

Please sign in to comment.