Skip to content

Commit

Permalink
issue#3838 : Allow incompatible schemas to co-exist on a topic, allow…
Browse files Browse the repository at this point in the history
… set-schema-autoupdate-strategy = none (apache#3840)

Fixes apache#3838 : The set-schema-autoupdate-strategy policy currently allows disabling schema update (--disabled) and one of --compatibility(FULL, BACKWARD, FORWARD). There is no way to currently allow multiple schemas for a topic.

It would be useful to allow --compatibility so the schemas can be added without checking with the previous schemas. This allows for extra flexibility for instance having multiple objects on a topic, supports use cases like a simple event bus etc.

This change added tests and can be verified as follows:

  - *Added test cases under SchemaUpdateStrategyTest*
  • Loading branch information
shiv4289 authored and merlimat committed Apr 1, 2019
1 parent 47f85a1 commit 5dd2f47
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import org.apache.avro.Schema;
import org.apache.avro.SchemaValidator;

/**
* A schema validator that always reports as compatible.
*/
class AlwaysSchemaValidator implements SchemaValidator {
static AlwaysSchemaValidator INSTANCE = new AlwaysSchemaValidator();

@Override
public void validate(Schema toValidate, Iterable<Schema> existing) {
return;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ public enum SchemaCompatibilityStrategy {
*/
ALWAYS_INCOMPATIBLE,

/**
* Always compatible
*/
ALWAYS_COMPATIBLE,

/**
* Messages written by a new schema can be read by an old schema
*/
Expand All @@ -52,6 +57,8 @@ public static SchemaCompatibilityStrategy fromAutoUpdatePolicy(SchemaAutoUpdateC
return FORWARD;
case Full:
return FULL;
case AlwaysCompatible:
return ALWAYS_COMPATIBLE;
case AutoUpdateDisabled:
default:
return ALWAYS_INCOMPATIBLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,8 @@ void run() throws PulsarAdminException {
strategy = SchemaAutoUpdateCompatibilityStrategy.Backward;
} else if (strategyStr.equals("FORWARD")) {
strategy = SchemaAutoUpdateCompatibilityStrategy.Forward;
} else if (strategyStr.equals("NONE")) {
strategy = SchemaAutoUpdateCompatibilityStrategy.AlwaysCompatible;
} else {
throw new PulsarAdminException("Either --compatibility or --disabled must be specified");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,11 @@ public enum SchemaAutoUpdateCompatibilityStrategy {
/**
* Backward and Forward.
*/
Full
Full,

/**
* Always Compatible - The new schema will not be checked for compatibility against
* old schemas. In other words, new schemas will always be marked assumed compatible.
*/
AlwaysCompatible
}
23 changes: 21 additions & 2 deletions site2/docs/reference-pulsar-admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,27 @@ Options
|----|---|---|
|`-s`, `--size`|Maximum number of bytes stored in the pulsar cluster for a topic before data will start being automatically offloaded to longterm storage (eg: 10M, 16G, 3T, 100). Negative values disable automatic offload. 0 triggers offloading as soon as possible.|-1|

### `get-schema-autoupdate-strategy`
Get the schema auto-update strategy for a namespace

Usage
```bash
$ pulsar-admin namespaces get-schema-autoupdate-strategy tenant/namespace
```

### `set-schema-autoupdate-strategy`
Set the schema auto-update strategy for a namespace

Usage
```bash
$ pulsar-admin namespaces set-schema-autoupdate-strategy tenant/namespace options
```

Options
|Flag|Description|Default|
|----|---|---|
|`-c`, `--compatibility`|Compatibility level required for new schemas created via a Producer. Possible values (Full, Backward, Forward, None).||
|`-d`, `--disabled`|Disable automatic schema updates.|false|


## `ns-isolation-policy`
Expand Down Expand Up @@ -1919,5 +1940,3 @@ Options
|`-c`, `--classname`|The Java class name||
|`-j`, `--jar`|A path to the JAR file which contains the above Java class||
|`-t`, `--type`|The type of the schema (avro or json)||


Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private void testAutoUpdateBackward(String namespace, String topicName) throws E
"namespaces", "get-schema-autoupdate-strategy", namespace);
Assert.assertEquals(result.getStdout().trim(), "FULL");
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-schema-autoupdate-strategy",
"--compatibility", "BACKWARD", namespace);
"--compatibility", "BACKWARD", namespace);

try (PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
Expand All @@ -76,6 +76,31 @@ private void testAutoUpdateBackward(String namespace, String topicName) throws E
}
}

private void testNone(String namespace, String topicName) throws Exception {
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-schema-autoupdate-strategy", namespace);
Assert.assertEquals(result.getStdout().trim(), "FULL");
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-schema-autoupdate-strategy",
"--compatibility", "NONE", namespace);

try (PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
p.send(new V1Data("test1", 1));
}

log.info("try with forward compat, should succeed");
try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
p.send(new V3Data("test3", 1, 2));
}

log.info("try with backward compat, should succeed");
try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
p.send(new V2Data("test2"));
}
}
}

private void testAutoUpdateForward(String namespace, String topicName) throws Exception {
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-schema-autoupdate-strategy", namespace);
Expand All @@ -101,6 +126,11 @@ private void testAutoUpdateForward(String namespace, String topicName) throws Ex
try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
p.send(new V3Data("test2", 1, 2));
}

log.info("try with fully compat, should succeed");
try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
p.send(new V4Data("test2", 1, (short)100));
}
}

}
Expand Down Expand Up @@ -280,6 +310,17 @@ public void testFullV2() throws Exception {
testAutoUpdateFull("public/full-np-v2", "non-persistent://public/full-np-v2/topic1");
}

@Test
public void testNoneV2() throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c",
pulsarCluster.getClusterName(), "public/none-p-v2");
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c",
pulsarCluster.getClusterName(), "public/none-np-v2");

testNone("public/none-p-v2", "persistent://public/none-p-v2/topic1");
testNone("public/none-np-v2", "non-persistent://public/none-np-v2/topic1");
}

@Test
public void testDisabledV2() throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c",
Expand Down Expand Up @@ -327,6 +368,18 @@ public void testFullV1() throws Exception {
"persistent://public/" + pulsarCluster.getClusterName() + "/full-np-v1/topic1");
}

@Test
public void testNoneV1() throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create",
"public/" + pulsarCluster.getClusterName() + "/none-p-v1");
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create",
"public/" + pulsarCluster.getClusterName() + "/none-np-v1");
testNone("public/" + pulsarCluster.getClusterName() + "/none-p-v1",
"persistent://public/" + pulsarCluster.getClusterName() + "/none-p-v1/topic1");
testNone("public/" + pulsarCluster.getClusterName() + "/none-np-v1",
"persistent://public/" + pulsarCluster.getClusterName() + "/none-np-v1/topic1");
}

@Test
public void testDisabledV1() throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create",
Expand Down

0 comments on commit 5dd2f47

Please sign in to comment.