Skip to content

Commit

Permalink
Add schema admin api get schema info with schema version (apache#4877)
Browse files Browse the repository at this point in the history
### Motivation

To fix apache#4854 and support get keyValueSchema
  • Loading branch information
congbobo184 authored and sijie committed Aug 16, 2019
1 parent 0367f5f commit f859da9
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
Expand Down Expand Up @@ -461,11 +463,19 @@ public void getVersionBySchema(
}

private static GetSchemaResponse convertSchemaAndMetadataToGetSchemaResponse(SchemaAndMetadata schemaAndMetadata) {
String schemaData;
if (schemaAndMetadata.schema.getType() == SchemaType.KEY_VALUE) {
schemaData = DefaultImplementation
.convertKeyValueSchemaInfoDataToString(DefaultImplementation.decodeKeyValueSchemaInfo
(schemaAndMetadata.schema.toSchemaInfo()));
} else {
schemaData = new String(schemaAndMetadata.schema.getData(), UTF_8);
}
return GetSchemaResponse.builder()
.version(getLongSchemaVersion(schemaAndMetadata.version))
.type(schemaAndMetadata.schema.getType())
.timestamp(schemaAndMetadata.schema.getTimestamp())
.data(new String(schemaAndMetadata.schema.getData(), UTF_8))
.data(schemaData)
.properties(schemaAndMetadata.schema.getProps())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -111,6 +112,11 @@ public void testSchemaInfoApi(Schema<?> schema) throws Exception {
testSchemaInfoApi(schema, "schematest/test/test-" + schema.getSchemaInfo().getType());
}

@Test(dataProvider = "schemas")
public void testSchemaInfoWithVersionApi(Schema<?> schema) throws Exception {
testSchemaInfoWithVersionApi(schema, "schematest/test/test-" + schema.getSchemaInfo().getType());
}

private <T> void testSchemaInfoApi(Schema<T> schema,
String topicName) throws Exception {
SchemaInfo si = schema.getSchemaInfo();
Expand All @@ -128,4 +134,24 @@ private <T> void testSchemaInfoApi(Schema<T> schema,
assertEquals(si, readSi);

}

private <T> void testSchemaInfoWithVersionApi(Schema<T> schema,
String topicName) throws Exception {
SchemaInfo si = schema.getSchemaInfo();
admin.schemas().createSchema(topicName, si);
log.info("Upload schema to topic {} : {}", topicName, si);

SchemaInfoWithVersion readSi = admin.schemas().getSchemaInfoWithVersion(topicName);
log.info("Read schema of topic {} : {}", topicName, readSi);

assertEquals(si, readSi.getSchemaInfo());
assertEquals(0, readSi.getVersion());

readSi = admin.schemas().getSchemaInfoWithVersion(topicName + "-partition-0");
log.info("Read schema of topic {} : {}", topicName, readSi);

assertEquals(si, readSi.getSchemaInfo());
assertEquals(0, readSi.getVersion());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;

import java.util.List;

Expand All @@ -41,6 +42,15 @@ public interface Schemas {
*/
SchemaInfo getSchemaInfo(String topic) throws PulsarAdminException;

/**
* Retrieve the latest schema with verison of a topic.
*
* @param topic topic name, in fully qualified format
* @return latest schema with version
* @throws PulsarAdminException
*/
SchemaInfoWithVersion getSchemaInfoWithVersion(String topic) throws PulsarAdminException;

/**
* Retrieve the schema of a topic at a given <tt>version</tt>.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@

import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
Expand All @@ -34,6 +36,8 @@
import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.apache.pulsar.common.schema.SchemaType;

import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -58,6 +62,17 @@ public SchemaInfo getSchemaInfo(String topic) throws PulsarAdminException {
}
}

@Override
public SchemaInfoWithVersion getSchemaInfoWithVersion(String topic) throws PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
GetSchemaResponse response = request(schemaPath(tn)).get(GetSchemaResponse.class);
return convertGetSchemaResponseToSchemaInfoWithVersion(tn, response);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException {
try {
Expand Down Expand Up @@ -187,13 +202,31 @@ private WebTarget compatibilityPath(TopicName topicName) {
static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
GetSchemaResponse response) {
SchemaInfo info = new SchemaInfo();
info.setSchema(response.getData().getBytes(UTF_8));
byte[] schema;
if (response.getType() == SchemaType.KEY_VALUE) {
schema = DefaultImplementation.convertKeyValueDataStringToSchemaInfoSchema(response.getData().getBytes(UTF_8));
} else {
schema = response.getData().getBytes(UTF_8);
}
info.setSchema(schema);
info.setType(response.getType());
info.setProperties(response.getProperties());
info.setName(tn.getLocalName());
return info;
}

static SchemaInfoWithVersion convertGetSchemaResponseToSchemaInfoWithVersion(TopicName tn,
GetSchemaResponse response) {

return SchemaInfoWithVersion
.builder()
.schemaInfo(convertGetSchemaResponseToSchemaInfo(tn, response))
.version(response.getVersion())
.build();
}




// the util function exists for backward compatibility concern
static String convertSchemaDataToStringLegacy(byte[] schemaData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.apache.pulsar.common.schema.SchemaType;

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -360,6 +361,19 @@ public static String jsonifySchemaInfo(SchemaInfo schemaInfo) {
).invoke(null, schemaInfo));
}

/**
* Jsonify the schema info with version.
*
* @param schemaInfoWithVersion the schema info with version
* @return the jsonified schema info with version
*/
public static String jsonifySchemaInfoWithVersion(SchemaInfoWithVersion schemaInfoWithVersion) {
return catchExceptions(
() -> (String) getStaticMethod("org.apache.pulsar.client.impl.schema.SchemaUtils",
"jsonifySchemaInfoWithVersion", SchemaInfoWithVersion.class
).invoke(null, schemaInfoWithVersion));
}

/**
* Jsonify the key/value schema info.
*
Expand All @@ -373,6 +387,32 @@ public static String jsonifyKeyValueSchemaInfo(KeyValue<SchemaInfo, SchemaInfo>
).invoke(null, kvSchemaInfo));
}

/**
* convert the key/value schema data
*
* @param kvSchemaInfo the key/value schema info
* @return the convert key/value schema data string
*/
public static String convertKeyValueSchemaInfoDataToString(KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo) {
return catchExceptions(
() -> (String) getStaticMethod("org.apache.pulsar.client.impl.schema.SchemaUtils",
"convertKeyValueSchemaInfoDataToString", KeyValue.class
).invoke(null, kvSchemaInfo));
}

/**
* convert the key/value schema info data json bytes to key/value schema info data bytes
*
* @param keyValueSchemaInfoDataJsonBytes the key/value schema info data json bytes
* @return the key/value schema info data bytes
*/
public static byte[] convertKeyValueDataStringToSchemaInfoSchema(byte[] keyValueSchemaInfoDataJsonBytes) {
return catchExceptions(
() -> (byte[]) getStaticMethod("org.apache.pulsar.client.impl.schema.SchemaUtils",
"convertKeyValueDataStringToSchemaInfoSchema", byte[].class
).invoke(null, keyValueSchemaInfoDataJsonBytes));
}

public static BatcherBuilder newDefaultBatcherBuilder() {
return catchExceptions(
() -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.DefaultBatcherBuilder")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.schema;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.pulsar.client.internal.DefaultImplementation;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
@Builder
public class SchemaInfoWithVersion {

private long version;

private SchemaInfo schemaInfo;

@Override
public String toString(){
return DefaultImplementation.jsonifySchemaInfoWithVersion(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.SchemaInfo;

@Parameters(commandDescription = "Operations about schemas")
public class CmdSchemas extends CmdBase {
Expand All @@ -52,13 +51,11 @@ private class GetSchema extends CliCommand {
@Override
void run() throws Exception {
String topic = validateTopicName(params);
SchemaInfo schemaInfo;
if (version == null) {
schemaInfo = admin.schemas().getSchemaInfo(topic);
System.out.println(admin.schemas().getSchemaInfoWithVersion(topic));
} else {
schemaInfo = admin.schemas().getSchemaInfo(topic, version);
System.out.println(admin.schemas().getSchemaInfo(topic, version));
}
System.out.println(schemaInfo);
}
}

Expand Down
Loading

0 comments on commit f859da9

Please sign in to comment.