Skip to content

Commit

Permalink
[schema] support uploading key/value schema using Pulsar admin (apach…
Browse files Browse the repository at this point in the history
…e#5000)

### Motivation
To fix apache#4840
  • Loading branch information
congbobo184 authored and sijie committed Aug 27, 2019
1 parent 41430ce commit 588b166
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,41 +316,47 @@ public void postSchema(
getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
pulsar().getSchemaRegistryService().putSchemaIfAbsent(
buildSchemaId(tenant, namespace, topic),
SchemaData.builder()
.data(payload.getSchema().getBytes(Charsets.UTF_8))
.isDeleted(false)
.timestamp(clock.millis())
.type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), ""))
.props(payload.getProperties())
.build(),
schemaCompatibilityStrategy

).thenAccept(version ->
response.resume(
Response.accepted().entity(
PostSchemaResponse.builder()
.version(version)
.build()
).build()
)
).exceptionally(error -> {
if (error.getCause() instanceof IncompatibleSchemaException) {
response.resume(Response.status(Response.Status.CONFLICT).build());
} else if (error instanceof InvalidSchemaDataException) {
response.resume(Response.status(
422, /* Unprocessable Entity */
error.getMessage()
).build());
} else {
response.resume(
Response.serverError().build()
);
}
return null;
});
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
data = DefaultImplementation
.convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema().getBytes(Charsets.UTF_8));
} else {
data = payload.getSchema().getBytes(Charsets.UTF_8);
}
pulsar().getSchemaRegistryService().putSchemaIfAbsent(
buildSchemaId(tenant, namespace, topic),
SchemaData.builder()
.data(data)
.isDeleted(false)
.timestamp(clock.millis())
.type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), ""))
.props(payload.getProperties())
.build(),
schemaCompatibilityStrategy
).thenAccept(version ->
response.resume(
Response.accepted().entity(
PostSchemaResponse.builder()
.version(version)
.build()
).build()
)
).exceptionally(error -> {
if (error.getCause() instanceof IncompatibleSchemaException) {
response.resume(Response.status(Response.Status.CONFLICT).build());
} else if (error instanceof InvalidSchemaDataException) {
response.resume(Response.status(
422, /* Unprocessable Entity */
error.getMessage()
).build());
} else {
response.resume(
Response.serverError().build()
);
}
return null;
});
}).exceptionally(error -> {
if (error.getCause() instanceof RestException) {
response.resume(Response.status(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,14 @@ private <T> void testSchemaInfoWithVersionApi(Schema<T> schema,

}

@Test
public void createKeyValueSchema() throws Exception {
String topicName = "schematest/test/test-key-value-schema";
Schema keyValueSchema = Schema.KeyValue(Schema.AVRO(Foo.class), Schema.AVRO(Foo.class));
admin.schemas().createSchema(topicName,
keyValueSchema.getSchemaInfo());
SchemaInfo schemaInfo = admin.schemas().getSchemaInfo(topicName);

assertEquals(schemaInfo, keyValueSchema.getSchemaInfo());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,21 +229,27 @@ static SchemaInfoWithVersion convertGetSchemaResponseToSchemaInfoWithVersion(Top


// the util function exists for backward compatibility concern
static String convertSchemaDataToStringLegacy(byte[] schemaData) {
if (null == schemaData) {
static String convertSchemaDataToStringLegacy(SchemaInfo schemaInfo) {
byte[] schemaData = schemaInfo.getSchema();
if (null == schemaInfo.getSchema()) {
return "";
}

if (schemaInfo.getType() == SchemaType.KEY_VALUE) {
return DefaultImplementation.convertKeyValueSchemaInfoDataToString(DefaultImplementation.decodeKeyValueSchemaInfo(schemaInfo));
}

return new String(schemaData, UTF_8);
}

static PostSchemaPayload convertSchemaInfoToPostSchemaPayload(SchemaInfo schemaInfo) {

PostSchemaPayload payload = new PostSchemaPayload();
payload.setType(schemaInfo.getType().name());
payload.setProperties(schemaInfo.getProperties());
// for backward compatibility concern, we convert `bytes` to `string`
// we can consider fixing it in a new version of rest endpoint
payload.setSchema(convertSchemaDataToStringLegacy(schemaInfo.getSchema()));
payload.setSchema(convertSchemaDataToStringLegacy(schemaInfo));
return payload;
}
}

0 comments on commit 588b166

Please sign in to comment.