Skip to content

Commit

Permalink
[PIP-44] Separate schema compatibility checker for producer and consu…
Browse files Browse the repository at this point in the history
…mer (apache#5227)

* Change import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;

* Schema compatibility check change

* Add the test

* modify the cmdNamespaces

* Modify the variable name

* Delete the configuration

* Modify the variable name

* modify the method

* Fix some tests

* add the integration test

* Modify the test

* modity the Timeout time

* Modify the test's sequence

* modify the exception message

* Add the test

* modify the httpLookupService

* Add the license

* add the method

* Modify decode message throw exception

* Add the import class

* fix some comment

* Modify the implments method

* add the judgement for consumer schema compatibility check

* add the import
  • Loading branch information
congbobo184 authored and sijie committed Nov 4, 2019
1 parent 7f513e5 commit f6701f1
Show file tree
Hide file tree
Showing 40 changed files with 1,112 additions and 263 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -2034,11 +2036,18 @@ protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
}
}

@Deprecated
protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompatibilityStrategy() {
validateAdminAccessForTenant(namespaceName.getTenant());
return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy;
}

protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
validateAdminAccessForTenant(namespaceName.getTenant());
return getNamespacePolicies(namespaceName).schema_compatibility_strategy;
}

@Deprecated
protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
Expand All @@ -2050,6 +2059,17 @@ protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdate
"schemaAutoUpdateCompatibilityStrategy");
}

protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();

mutatePolicy((policies) -> {
policies.schema_compatibility_strategy = strategy;
return policies;
}, (policies) -> policies.schema_compatibility_strategy,
"schemaCompatibilityStrategy");
}

protected boolean internalGetSchemaValidationEnforced() {
validateSuperUserAccess();
validateAdminAccessForTenant(namespaceName.getTenant());
Expand All @@ -2067,6 +2087,23 @@ protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnfor
"schemaValidationEnforced");
}

protected boolean internalGetIsAllowAutoUpdateSchema() {
validateSuperUserAccess();
validateAdminAccessForTenant(namespaceName.getTenant());
return getNamespacePolicies(namespaceName).is_allow_auto_update_schema;
}

protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchema) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();

mutatePolicy((policies) -> {
policies.is_allow_auto_update_schema = isAllowAutoUpdateSchema;
return policies;
}, (policies) -> policies.is_allow_auto_update_schema,
"isAllowAutoUpdateSchema");
}


private <T> void mutatePolicy(Function<Policies, Policies> policyTransformation,
Function<Policies, T> getter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -965,6 +967,59 @@ public void setSchemaAutoUpdateCompatibilityStrategy(@PathParam("tenant") String
internalSetSchemaAutoUpdateCompatibilityStrategy(strategy);
}

@GET
@Path("/{tenant}/{namespace}/schemaCompatibilityStrategy")
@ApiOperation(value = "The strategy of the namespace schema compatibility ")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetSchemaCompatibilityStrategy();
}

@PUT
@Path("/{tenant}/{namespace}/schemaCompatibilityStrategy")
@ApiOperation(value = "Update the strategy used to check the compatibility of new schema")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void setSchemaCompatibilityStrategy(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
SchemaCompatibilityStrategy strategy) {
validateNamespaceName(tenant, namespace);
internalSetSchemaCompatibilityStrategy(strategy);
}

@GET
@Path("/{tenant}/{namespace}/isAllowAutoUpdateSchema")
@ApiOperation(value = "The flag of whether allow auto update schema")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public boolean getIsAllowAutoUpdateSchema(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetIsAllowAutoUpdateSchema();
}

@POST
@Path("/{tenant}/{namespace}/isAllowAutoUpdateSchema")
@ApiOperation(value = "Update flag of whether allow auto update schema")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void setIsAllowAutoUpdateSchema(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
boolean isAllowAutoUpdateSchema) {
validateNamespaceName(tenant, namespace);
internalSetIsAllowAutoUpdateSchema(isAllowAutoUpdateSchema);
}


