Skip to content

Commit

Permalink
[pulsar-broker] Support schema rest api for V1 topics (apache#9218)
Browse files Browse the repository at this point in the history
### Motivation
Support schema admin-api for v1 topics.
  • Loading branch information
rdhabalia authored Feb 5, 2021
1 parent 1ff1fcd commit 70d9f00
Show file tree
Hide file tree
Showing 6 changed files with 640 additions and 321 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.isNull;
import static org.apache.commons.lang.StringUtils.defaultIfEmpty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.util.List;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;

public class SchemasResourceBase extends AdminResource {

private final Clock clock;

public SchemasResourceBase() {
this(Clock.systemUTC());
}

@VisibleForTesting
public SchemasResourceBase(Clock clock) {
super();
this.clock = clock;
}

private static long getLongSchemaVersion(SchemaVersion schemaVersion) {
if (schemaVersion instanceof LongSchemaVersion) {
return ((LongSchemaVersion) schemaVersion).getVersion();
} else {
return -1L;
}
}

private String getSchemaId() {
if (topicName.isPartitioned()) {
return TopicName.get(topicName.getPartitionedTopicName()).getSchemaName();
} else {
return topicName.getSchemaName();
}
}

public void getSchema(boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);
String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().getSchema(schemaId).handle((schema, error) -> {
handleGetSchemaResponse(response, schema, error);
return null;
});
}

public void getSchema(boolean authoritative, String version, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);
String schemaId = getSchemaId();
ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
bbVersion.putLong(Long.parseLong(version));
SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
pulsar().getSchemaRegistryService().getSchema(schemaId, v).handle((schema, error) -> {
handleGetSchemaResponse(response, schema, error);
return null;
});
}

public void getAllSchemas(boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).handle((schema, error) -> {
handleGetAllSchemasResponse(response, schema, error);
return null;
});
}

public void deleteSchema(boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""))
.handle((version, error) -> {
if (isNull(error)) {
response.resume(Response.ok()
.entity(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build())
.build());
} else {
response.resume(error);
}
return null;
});
}

public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
}
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
data = DefaultImplementation
.convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema().getBytes(Charsets.UTF_8));
} else {
data = payload.getSchema().getBytes(Charsets.UTF_8);
}
pulsar().getSchemaRegistryService()
.putSchemaIfAbsent(getSchemaId(),
SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis())
.type(SchemaType.valueOf(payload.getType())).user(defaultIfEmpty(clientAppId(), ""))
.props(payload.getProperties()).build(),
schemaCompatibilityStrategy)
.thenAccept(version -> response.resume(
Response.accepted().entity(PostSchemaResponse.builder().version(version).build()).build()))
.exceptionally(error -> {
if (error.getCause() instanceof IncompatibleSchemaException) {
response.resume(Response
.status(Response.Status.CONFLICT.getStatusCode(), error.getCause().getMessage())
.build());
} else if (error instanceof InvalidSchemaDataException) {
response.resume(Response.status(422, /* Unprocessable Entity */
error.getMessage()).build());
} else {
response.resume(Response.serverError().build());
}
return null;
});
}).exceptionally(error -> {
if (error.getCause() instanceof RestException) {
// Unprocessable Entity
response.resume(Response
.status(((RestException) error.getCause()).getResponse().getStatus(), error.getMessage())
.build());
} else {
response.resume(Response.serverError().build());
}
return null;
});
}

public void testCompatibility(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();
Policies policies = getNamespacePolicies(namespaceName);

SchemaCompatibilityStrategy schemaCompatibilityStrategy;
if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
} else {
schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
}

pulsar().getSchemaRegistryService()
.isCompatible(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(),
schemaCompatibilityStrategy)
.thenAccept(isCompatible -> response.resume(Response.accepted()
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
.build()))
.exceptionally(error -> {
response.resume(Response.serverError().build());
return null;
});
}

public void getVersionBySchema(

PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();

pulsar().getSchemaRegistryService()
.findSchemaVersion(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build())
.thenAccept(version -> response.resume(Response.accepted()
.entity(LongSchemaVersionResponse.builder().version(version).build()).build()))
.exceptionally(error -> {
response.resume(Response.serverError().build());
return null;
});
}

@Override
protected String domain() {
return "persistent";
}

private static GetSchemaResponse convertSchemaAndMetadataToGetSchemaResponse(SchemaAndMetadata schemaAndMetadata) {
String schemaData;
if (schemaAndMetadata.schema.getType() == SchemaType.KEY_VALUE) {
schemaData = DefaultImplementation.convertKeyValueSchemaInfoDataToString(
DefaultImplementation.decodeKeyValueSchemaInfo(schemaAndMetadata.schema.toSchemaInfo()));
} else {
schemaData = new String(schemaAndMetadata.schema.getData(), UTF_8);
}
return GetSchemaResponse.builder().version(getLongSchemaVersion(schemaAndMetadata.version))
.type(schemaAndMetadata.schema.getType()).timestamp(schemaAndMetadata.schema.getTimestamp())
.data(schemaData).properties(schemaAndMetadata.schema.getProps()).build();
}

private static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMetadata schema, Throwable error) {
if (isNull(error)) {
if (isNull(schema)) {
response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else if (schema.schema.isDeleted()) {
response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else {
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
.entity(convertSchemaAndMetadataToGetSchemaResponse(schema)).build());
}
} else {
response.resume(error);
}

}

private static void handleGetAllSchemasResponse(AsyncResponse response, List<SchemaAndMetadata> schemas,
Throwable error) {
if (isNull(error)) {
if (isNull(schemas)) {
response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else {
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
.entity(GetAllVersionsSchemaResponse.builder()
.getSchemaResponses(schemas.stream()
.map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse)
.collect(Collectors.toList()))
.build())
.build());
}
} else {
response.resume(error);
}
}

private void validateDestinationAndAdminOperation(boolean authoritative) {
try {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
} catch (RestException e) {
if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
throw new RestException(Response.Status.NOT_FOUND, "Not Found");
} else {
throw e;
}
}
}
}
Loading

0 comments on commit 70d9f00

Please sign in to comment.