Skip to content

Commit

Permalink
issue#3838 : Allow incompatible schemas to co-exist on a topic, allow…
Browse files Browse the repository at this point in the history
… set-schema-autoupdate-strategy = none (apache#3840)

## Problem:

Fixes apache#3838 : The set-schema-autoupdate-strategy policy currently allows disabling schema update (--disabled) and one of --compatibility(FULL, BACKWARD, FORWARD). There is no way to currently allow multiple schemas for a topic.

## Solution:
It would be useful to allow --compatibility so the schemas can be added without checking with the previous schemas. This allows for extra flexibility for instance having multiple objects on a topic, supports use cases like a simple event bus etc.

### Modifications

This change added tests and can be verified as follows:

  - *Added test cases under SchemaUpdateStrategyTest*
  • Loading branch information
shiv4289 authored and sijie committed Mar 19, 2019
1 parent 614a98d commit e29bfd0
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import org.apache.avro.Schema;
import org.apache.avro.SchemaValidator;

/**
* A schema validator that always reports as compatible.
*/
class AlwaysSchemaValidator implements SchemaValidator {
static AlwaysSchemaValidator INSTANCE = new AlwaysSchemaValidator();

@Override
public void validate(Schema toValidate, Iterable<Schema> existing) {
return;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compati
return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
case FULL:
return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
case ALWAYS_COMPATIBLE:
return AlwaysSchemaValidator.INSTANCE;
default:
return NeverSchemaValidator.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ public enum SchemaCompatibilityStrategy {
*/
ALWAYS_INCOMPATIBLE,

/**
* Always compatible
*/
ALWAYS_COMPATIBLE,

/**
* Messages written by a new schema can be read by an old schema
*/
Expand All @@ -52,6 +57,8 @@ public static SchemaCompatibilityStrategy fromAutoUpdatePolicy(SchemaAutoUpdateC
return FORWARD;
case Full:
return FULL;
case AlwaysCompatible:
return ALWAYS_COMPATIBLE;
case AutoUpdateDisabled:
default:
return ALWAYS_INCOMPATIBLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,8 @@ void run() throws PulsarAdminException {
strategy = SchemaAutoUpdateCompatibilityStrategy.Backward;
} else if (strategyStr.equals("FORWARD")) {
strategy = SchemaAutoUpdateCompatibilityStrategy.Forward;
} else if (strategyStr.equals("NONE")) {
strategy = SchemaAutoUpdateCompatibilityStrategy.AlwaysCompatible;
} else {
throw new PulsarAdminException("Either --compatibility or --disabled must be specified");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,11 @@ public enum SchemaAutoUpdateCompatibilityStrategy {
/**
* Backward and Forward.
*/
Full
Full,

/**
* Always Compatible - The new schema will not be checked for compatibility against
* old schemas. In other words, new schemas will always be marked assumed compatible.
*/
AlwaysCompatible
}
2 changes: 1 addition & 1 deletion site2/docs/reference-pulsar-admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -1320,7 +1320,7 @@ $ pulsar-admin namespaces set-schema-autoupdate-strategy tenant/namespace option
Options
|Flag|Description|Default|
|----|---|---|
|`-c`, `--compatibility`|Compatibility level required for new schemas created via a Producer. Possible values (Full, Backward, Forward).||
|`-c`, `--compatibility`|Compatibility level required for new schemas created via a Producer. Possible values (Full, Backward, Forward, None).||
|`-d`, `--disabled`|Disable automatic schema updates.|false|


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private void testAutoUpdateBackward(String namespace, String topicName) throws E
"namespaces", "get-schema-autoupdate-strategy", namespace);
Assert.assertEquals(result.getStdout().trim(), "FULL");
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-schema-autoupdate-strategy",
"--compatibility", "BACKWARD", namespace);
"--compatibility", "BACKWARD", namespace);

try (PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
Expand All @@ -76,6 +76,31 @@ private void testAutoUpdateBackward(String namespace, String topicName) throws E
}
}

private void testNone(String namespace, String topicName) throws Exception {
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-schema-autoupdate-strategy", namespace);
Assert.assertEquals(result.getStdout().trim(), "FULL");
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-schema-autoupdate-strategy",
"--compatibility", "NONE", namespace);

try (PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
p.send(new V1Data("test1", 1));
}

log.info("try with forward compat, should succeed");
try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
p.send(new V3Data("test3", 1, 2));
}

log.info("try with backward compat, should succeed");
try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
p.send(new V2Data("test2"));
}
}
}

private void testAutoUpdateForward(String namespace, String topicName) throws Exception {
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-schema-autoupdate-strategy", namespace);
Expand All @@ -101,6 +126,11 @@ private void testAutoUpdateForward(String namespace, String topicName) throws Ex
try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
p.send(new V3Data("test2", 1, 2));
}

log.info("try with fully compat, should succeed");
try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
p.send(new V4Data("test2", 1, (short)100));
}
}

}
Expand Down Expand Up @@ -280,6 +310,17 @@ public void testFullV2() throws Exception {
testAutoUpdateFull("public/full-np-v2", "non-persistent://public/full-np-v2/topic1");
}

@Test
public void testNoneV2() throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c",
pulsarCluster.getClusterName(), "public/none-p-v2");
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c",
pulsarCluster.getClusterName(), "public/none-np-v2");

testNone("public/none-p-v2", "persistent://public/none-p-v2/topic1");
testNone("public/none-np-v2", "non-persistent://public/none-np-v2/topic1");
}

@Test
public void testDisabledV2() throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c",
Expand Down Expand Up @@ -327,6 +368,18 @@ public void testFullV1() throws Exception {
"persistent://public/" + pulsarCluster.getClusterName() + "/full-np-v1/topic1");
}

@Test
public void testNoneV1() throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create",
"public/" + pulsarCluster.getClusterName() + "/none-p-v1");
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create",
"public/" + pulsarCluster.getClusterName() + "/none-np-v1");
testNone("public/" + pulsarCluster.getClusterName() + "/none-p-v1",
"persistent://public/" + pulsarCluster.getClusterName() + "/none-p-v1/topic1");
testNone("public/" + pulsarCluster.getClusterName() + "/none-np-v1",
"persistent://public/" + pulsarCluster.getClusterName() + "/none-np-v1/topic1");
}

@Test
public void testDisabledV1() throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create",
Expand Down

0 comments on commit e29bfd0

Please sign in to comment.