@GET
@Path("/{tenant}/{namespace}/schemaValidationEnforced")
@ApiOperation(value = "Get schema validation enforced flag for namespace.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.web.RestException;
Expand Down Expand Up @@ -314,34 +315,37 @@ public void postSchema(

NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
data = DefaultImplementation
.convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema().getBytes(Charsets.UTF_8));
} else {
data = payload.getSchema().getBytes(Charsets.UTF_8);
}
pulsar().getSchemaRegistryService().putSchemaIfAbsent(
buildSchemaId(tenant, namespace, topic),
SchemaData.builder()
.data(data)
.isDeleted(false)
.timestamp(clock.millis())
.type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), ""))
.props(payload.getProperties())
.build(),
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
}
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
data = DefaultImplementation
.convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema().getBytes(Charsets.UTF_8));
} else {
data = payload.getSchema().getBytes(Charsets.UTF_8);
}
pulsar().getSchemaRegistryService().putSchemaIfAbsent(
buildSchemaId(tenant, namespace, topic),
SchemaData.builder()
.data(data)
.isDeleted(false)
.timestamp(clock.millis())
.type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), ""))
.props(payload.getProperties())
.build(),
schemaCompatibilityStrategy
).thenAccept(version ->
response.resume(
Response.accepted().entity(
PostSchemaResponse.builder()
.version(version)
.build()
).build()
)
).thenAccept(version ->
response.resume(
Response.accepted().entity(
PostSchemaResponse.builder()
.version(version)
.build()
).build()
)
).exceptionally(error -> {
if (error.getCause() instanceof IncompatibleSchemaException) {
response.resume(Response.status(Response.Status.CONFLICT.getStatusCode(),
Expand Down Expand Up @@ -407,10 +411,15 @@ public void testCompatibility(
validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);

String schemaId = buildSchemaId(tenant, namespace, topic);
Policies policies = getNamespacePolicies(NamespaceName.get(tenant, namespace));

SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(getNamespacePolicies(NamespaceName.get(tenant, namespace))
.schema_auto_update_compatibility_strategy);
SchemaCompatibilityStrategy schemaCompatibilityStrategy;
if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
} else {
schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
}

pulsar().getSchemaRegistryService().isCompatible(schemaId, SchemaData.builder()
.data(payload.getSchema().getBytes(Charsets.UTF_8))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@

import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -75,6 +77,7 @@ public abstract class AbstractTopic implements Topic {
protected volatile boolean isEncryptionRequired = false;
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
protected volatile boolean isAllowAutoUpdateSchema = true;
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;

Expand Down Expand Up @@ -184,9 +187,13 @@ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {

String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
return brokerService.pulsar()
.getSchemaRegistryService()
.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
if (isAllowAutoUpdateSchema) {
return brokerService.pulsar()
.getSchemaRegistryService()
.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
} else {
return FutureUtil.failedFuture(new IncompatibleSchemaException("Don't allow auto update schema."));
}
}

@Override
Expand All @@ -205,12 +212,12 @@ public CompletableFuture<SchemaVersion> deleteSchema() {
}

@Override
public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
public CompletableFuture<Void> checkSchemaCompatibleForConsumer(SchemaData schema) {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
return brokerService.pulsar()
.getSchemaRegistryService()
.isCompatible(id, schema, schemaCompatibilityStrategy);
.checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy);
}

@Override
Expand All @@ -220,6 +227,14 @@ public void recordAddLatency(long latency, TimeUnit unit) {
PUBLISH_LATENCY.observe(latency, unit);
}

protected void setSchemaCompatibilityStrategy (Policies policies) {
if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
} else {
schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
}
}
private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
.quantile(0.0)
.quantile(0.50)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import io.netty.handler.ssl.SslHandler;

import java.net.SocketAddress;
import java.util.List;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -703,19 +703,11 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {

if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(isCompatible -> {
if (isCompatible) {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated);
} else {
return FutureUtil.failedFuture(
new IncompatibleSchemaException(
"Trying to subscribe with incompatible schema"
));
}
});
.thenCompose(v -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated));

} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,14 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
/**
* Check if schema is compatible with current topic schema.
*/
CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema);
CompletableFuture<Void> checkSchemaCompatibleForConsumer(SchemaData schema);

/**
* If the topic is idle (no producers, no entries, no subscribers and no existing schema),
* add the passed schema to the topic. Otherwise, check that the passed schema is compatible
* with what the topic already has.
*/
CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema);
CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema);

CompletableFuture<Void> deleteForcefully();

Expand Down
Loading

0 comments on commit f6701f1

Please sign in to comment.