From 3cf6be139e191f3fa058f01c469e1dbd4c8534be Mon Sep 17 00:00:00 2001 From: ran Date: Mon, 17 Feb 2020 18:02:05 +0800 Subject: [PATCH] KeyValue schema support for pulsar sql (#6325) Fixes #5560 ### Motivation Currently, Pulsar SQL can't read the keyValue schema data. This PR added support Pulsar SQL reading messages with a key-value schema. ### Modifications Add KeyValue schema support for Pulsar SQL. Add prefix __key. for the key field name. --- .../pulsar/common/api/raw/RawMessage.java | 15 + .../pulsar/common/api/raw/RawMessageImpl.java | 24 +- .../pulsar/sql/presto/AvroSchemaHandler.java | 5 + .../pulsar/sql/presto/JSONSchemaHandler.java | 5 + .../sql/presto/KeyValueSchemaHandler.java | 91 +++++ .../pulsar/sql/presto/PulsarColumnHandle.java | 51 ++- .../sql/presto/PulsarColumnMetadata.java | 33 +- .../sql/presto/PulsarInternalColumn.java | 5 +- .../pulsar/sql/presto/PulsarMetadata.java | 86 +++-- .../presto/PulsarPrimitiveSchemaHandler.java | 5 + .../pulsar/sql/presto/PulsarRecordCursor.java | 11 +- .../sql/presto/PulsarSchemaHandlers.java | 4 +- .../apache/pulsar/sql/presto/PulsarSplit.java | 40 ++- .../pulsar/sql/presto/PulsarSplitManager.java | 16 +- .../pulsar/sql/presto/SchemaHandler.java | 2 + .../sql/presto/TestPulsarConnector.java | 51 +-- .../TestPulsarKeyValueSchemaHandler.java | 331 ++++++++++++++++++ .../TestPulsarPrimitiveSchemaHandler.java | 2 +- 18 files changed, 705 insertions(+), 72 deletions(-) create mode 100644 pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java create mode 100644 pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java index ce5d9404ccb28..07a8098a838df 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java @@ -101,4 +101,19 @@ public interface RawMessage { * @return the key of the message */ Optional getKey(); + + /** + * Get byteBuf of the key. + * + * @return the byte array with the key payload + */ + Optional getKeyBytes(); + + /** + * Check whether the key has been base64 encoded. + * + * @return true if the key is base64 encoded, false otherwise + */ + boolean hasBase64EncodedKey(); + } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java index 282945f08ad9d..335bf967bf099 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java @@ -19,9 +19,10 @@ package org.apache.pulsar.common.api.raw; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; - +import java.util.Base64; import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -140,4 +141,25 @@ public Optional getKey() { return Optional.empty(); } } + + @Override + public Optional getKeyBytes() { + if (getKey().isPresent()) { + if (hasBase64EncodedKey()) { + return Optional.of(Unpooled.wrappedBuffer(Base64.getDecoder().decode(getKey().get()))); + } else { + return Optional.of(Unpooled.wrappedBuffer(getKey().get().getBytes())); + } + } + return Optional.empty(); + } + + @Override + public boolean hasBase64EncodedKey() { + if (singleMessageMetadata != null) { + return singleMessageMetadata.getPartitionKeyB64Encoded(); + } + return msgMetadata.get().getPartitionKeyB64Encoded(); + } + } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java index f6807a2b19a14..3e3685ebd6175 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java @@ -77,6 +77,11 @@ public Object deserialize(ByteBuf payload) { return null; } + @Override + public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) { + return null; + } + @Override public Object extractField(int index, Object currentRecord) { try { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java index 8649e41fc3c7a..99863a36dae5c 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java @@ -73,6 +73,11 @@ public Object deserialize(ByteBuf payload) { } } + @Override + public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) { + return null; + } + @Override public Object extractField(int index, Object currentRecord) { try { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java new file mode 100644 index 0000000000000..2f056d3abb8a3 --- /dev/null +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.sql.presto; + +import io.airlift.log.Logger; +import io.netty.buffer.ByteBuf; +import java.util.List; +import java.util.Objects; +import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaInfo; + + +/** + * Schema handler for payload in the KeyValue format. + */ +public class KeyValueSchemaHandler implements SchemaHandler { + + private static final Logger log = Logger.get(KeyValueSchemaHandler.class); + + private final List columnHandles; + + private final SchemaHandler keySchemaHandler; + + private final SchemaHandler valueSchemaHandler; + + private KeyValueEncodingType keyValueEncodingType; + + public KeyValueSchemaHandler(SchemaInfo schemaInfo, List columnHandles) { + this.columnHandles = columnHandles; + KeyValue kvSchemaInfo = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo); + keySchemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(kvSchemaInfo.getKey(), columnHandles); + valueSchemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(kvSchemaInfo.getValue(), columnHandles); + keyValueEncodingType = KeyValueSchemaInfo.decodeKeyValueEncodingType(schemaInfo); + } + + @Override + public Object deserialize(ByteBuf payload) { + return null; + } + + @Override + public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) { + ByteBuf keyByteBuf; + ByteBuf valueByteBuf; + if (Objects.equals(keyValueEncodingType, KeyValueEncodingType.INLINE)) { + dataPayload.resetReaderIndex(); + int keyLength = dataPayload.readInt(); + keyByteBuf = dataPayload.readSlice(keyLength); + + int valueLength = dataPayload.readInt(); + valueByteBuf = dataPayload.readSlice(valueLength); + } else { + keyByteBuf = keyPayload; + valueByteBuf = dataPayload; + } + Object keyObj = keySchemaHandler.deserialize(keyByteBuf); + Object valueObj = valueSchemaHandler.deserialize(valueByteBuf); + return new KeyValue<>(keyObj, valueObj); + } + + @Override + public Object extractField(int index, Object currentRecord) { + PulsarColumnHandle pulsarColumnHandle = this.columnHandles.get(index); + KeyValue keyValue = (KeyValue) currentRecord; + if (pulsarColumnHandle.isKey()) { + return keySchemaHandler.extractField(index, keyValue.getKey()); + } else if (pulsarColumnHandle.isValue()) { + return valueSchemaHandler.extractField(index, keyValue.getValue()); + } + return null; + } + +} diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java index 2a1bd43e07e97..2a07f52e548bd 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java @@ -24,8 +24,10 @@ import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.type.Type; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Arrays; +import java.util.Objects; /** * This class represents the basic information about a presto column. @@ -58,6 +60,26 @@ public class PulsarColumnHandle implements ColumnHandle { private final Integer[] positionIndices; + private HandleKeyValueType handleKeyValueType; + + /** + * Column Handle keyValue type, used for keyValue schema. + */ + public enum HandleKeyValueType { + /** + * The handle not for keyValue schema. + */ + NONE, + /** + * The key schema handle for keyValue schema. + */ + KEY, + /** + * The value schema handle for keyValue schema. + */ + VALUE + } + @JsonCreator public PulsarColumnHandle( @JsonProperty("connectorId") String connectorId, @@ -66,7 +88,8 @@ public PulsarColumnHandle( @JsonProperty("hidden") boolean hidden, @JsonProperty("internal") boolean internal, @JsonProperty("fieldNames") String[] fieldNames, - @JsonProperty("positionIndices") Integer[] positionIndices) { + @JsonProperty("positionIndices") Integer[] positionIndices, + @JsonProperty("handleKeyValueType") HandleKeyValueType handleKeyValueType) { this.connectorId = requireNonNull(connectorId, "connectorId is null"); this.name = requireNonNull(name, "name is null"); this.type = requireNonNull(type, "type is null"); @@ -74,6 +97,11 @@ public PulsarColumnHandle( this.internal = internal; this.fieldNames = fieldNames; this.positionIndices = positionIndices; + if (handleKeyValueType == null) { + this.handleKeyValueType = HandleKeyValueType.NONE; + } else { + this.handleKeyValueType = handleKeyValueType; + } } @JsonProperty @@ -111,6 +139,20 @@ public Integer[] getPositionIndices() { return positionIndices; } + @JsonProperty + public HandleKeyValueType getHandleKeyValueType() { + return handleKeyValueType; + } + + @JsonIgnore + public boolean isKey() { + return Objects.equals(handleKeyValueType, HandleKeyValueType.KEY); + } + + @JsonIgnore + public boolean isValue() { + return Objects.equals(handleKeyValueType, HandleKeyValueType.VALUE); + } ColumnMetadata getColumnMetadata() { return new ColumnMetadata(name, type, null, hidden); @@ -145,7 +187,10 @@ public boolean equals(Object o) { if (!Arrays.deepEquals(fieldNames, that.fieldNames)) { return false; } - return Arrays.deepEquals(positionIndices, that.positionIndices); + if (!Arrays.deepEquals(positionIndices, that.positionIndices)) { + return false; + } + return Objects.equals(handleKeyValueType, that.handleKeyValueType); } @Override @@ -157,6 +202,7 @@ public int hashCode() { result = 31 * result + (internal ? 1 : 0); result = 31 * result + Arrays.hashCode(fieldNames); result = 31 * result + Arrays.hashCode(positionIndices); + result = 31 * result + (handleKeyValueType != null ? handleKeyValueType.hashCode() : 0); return result; } @@ -170,6 +216,7 @@ public String toString() { + ", internal=" + internal + ", fieldNames=" + Arrays.toString(fieldNames) + ", positionIndices=" + Arrays.toString(positionIndices) + + ", handleKeyValueType=" + handleKeyValueType + '}'; } } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java index 5b033fac86fec..789ea7f580ac4 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java @@ -21,6 +21,7 @@ import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.type.Type; import java.util.Arrays; +import java.util.Objects; /** * Description of the column metadata. @@ -32,15 +33,19 @@ public class PulsarColumnMetadata extends ColumnMetadata { private String nameWithCase; private String[] fieldNames; private Integer[] positionIndices; + private PulsarColumnHandle.HandleKeyValueType handleKeyValueType; + public final static String KEY_SCHEMA_COLUMN_PREFIX = "__key."; public PulsarColumnMetadata(String name, Type type, String comment, String extraInfo, boolean hidden, boolean isInternal, - String[] fieldNames, Integer[] positionIndices) { + String[] fieldNames, Integer[] positionIndices, + PulsarColumnHandle.HandleKeyValueType handleKeyValueType) { super(name, type, comment, extraInfo, hidden); this.nameWithCase = name; this.isInternal = isInternal; this.fieldNames = fieldNames; this.positionIndices = positionIndices; + this.handleKeyValueType = handleKeyValueType; } public String getNameWithCase() { @@ -59,6 +64,25 @@ public Integer[] getPositionIndices() { return positionIndices; } + public PulsarColumnHandle.HandleKeyValueType getHandleKeyValueType() { + return handleKeyValueType; + } + + public boolean isKey() { + return Objects.equals(handleKeyValueType, PulsarColumnHandle.HandleKeyValueType.KEY); + } + + public boolean isValue() { + return Objects.equals(handleKeyValueType, PulsarColumnHandle.HandleKeyValueType.VALUE); + } + + public static String getColumnName(PulsarColumnHandle.HandleKeyValueType handleKeyValueType, String name) { + if (Objects.equals(PulsarColumnHandle.HandleKeyValueType.KEY, handleKeyValueType)) { + return KEY_SCHEMA_COLUMN_PREFIX + name; + } + return name; + } + @Override public String toString() { return "PulsarColumnMetadata{" @@ -66,6 +90,7 @@ public String toString() { + ", nameWithCase='" + nameWithCase + '\'' + ", fieldNames=" + Arrays.toString(fieldNames) + ", positionIndices=" + Arrays.toString(positionIndices) + + ", handleKeyValueType=" + handleKeyValueType + '}'; } @@ -92,7 +117,10 @@ public boolean equals(Object o) { if (!Arrays.deepEquals(fieldNames, that.fieldNames)) { return false; } - return Arrays.deepEquals(positionIndices, that.positionIndices); + if (!Arrays.deepEquals(positionIndices, that.positionIndices)) { + return false; + } + return Objects.equals(handleKeyValueType, that.handleKeyValueType); } @Override @@ -102,6 +130,7 @@ public int hashCode() { result = 31 * result + (nameWithCase != null ? nameWithCase.hashCode() : 0); result = 31 * result + Arrays.hashCode(fieldNames); result = 31 * result + Arrays.hashCode(positionIndices); + result = 31 * result + (handleKeyValueType != null ? handleKeyValueType.hashCode() : 0); return result; } } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java index 71107f86b3d64..4f2587bf0210f 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java @@ -218,11 +218,12 @@ PulsarColumnHandle getColumnHandle(String connectorId, boolean hidden) { getName(), getType(), hidden, - true, null, null); + true, null, null, PulsarColumnHandle.HandleKeyValueType.NONE); } PulsarColumnMetadata getColumnMetadata(boolean hidden) { - return new PulsarColumnMetadata(name, type, comment, null, hidden, true, null, null); + return new PulsarColumnMetadata(name, type, comment, null, hidden, true, null, null, + PulsarColumnHandle.HandleKeyValueType.NONE); } public static Set getInternalFields() { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java index dc9a479aa3503..420232ba0e439 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java @@ -80,7 +80,9 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -222,7 +224,8 @@ public Map getColumnHandles(ConnectorSession session, Conn pulsarColumnMetadata.isHidden(), pulsarColumnMetadata.isInternal(), pulsarColumnMetadata.getFieldNames(), - pulsarColumnMetadata.getPositionIndices()); + pulsarColumnMetadata.getPositionIndices(), + pulsarColumnMetadata.getHandleKeyValueType()); columnHandles.put( columnMetadata.getName(), @@ -324,7 +327,7 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName, } } List handles = getPulsarColumns( - topicName, schemaInfo, withInternalColumns + topicName, schemaInfo, withInternalColumns, PulsarColumnHandle.HandleKeyValueType.NONE ); @@ -336,12 +339,15 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName, */ static List getPulsarColumns(TopicName topicName, SchemaInfo schemaInfo, - boolean withInternalColumns) { + boolean withInternalColumns, + PulsarColumnHandle.HandleKeyValueType handleKeyValueType) { SchemaType schemaType = schemaInfo.getType(); if (schemaType.isStruct()) { - return getPulsarColumnsFromStructSchema(topicName, schemaInfo, withInternalColumns); + return getPulsarColumnsFromStructSchema(topicName, schemaInfo, withInternalColumns, handleKeyValueType); } else if (schemaType.isPrimitive()) { - return getPulsarColumnsFromPrimitiveSchema(topicName, schemaInfo, withInternalColumns); + return getPulsarColumnsFromPrimitiveSchema(topicName, schemaInfo, withInternalColumns, handleKeyValueType); + } else if (schemaType.equals(SchemaType.KEY_VALUE)) { + return getPulsarColumnsFromKeyValueSchema(topicName, schemaInfo, withInternalColumns); } else { throw new IllegalArgumentException("Unsupported schema : " + schemaInfo); } @@ -349,15 +355,16 @@ static List getPulsarColumns(TopicName topicName, static List getPulsarColumnsFromPrimitiveSchema(TopicName topicName, SchemaInfo schemaInfo, - boolean withInternalColumns) { + boolean withInternalColumns, + PulsarColumnHandle.HandleKeyValueType handleKeyValueType) { ImmutableList.Builder builder = ImmutableList.builder(); ColumnMetadata valueColumn = new PulsarColumnMetadata( - "__value__", + PulsarColumnMetadata.getColumnName(handleKeyValueType, "__value__"), convertPulsarType(schemaInfo.getType()), "The value of the message with primitive type schema", null, false, false, new String[0], - new Integer[0]); + new Integer[0], handleKeyValueType); builder.add(valueColumn); @@ -372,8 +379,8 @@ static List getPulsarColumnsFromPrimitiveSchema(TopicName topicN static List getPulsarColumnsFromStructSchema(TopicName topicName, SchemaInfo schemaInfo, - boolean withInternalColumns) { - + boolean withInternalColumns, + PulsarColumnHandle.HandleKeyValueType handleKeyValueType) { String schemaJson = new String(schemaInfo.getSchema()); if (StringUtils.isBlank(schemaJson)) { throw new PrestoException(NOT_SUPPORTED, "Topic " + topicName.toString() @@ -389,7 +396,7 @@ static List getPulsarColumnsFromStructSchema(TopicName topicName ImmutableList.Builder builder = ImmutableList.builder(); - builder.addAll(getColumns(null, schema, new HashSet<>(), new Stack<>(), new Stack<>())); + builder.addAll(getColumns(null, schema, new HashSet<>(), new Stack<>(), new Stack<>(), handleKeyValueType)); if (withInternalColumns) { PulsarInternalColumn.getInternalFields() @@ -398,6 +405,29 @@ static List getPulsarColumnsFromStructSchema(TopicName topicName } return builder.build(); } + + static List getPulsarColumnsFromKeyValueSchema(TopicName topicName, + SchemaInfo schemaInfo, + boolean withInternalColumns) { + ImmutableList.Builder builder = ImmutableList.builder(); + KeyValue kvSchemaInfo = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo); + SchemaInfo keySchemaInfo = kvSchemaInfo.getKey(); + List keyColumnMetadataList = getPulsarColumns(topicName, keySchemaInfo, false, + PulsarColumnHandle.HandleKeyValueType.KEY); + builder.addAll(keyColumnMetadataList); + + SchemaInfo valueSchemaInfo = kvSchemaInfo.getValue(); + List valueColumnMetadataList = getPulsarColumns(topicName, valueSchemaInfo, false, + PulsarColumnHandle.HandleKeyValueType.VALUE); + builder.addAll(valueColumnMetadataList); + + if (withInternalColumns) { + PulsarInternalColumn.getInternalFields() + .forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false))); + } + return builder.build(); + } + @VisibleForTesting static Type convertPulsarType(SchemaType pulsarType) { switch (pulsarType) { @@ -435,18 +465,20 @@ static Type convertPulsarType(SchemaType pulsarType) { @VisibleForTesting static List getColumns(String fieldName, Schema fieldSchema, - Set fieldTypes, - Stack fieldNames, - Stack positionIndices) { + Set fieldTypes, + Stack fieldNames, + Stack positionIndices, + PulsarColumnHandle.HandleKeyValueType handleKeyValueType) { List columnMetadataList = new LinkedList<>(); if (isPrimitiveType(fieldSchema.getType())) { - columnMetadataList.add(new PulsarColumnMetadata(fieldName, + columnMetadataList.add(new PulsarColumnMetadata( + PulsarColumnMetadata.getColumnName(handleKeyValueType, fieldName), convertType(fieldSchema.getType(), fieldSchema.getLogicalType()), null, null, false, false, fieldNames.toArray(new String[fieldNames.size()]), - positionIndices.toArray(new Integer[positionIndices.size()]))); + positionIndices.toArray(new Integer[positionIndices.size()]), handleKeyValueType)); } else if (fieldSchema.getType() == Schema.Type.UNION) { boolean canBeNull = false; for (Schema type : fieldSchema.getTypes()) { @@ -454,17 +486,19 @@ static List getColumns(String fieldName, Schema fieldSchem PulsarColumnMetadata columnMetadata; if (type.getType() != Schema.Type.NULL) { if (!canBeNull) { - columnMetadata = new PulsarColumnMetadata(fieldName, + columnMetadata = new PulsarColumnMetadata( + PulsarColumnMetadata.getColumnName(handleKeyValueType, fieldName), convertType(type.getType(), type.getLogicalType()), null, null, false, false, fieldNames.toArray(new String[fieldNames.size()]), - positionIndices.toArray(new Integer[positionIndices.size()])); + positionIndices.toArray(new Integer[positionIndices.size()]), handleKeyValueType); } else { - columnMetadata = new PulsarColumnMetadata(fieldName, + columnMetadata = new PulsarColumnMetadata( + PulsarColumnMetadata.getColumnName(handleKeyValueType, fieldName), convertType(type.getType(), type.getLogicalType()), "field can be null", null, false, false, fieldNames.toArray(new String[fieldNames.size()]), - positionIndices.toArray(new Integer[positionIndices.size()])); + positionIndices.toArray(new Integer[positionIndices.size()]), handleKeyValueType); } columnMetadataList.add(columnMetadata); } else { @@ -472,7 +506,7 @@ static List getColumns(String fieldName, Schema fieldSchem } } else { List columns = getColumns(fieldName, type, fieldTypes, fieldNames, - positionIndices); + positionIndices, handleKeyValueType); columnMetadataList.addAll(columns); } } @@ -488,10 +522,11 @@ static List getColumns(String fieldName, Schema fieldSchem positionIndices.push(i); List columns; if (fieldName == null) { - columns = getColumns(field.name(), field.schema(), fieldTypes, fieldNames, positionIndices); + columns = getColumns(field.name(), field.schema(), fieldTypes, fieldNames, positionIndices, + handleKeyValueType); } else { columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(), - fieldTypes, fieldNames, positionIndices); + fieldTypes, fieldNames, positionIndices, handleKeyValueType); } positionIndices.pop(); @@ -507,11 +542,12 @@ static List getColumns(String fieldName, Schema fieldSchem } else if (fieldSchema.getType() == Schema.Type.MAP) { } else if (fieldSchema.getType() == Schema.Type.ENUM) { - PulsarColumnMetadata columnMetadata = new PulsarColumnMetadata(fieldName, + PulsarColumnMetadata columnMetadata = new PulsarColumnMetadata( + PulsarColumnMetadata.getColumnName(handleKeyValueType, fieldName), convertType(fieldSchema.getType(), fieldSchema.getLogicalType()), null, null, false, false, fieldNames.toArray(new String[fieldNames.size()]), - positionIndices.toArray(new Integer[positionIndices.size()])); + positionIndices.toArray(new Integer[positionIndices.size()]), handleKeyValueType); columnMetadataList.add(columnMetadata); } else if (fieldSchema.getType() == Schema.Type.FIXED) { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java index 28980a9f13be4..f26587bfcbf80 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java @@ -56,6 +56,11 @@ public Object deserialize(ByteBuf byteBuf) { } } + @Override + public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) { + return null; + } + @Override public Object extractField(int index, Object currentRecord) { return currentRecord; diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java index eca99af4dbb4b..6899f97bbd1c2 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java @@ -40,6 +40,7 @@ import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.airlift.slice.Slices; +import io.netty.buffer.ByteBuf; import java.io.IOException; import java.util.List; import java.util.Map; @@ -418,7 +419,15 @@ public boolean advanceNextPosition() { //start time for deseralizing record metricsTracker.start_RECORD_DESERIALIZE_TIME(); - currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData()); + if (this.schemaHandler instanceof KeyValueSchemaHandler) { + ByteBuf keyByteBuf = null; + if (this.currentMessage.getKeyBytes().isPresent()) { + keyByteBuf = this.currentMessage.getKeyBytes().get(); + } + currentRecord = this.schemaHandler.deserialize(keyByteBuf, this.currentMessage.getData()); + } else { + currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData()); + } metricsTracker.incr_NUM_RECORD_DESERIALIZED(); // stats for time spend deserializing diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java index c0282df14007e..3b091ad74c77a 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java @@ -23,9 +23,9 @@ import com.facebook.presto.spi.PrestoException; import java.util.List; - import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; class PulsarSchemaHandlers { @@ -45,6 +45,8 @@ static SchemaHandler newPulsarSchemaHandler(SchemaInfo schemaInfo, throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + schemaInfo.getType()); } + } else if (schemaInfo.getType().equals(SchemaType.KEY_VALUE)) { + return new KeyValueSchemaHandler(schemaInfo, columnHandles); } else { throw new PrestoException( NOT_SUPPORTED, diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java index dbebbf54a3eba..9893d8ffab0ea 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java @@ -26,7 +26,10 @@ import com.facebook.presto.spi.predicate.TupleDomain; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import io.airlift.log.Logger; +import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -39,9 +42,12 @@ */ public class PulsarSplit implements ConnectorSplit { + private static final Logger log = Logger.get(PulsarSplit.class); + private final long splitId; private final String connectorId; private final String schemaName; + private final String originSchemaName; private final String tableName; private final long splitSize; private final String schema; @@ -55,6 +61,7 @@ public class PulsarSplit implements ConnectorSplit { private final PositionImpl startPosition; private final PositionImpl endPosition; + private final String schemaInfoProperties; private final OffloadPolicies offloadPolicies; @@ -63,6 +70,7 @@ public PulsarSplit( @JsonProperty("splitId") long splitId, @JsonProperty("connectorId") String connectorId, @JsonProperty("schemaName") String schemaName, + @JsonProperty("originSchemaName") String originSchemaName, @JsonProperty("tableName") String tableName, @JsonProperty("splitSize") long splitSize, @JsonProperty("schema") String schema, @@ -72,16 +80,11 @@ public PulsarSplit( @JsonProperty("startPositionLedgerId") long startPositionLedgerId, @JsonProperty("endPositionLedgerId") long endPositionLedgerId, @JsonProperty("tupleDomain") TupleDomain tupleDomain, - @JsonProperty("properties") Map schemaInfoProperties, - @JsonProperty("offloadPolicies") OffloadPolicies offloadPolicies) { + @JsonProperty("schemaInfoProperties") String schemaInfoProperties, + @JsonProperty("offloadPolicies") OffloadPolicies offloadPolicies) throws IOException { this.splitId = splitId; requireNonNull(schemaName, "schema name is null"); - this.schemaInfo = SchemaInfo.builder() - .type(schemaType) - .name(schemaName) - .schema(schema.getBytes()) - .properties(schemaInfoProperties) - .build(); + this.originSchemaName = originSchemaName; this.schemaName = requireNonNull(schemaName, "schema name is null"); this.connectorId = requireNonNull(connectorId, "connector id is null"); this.tableName = requireNonNull(tableName, "table name is null"); @@ -95,7 +98,16 @@ public PulsarSplit( this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null"); this.startPosition = PositionImpl.get(startPositionLedgerId, startPositionEntryId); this.endPosition = PositionImpl.get(endPositionLedgerId, endPositionEntryId); + this.schemaInfoProperties = schemaInfoProperties; this.offloadPolicies = offloadPolicies; + + ObjectMapper objectMapper = new ObjectMapper(); + this.schemaInfo = SchemaInfo.builder() + .name(originSchemaName) + .type(schemaType) + .schema(schema.getBytes("ISO8859-1")) + .properties(objectMapper.readValue(schemaInfoProperties, Map.class)) + .build(); } @JsonProperty @@ -128,6 +140,11 @@ public long getSplitSize() { return splitSize; } + @JsonProperty + public String getOriginSchemaName() { + return originSchemaName; + } + @JsonProperty public String getSchema() { return schema; @@ -166,6 +183,11 @@ public PositionImpl getEndPosition() { return endPosition; } + @JsonProperty + public String getSchemaInfoProperties() { + return schemaInfoProperties; + } + @JsonProperty public OffloadPolicies getOffloadPolicies() { return offloadPolicies; @@ -191,6 +213,7 @@ public String toString() { return "PulsarSplit{" + "splitId=" + splitId + ", connectorId='" + connectorId + '\'' + + ", originSchemaName='" + originSchemaName + '\'' + ", schemaName='" + schemaName + '\'' + ", tableName='" + tableName + '\'' + ", splitSize=" + splitSize @@ -200,6 +223,7 @@ public String toString() { + ", endPositionEntryId=" + endPositionEntryId + ", startPositionLedgerId=" + startPositionLedgerId + ", endPositionLedgerId=" + endPositionLedgerId + + ", schemaInfoProperties=" + schemaInfoProperties + (offloadPolicies == null ? "" : offloadPolicies.toString()) + '}'; } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java index 848b9ccd1789c..cce071ae15f12 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java @@ -35,9 +35,11 @@ import com.facebook.presto.spi.predicate.Domain; import com.facebook.presto.spi.predicate.Range; import com.facebook.presto.spi.predicate.TupleDomain; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import io.airlift.log.Logger; +import java.io.IOException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; @@ -75,6 +77,8 @@ public class PulsarSplitManager implements ConnectorSplitManager { private static final Logger log = Logger.get(PulsarSplitManager.class); + private ObjectMapper objectMapper = new ObjectMapper(); + @Inject public PulsarSplitManager(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig) { this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); @@ -250,7 +254,7 @@ Collection getSplitsForTopic(String topicNamePersistenceEncoding, SchemaInfo schemaInfo, String tableName, TupleDomain tupleDomain, OffloadPolicies offloadPolicies) - throws ManagedLedgerException, InterruptedException { + throws ManagedLedgerException, InterruptedException, IOException { ReadOnlyCursor readOnlyCursor = null; try { @@ -295,19 +299,21 @@ Collection getSplitsForTopic(String topicNamePersistenceEncoding, readOnlyCursor.skipEntries(Math.toIntExact(entriesForSplit)); PositionImpl endPosition = (PositionImpl) readOnlyCursor.getReadPosition(); - splits.add(new PulsarSplit(i, this.connectorId, + PulsarSplit pulsarSplit = new PulsarSplit(i, this.connectorId, restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig), + schemaInfo.getName(), tableName, entriesForSplit, - new String(schemaInfo.getSchema()), + new String(schemaInfo.getSchema(), "ISO8859-1"), schemaInfo.getType(), startPosition.getEntryId(), endPosition.getEntryId(), startPosition.getLedgerId(), endPosition.getLedgerId(), tupleDomain, - schemaInfo.getProperties(), - offloadPolicies)); + objectMapper.writeValueAsString(schemaInfo.getProperties()), + offloadPolicies); + splits.add(pulsarSplit); } return splits; } finally { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java index 37fce04329555..7529034accea6 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java @@ -27,6 +27,8 @@ public interface SchemaHandler { Object deserialize(ByteBuf payload); + Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload); + Object extractField(int index, Object currentRecord); } diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java index 6c911a407541a..adeababe481e8 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java @@ -26,6 +26,7 @@ import com.facebook.presto.spi.type.RealType; import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.VarcharType; +import com.fasterxml.jackson.databind.ObjectMapper; import io.airlift.log.Logger; import io.netty.buffer.ByteBuf; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -118,6 +119,7 @@ public abstract class TestPulsarConnector { protected static Map topicsToSchemas; protected static Map topicsToNumEntries; + private final static ObjectMapper objectMapper = new ObjectMapper(); protected static final NamespaceName NAMESPACE_NAME_1 = NamespaceName.get("tenant-1", "ns-1"); protected static final NamespaceName NAMESPACE_NAME_2 = NamespaceName.get("tenant-1", "ns-2"); @@ -299,7 +301,7 @@ public static class Boo { false, false, fooFieldNames.get("field1"), - fooPositionIndices.get("field1"))); + fooPositionIndices.get("field1"), null)); String[] fieldNames2 = {"field2"}; @@ -312,7 +314,7 @@ public static class Boo { false, false, fieldNames2, - positionIndices2)); + positionIndices2, null)); String[] fieldNames3 = {"field3"}; Integer[] positionIndices3 = {2}; @@ -324,7 +326,7 @@ public static class Boo { false, false, fieldNames3, - positionIndices3)); + positionIndices3, null)); String[] fieldNames4 = {"field4"}; Integer[] positionIndices4 = {3}; @@ -336,7 +338,7 @@ public static class Boo { false, false, fieldNames4, - positionIndices4)); + positionIndices4, null)); String[] fieldNames5 = {"field5"}; @@ -349,7 +351,7 @@ public static class Boo { false, false, fieldNames5, - positionIndices5)); + positionIndices5, null)); String[] fieldNames6 = {"field6"}; Integer[] positionIndices6 = {5}; @@ -361,7 +363,7 @@ public static class Boo { false, false, fieldNames6, - positionIndices6)); + positionIndices6, null)); String[] fieldNames7 = {"timestamp"}; Integer[] positionIndices7 = {6}; @@ -373,7 +375,7 @@ public static class Boo { false, false, fieldNames7, - positionIndices7)); + positionIndices7, null)); String[] fieldNames8 = {"time"}; Integer[] positionIndices8 = {7}; @@ -385,7 +387,7 @@ public static class Boo { false, false, fieldNames8, - positionIndices8)); + positionIndices8, null)); String[] fieldNames9 = {"date"}; Integer[] positionIndices9 = {8}; @@ -397,7 +399,7 @@ public static class Boo { false, false, fieldNames9, - positionIndices9)); + positionIndices9, null)); String[] bar_fieldNames1 = {"bar", "field1"}; Integer[] bar_positionIndices1 = {9, 0}; @@ -409,7 +411,7 @@ public static class Boo { false, false, bar_fieldNames1, - bar_positionIndices1)); + bar_positionIndices1, null)); String[] bar_fieldNames2 = {"bar", "field2"}; Integer[] bar_positionIndices2 = {9, 1}; @@ -421,7 +423,7 @@ public static class Boo { false, false, bar_fieldNames2, - bar_positionIndices2)); + bar_positionIndices2, null)); String[] bar_test_fieldNames4 = {"bar", "test", "field4"}; Integer[] bar_test_positionIndices4 = {9, 2, 0}; @@ -433,7 +435,7 @@ public static class Boo { false, false, bar_test_fieldNames4, - bar_test_positionIndices4)); + bar_test_positionIndices4, null)); String[] bar_test_fieldNames5 = {"bar", "test", "field5"}; Integer[] bar_test_positionIndices5 = {9, 2, 1}; @@ -445,7 +447,7 @@ public static class Boo { false, false, bar_test_fieldNames5, - bar_test_positionIndices5)); + bar_test_positionIndices5, null)); String[] bar_test_fieldNames6 = {"bar", "test", "field6"}; Integer[] bar_test_positionIndices6 = {9, 2, 2}; @@ -457,7 +459,7 @@ public static class Boo { false, false, bar_test_fieldNames6, - bar_test_positionIndices6)); + bar_test_positionIndices6, null)); String[] bar_test_foobar_fieldNames1 = {"bar", "test", "foobar", "field1"}; Integer[] bar_test_foobar_positionIndices1 = {9, 2, 6, 0}; @@ -469,7 +471,7 @@ public static class Boo { false, false, bar_test_foobar_fieldNames1, - bar_test_foobar_positionIndices1)); + bar_test_foobar_positionIndices1, null)); String[] bar_field3 = {"bar", "field3"}; Integer[] bar_positionIndices3 = {9, 3}; @@ -481,7 +483,7 @@ public static class Boo { false, false, bar_field3, - bar_positionIndices3)); + bar_positionIndices3, null)); String[] bar_test2_fieldNames4 = {"bar", "test2", "field4"}; Integer[] bar_test2_positionIndices4 = {9, 4, 0}; @@ -493,7 +495,7 @@ public static class Boo { false, false, bar_test2_fieldNames4, - bar_test2_positionIndices4)); + bar_test2_positionIndices4, null)); String[] bar_test2_fieldNames5 = {"bar", "test2", "field5"}; Integer[] bar_test2_positionIndices5 = {9, 4, 1}; @@ -505,7 +507,7 @@ public static class Boo { false, false, bar_test2_fieldNames5, - bar_test2_positionIndices5)); + bar_test2_positionIndices5, null)); String[] bar_test2_fieldNames6 = {"bar", "test2", "field6"}; Integer[] bar_test2_positionIndices6 = {9, 4, 2}; @@ -517,7 +519,7 @@ public static class Boo { false, false, bar_test2_fieldNames6, - bar_test2_positionIndices6)); + bar_test2_positionIndices6, null)); String[] bar_test2_foobar_fieldNames1 = {"bar", "test2", "foobar", "field1"}; Integer[] bar_test2_foobar_positionIndices1 = {9, 4, 6, 0}; @@ -529,7 +531,7 @@ public static class Boo { false, false, bar_test2_foobar_fieldNames1, - bar_test2_foobar_positionIndices1)); + bar_test2_foobar_positionIndices1, null)); String[] fieldNames10 = {"field7"}; Integer[] positionIndices10 = {10}; @@ -541,7 +543,7 @@ public static class Boo { false, false, fieldNames10, - positionIndices10)); + positionIndices10, null)); fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream() .map(pulsarInternalColumn -> pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false)) @@ -556,12 +558,14 @@ public static class Boo { for (TopicName topicName : allTopics) { if (topicsToSchemas.containsKey(topicName.getSchemaName())) { splits.put(topicName, new PulsarSplit(0, pulsarConnectorId.toString(), - topicName.getNamespace(), topicName.getLocalName(), + topicName.getNamespace(), topicName.getLocalName(), topicName.getLocalName(), topicsToNumEntries.get(topicName.getSchemaName()), new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()), topicsToSchemas.get(topicName.getSchemaName()).getType(), 0, topicsToNumEntries.get(topicName.getSchemaName()), - 0, 0, TupleDomain.all(), new HashMap<>(), null)); + 0, 0, TupleDomain.all(), + objectMapper.writeValueAsString( + topicsToSchemas.get(topicName.getSchemaName()).getProperties()), null)); } } @@ -601,7 +605,6 @@ public static class Boo { } catch (Throwable e) { System.out.println("Error: " + e); System.out.println("Stacktrace: " + Arrays.asList(e.getStackTrace())); - throw e; } } diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java new file mode 100644 index 0000000000000..193f68e00b069 --- /dev/null +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.sql.presto; + +import com.facebook.presto.spi.ColumnMetadata; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.KeyValueSchema; +import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; +import org.apache.pulsar.common.api.raw.RawMessage; +import org.apache.pulsar.common.api.raw.RawMessageImpl; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +/** + * Unit test for KeyValueSchemaHandler + */ +@Slf4j +public class TestPulsarKeyValueSchemaHandler { + + private final static ObjectMapper objectMapper = new ObjectMapper(); + + private Schema> schema1 = + Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE); + + private Schema> schema2 = + Schema.KeyValue(Schema.STRING, Schema.JSON(Foo.class), KeyValueEncodingType.INLINE); + + private Schema> schema3 = + Schema.KeyValue(Schema.AVRO(Boo.class), Schema.INT64, KeyValueEncodingType.SEPARATED); + + private Schema> schema4 = + Schema.KeyValue(Schema.JSON(Boo.class), Schema.AVRO(Foo.class), KeyValueEncodingType.SEPARATED); + + private final static TopicName topicName = TopicName.get("persistent://public/default/kv-test"); + + private final static Foo foo; + + private final static Boo boo; + + private final Integer KEY_FIELD_NAME_PREFIX_LENGTH = PulsarColumnMetadata.KEY_SCHEMA_COLUMN_PREFIX.length(); + + static { + foo = new Foo(); + foo.field1 = "field1-value"; + foo.field2 = 20; + + boo = new Boo(); + boo.field1 = "field1-value"; + boo.field2 = true; + boo.field3 = 10.2; + } + + + @Test + public void testSchema1() throws IOException { + final String keyData = "test-key"; + final Integer valueData = 10; + List columnMetadataList = + PulsarMetadata.getPulsarColumns(topicName, schema1.getSchemaInfo(), + true, null); + int keyCount = 0; + int valueCount = 0; + for (ColumnMetadata columnMetadata : columnMetadataList) { + PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata; + if (pulsarColumnMetadata.isKey()) { + keyCount++; + } else if (pulsarColumnMetadata.isValue()) { + valueCount++; + } + } + Assert.assertEquals(keyCount, 1); + Assert.assertEquals(valueCount, 1); + + List columnHandleList = getColumnHandlerList(columnMetadataList); + + KeyValueSchemaHandler keyValueSchemaHandler = + new KeyValueSchemaHandler(schema1.getSchemaInfo(), columnHandleList); + + RawMessageImpl message = Mockito.mock(RawMessageImpl.class); + Mockito.when(message.getData()).thenReturn( + Unpooled.wrappedBuffer(schema1.encode(new KeyValue<>(keyData, valueData))) + ); + + KeyValue byteBufKeyValue = getKeyValueByteBuf(message, schema1); + Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue()); + Assert.assertEquals(keyValueSchemaHandler.extractField(0, object), keyData); + Assert.assertEquals(keyValueSchemaHandler.extractField(1, object), valueData); + } + + @Test + public void testSchema2() throws IOException { + final String keyData = "test-key"; + + List columnMetadataList = + PulsarMetadata.getPulsarColumns(topicName, schema2.getSchemaInfo(), + true, null); + int keyCount = 0; + int valueCount = 0; + for (ColumnMetadata columnMetadata : columnMetadataList) { + PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata; + if (pulsarColumnMetadata.isKey()) { + keyCount++; + } else if (pulsarColumnMetadata.isValue()) { + valueCount++; + } + } + Assert.assertEquals(keyCount, 1); + Assert.assertEquals(valueCount, 2); + + List columnHandleList = getColumnHandlerList(columnMetadataList); + + RawMessage message = Mockito.mock(RawMessage.class); + Mockito.when(message.getData()).thenReturn( + Unpooled.wrappedBuffer(schema2.encode(new KeyValue<>(keyData, foo))) + ); + + + KeyValueSchemaHandler keyValueSchemaHandler = + new KeyValueSchemaHandler(schema2.getSchemaInfo(), columnHandleList); + + KeyValue byteBufKeyValue = getKeyValueByteBuf(message, schema2); + Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue()); + Assert.assertEquals(keyValueSchemaHandler.extractField(0, object), keyData); + Assert.assertEquals(keyValueSchemaHandler.extractField(1, object), + foo.getValue(columnHandleList.get(1).getName())); + Assert.assertEquals(keyValueSchemaHandler.extractField(2, object), + foo.getValue(columnHandleList.get(2).getName())); + } + + @Test + public void testSchema3() throws IOException { + final Boo boo = new Boo(); + boo.field1 = "field1-value"; + boo.field2 = true; + boo.field3 = 10.2; + final Long valueData = 999999L; + + List columnMetadataList = + PulsarMetadata.getPulsarColumns(topicName, schema3.getSchemaInfo(), + true, null); + int keyCount = 0; + int valueCount = 0; + for (ColumnMetadata columnMetadata : columnMetadataList) { + PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata; + if (pulsarColumnMetadata.isKey()) { + keyCount++; + } else if (pulsarColumnMetadata.isValue()) { + valueCount++; + } + } + Assert.assertEquals(keyCount, 3); + Assert.assertEquals(valueCount, 1); + + List columnHandleList = getColumnHandlerList(columnMetadataList); + + KeyValueSchemaHandler keyValueSchemaHandler = + new KeyValueSchemaHandler(schema3.getSchemaInfo(), columnHandleList); + + RawMessage message = Mockito.mock(RawMessage.class); + Mockito.when(message.getKeyBytes()).thenReturn( + Optional.of(Unpooled.wrappedBuffer( + ((KeyValueSchema) schema3).getKeySchema().encode(boo) + )) + ); + Mockito.when(message.getData()).thenReturn( + Unpooled.wrappedBuffer(schema3.encode(new KeyValue<>(boo, valueData))) + ); + + KeyValue byteBufKeyValue = getKeyValueByteBuf(message, schema3); + Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue()); + Assert.assertEquals(keyValueSchemaHandler.extractField(0, object).toString(), + boo.getValue(columnHandleList.get(0).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH))); + Assert.assertEquals(keyValueSchemaHandler.extractField(1, object), + boo.getValue(columnHandleList.get(1).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH))); + Assert.assertEquals(keyValueSchemaHandler.extractField(2, object), + boo.getValue(columnHandleList.get(2).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH))); + Assert.assertEquals(keyValueSchemaHandler.extractField(3, object), valueData); + } + + @Test + public void testSchema4() throws IOException { + List columnMetadataList = + PulsarMetadata.getPulsarColumns(topicName, schema4.getSchemaInfo(), + true, null); + int keyCount = 0; + int valueCount = 0; + for (ColumnMetadata columnMetadata : columnMetadataList) { + PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata; + if (pulsarColumnMetadata.isKey()) { + keyCount++; + } else if (pulsarColumnMetadata.isValue()) { + valueCount++; + } + } + Assert.assertEquals(keyCount, 3); + Assert.assertEquals(valueCount, 2); + + List columnHandleList = getColumnHandlerList(columnMetadataList); + + KeyValueSchemaHandler keyValueSchemaHandler = + new KeyValueSchemaHandler(schema4.getSchemaInfo(), columnHandleList); + + RawMessage message = Mockito.mock(RawMessage.class); + Mockito.when(message.getKeyBytes()).thenReturn( + Optional.of(Unpooled.wrappedBuffer( + ((KeyValueSchema) schema4).getKeySchema().encode(boo) + )) + ); + Mockito.when(message.getData()).thenReturn( + Unpooled.wrappedBuffer(schema4.encode(new KeyValue<>(boo, foo))) + ); + + KeyValue byteBufKeyValue = getKeyValueByteBuf(message, schema4); + Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue()); + Assert.assertEquals(keyValueSchemaHandler.extractField(0, object).toString(), + boo.getValue(columnHandleList.get(0).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH))); + Assert.assertEquals(keyValueSchemaHandler.extractField(1, object), + boo.getValue(columnHandleList.get(1).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH))); + Assert.assertEquals(keyValueSchemaHandler.extractField(2, object), + boo.getValue(columnHandleList.get(2).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH))); + Assert.assertEquals(keyValueSchemaHandler.extractField(3, object).toString(), + foo.getValue(columnHandleList.get(3).getName())); + Assert.assertEquals(keyValueSchemaHandler.extractField(4, object).toString(), + foo.getValue(columnHandleList.get(4).getName()) + ""); + } + + private List getColumnHandlerList(List columnMetadataList) { + List columnHandleList = new LinkedList<>(); + + columnMetadataList.forEach(columnMetadata -> { + PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata; + PulsarColumnHandle pulsarColumnHandle = new PulsarColumnHandle( + "connectorId", + pulsarColumnMetadata.getNameWithCase(), + pulsarColumnMetadata.getType(), + pulsarColumnMetadata.isHidden(), + pulsarColumnMetadata.isInternal(), + pulsarColumnMetadata.getFieldNames(), + pulsarColumnMetadata.getPositionIndices(), + pulsarColumnMetadata.getHandleKeyValueType()); + columnHandleList.add(pulsarColumnHandle); + }); + + return columnHandleList; + } + + public KeyValue getKeyValueByteBuf(RawMessage message, Schema schema) { + KeyValueEncodingType encodingType = KeyValueSchemaInfo.decodeKeyValueEncodingType(schema.getSchemaInfo()); + ByteBuf keyByteBuf = null; + if (Objects.equals(KeyValueEncodingType.SEPARATED, encodingType)) { + if (message.getKeyBytes().isPresent()) { + keyByteBuf = message.getKeyBytes().get(); + } else { + keyByteBuf = null; + } + } else { + keyByteBuf = null; + } + return new KeyValue<>(keyByteBuf, Unpooled.wrappedBuffer(message.getData())); + } + + @Data + static class Foo { + private String field1; + private Integer field2; + + public Object getValue(String fieldName) { + switch (fieldName) { + case "field1": + return field1; + case "field2": + return field2 == null ? null : new Long(field2); + default: + return null; + } + } + } + + @Data + static class Boo { + private String field1; + private Boolean field2; + private Double field3; + + public Object getValue(String fieldName) { + switch (fieldName) { + case "field1": + return field1; + case "field2": + return field2; + case "field3": + return field3 == null ? null : field3.doubleValue(); + default: + return null; + } + } + + } + +} diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java index 87e148db84f1c..ac2e1f4bd3815 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java @@ -153,7 +153,7 @@ public void testNewPulsarPrimitiveSchemaHandler() { @Test public void testNewColumnMetadata() { List columnMetadataList = PulsarMetadata.getPulsarColumns(stringTopicName, - StringSchema.utf8().getSchemaInfo(), false); + StringSchema.utf8().getSchemaInfo(), false, null); Assert.assertEquals(columnMetadataList.size(), 1); ColumnMetadata columnMetadata = columnMetadataList.get(0); Assert.assertEquals("__value__", columnMetadata.getName());