Skip to content

Commit

Permalink
[schema] provide a flag to disable/enable schema validation on broker (
Browse files Browse the repository at this point in the history
…apache#2730)

*Motivation*

We need an upgrade/backward compatibility story for schema enforcement. Especially around:

- `Producers cannot connect without a schema to topics with a schema`

*Changes*

- provide a flag on brokers to enable schema validation (and disabled it by default). this allows a smooth upgrade on brokers,
  otherwise, it will break all non-java producers on topics with schema immediately when upgrade to the new version
  • Loading branch information
sijie authored Oct 7, 2018
1 parent f8e7dfe commit 0da4e4a
Show file tree
Hide file tree
Showing 29 changed files with 228 additions and 97 deletions.
7 changes: 7 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,13 @@ exposePublisherStats=true
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory

# Enforce schema validation on following cases:
#
# - if a producer without a schema attempts to produce to a topic with schema, the producer will be
# failed to connect. PLEASE be carefully on using this, since non-java clients don't support schema.
# if you enable this setting, it will cause non-java clients failed to produce.
isSchemaValidationEnforced=false

### --- Ledger Offloading --- ###

# The directory for all the offloader implementations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Interval between checks to see if topics with compaction policies need to be compacted
private int brokerServiceCompactionMonitorIntervalInSeconds = 60;

private boolean isSchemaValidationEnforced = false;
private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
"org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck",
Expand Down Expand Up @@ -1658,6 +1659,14 @@ public void setExposeConsumerLevelMetricsInPrometheus(boolean exposeConsumerLeve
this.exposeConsumerLevelMetricsInPrometheus = exposeConsumerLevelMetricsInPrometheus;
}

public boolean isSchemaValidationEnforced() {
return isSchemaValidationEnforced;
}

public void setSchemaValidationEnforced(boolean enforced) {
this.isSchemaValidationEnforced = enforced;
}

public String getSchemaRegistryStorageClassName() {
return schemaRegistryStorageClassName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public class ServerCnx extends PulsarHandler {
private String originalPrincipal = null;
private Set<String> proxyRoles;
private boolean authenticateOriginalAuthData;
private final boolean schemaValidationEnforced;

enum State {
Start, Connected, Failed
Expand All @@ -148,6 +149,7 @@ public ServerCnx(PulsarService pulsar) {
.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
this.authenticateOriginalAuthData = service.pulsar().getConfiguration().authenticateOriginalAuthData();
this.schemaValidationEnforced = pulsar.getConfiguration().isSchemaValidationEnforced();
}

@Override
Expand Down Expand Up @@ -833,7 +835,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
} else {
schemaVersionFuture = topic.hasSchema().thenCompose((hasSchema) -> {
CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
if (hasSchema) {
if (hasSchema && schemaValidationEnforced) {
result.completeExceptionally(new IncompatibleSchemaException(
"Producers cannot connect without a schema to topics with a schema"));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -648,7 +649,7 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), new PersistencePolicies(3, 2, 1, 10.0));

// Force topic creation and namespace being loaded
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1/my-topic")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -685,7 +686,7 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
}

// Force topic creation and namespace being loaded
producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns2/my-topic").create();
producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns2/my-topic").create();
producer.close();
admin.topics().delete("persistent://prop-xyz/use/ns2/my-topic");

Expand Down Expand Up @@ -836,7 +837,7 @@ public void partitionedTopics(String topicName) throws Exception {
admin.topics().deleteSubscription(partitionedTopicName, "my-sub-1");
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));

Producer<byte[]> producer = client.newProducer()
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
Expand Down Expand Up @@ -894,7 +895,7 @@ public void partitionedTopics(String topicName) throws Exception {
} catch (ConflictException ce) {
}

producer = client.newProducer()
producer = client.newProducer(Schema.BYTES)
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -957,7 +958,7 @@ public void testNamespaceSplitBundle() throws Exception {
// Force to create a topic
final String namespace = "prop-xyz/ns1";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -987,7 +988,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
// Force to create a topic
final String namespace = "prop-xyz/ns1";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1084,7 +1085,7 @@ public void testNamespaceUnloadBundle() throws Exception {
Lists.newArrayList("my-sub"));

// Create producer
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1146,7 +1147,7 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception {
Lists.newArrayList("my-sub"));

// Create producer
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1203,7 +1204,7 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
.subscribe();

// Create producer
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand All @@ -1216,7 +1217,7 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
producer.close();

// Create producer
Producer<byte[]> producer1 = pulsarClient.newProducer()
Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1-bundles/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1314,7 +1315,7 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages) th
}

private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1363,7 +1364,7 @@ public void statsOnNonExistingTopics() throws Exception {
@Test
public void testDeleteFailedReturnCode() throws Exception {
String topicName = "persistent://prop-xyz/ns1/my-topic";
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1755,7 +1756,7 @@ public void testPersistentTopicExpireMessageOnParitionTopic() throws Exception {
Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/ns1/ds1")
.subscriptionName("my-sub").subscribe();

Producer<byte[]> producer = client.newProducer()
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
Expand Down Expand Up @@ -1949,7 +1950,7 @@ public void testTriggerCompaction() throws Exception {
String topicName = "persistent://prop-xyz/ns1/topic1";

// create a topic by creating a producer
pulsarClient.newProducer().topic(topicName).create().close();
pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));

// mock actual compaction, we don't need to really run it
Expand Down Expand Up @@ -1983,7 +1984,7 @@ public void testCompactionStatus() throws Exception {
String topicName = "persistent://prop-xyz/ns1/topic1";

// create a topic by creating a producer
pulsarClient.newProducer().topic(topicName).create().close();
pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));

assertEquals(admin.topics().compactionStatus(topicName).status,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -641,7 +642,7 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
assertEquals(admin.namespaces().getPersistence("prop-xyz/use/ns1"), new PersistencePolicies(3, 2, 1, 10.0));

// Force topic creation and namespace being loaded
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1/my-topic").create();
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns1/my-topic").create();
producer.close();
admin.topics().delete("persistent://prop-xyz/use/ns1/my-topic");

Expand Down Expand Up @@ -674,7 +675,7 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
}

// Force topic creation and namespace being loaded
producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns2/my-topic").create();
producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns2/my-topic").create();
producer.close();
admin.topics().delete("persistent://prop-xyz/use/ns2/my-topic");

Expand Down Expand Up @@ -806,7 +807,7 @@ public void partitionedTopics(String topicName) throws Exception {
admin.topics().deleteSubscription(partitionedTopicName, "my-sub-1");
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));

Producer<byte[]> producer = client.newProducer()
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
Expand Down Expand Up @@ -864,7 +865,7 @@ public void partitionedTopics(String topicName) throws Exception {
} catch (ConflictException ce) {
}

producer = client.newProducer()
producer = client.newProducer(Schema.BYTES)
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -926,7 +927,7 @@ public void testNamespaceSplitBundle() throws Exception {
// Force to create a topic
final String namespace = "prop-xyz/use/ns1";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -956,7 +957,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
// Force to create a topic
final String namespace = "prop-xyz/use/ns1";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1080,7 +1081,7 @@ public void testNamespaceUnloadBundle() throws Exception {
Lists.newArrayList("my-sub"));

// Create producer
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/use/ns1/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1141,7 +1142,7 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception {
Lists.newArrayList("my-sub"));

// Create producer
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/use/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1197,7 +1198,7 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
.subscribe();

// Create producer
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/use/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand All @@ -1210,7 +1211,7 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
producer.close();

// Create producer
Producer<byte[]> producer1 = pulsarClient.newProducer()
Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/use/ns1-bundles/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1307,7 +1308,7 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages) th
}

private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1356,7 +1357,7 @@ public void statsOnNonExistingTopics() throws Exception {
@Test
public void testDeleteFailedReturnCode() throws Exception {
String topicName = "persistent://prop-xyz/use/ns1/my-topic";
Producer<byte[]> producer = pulsarClient.newProducer()
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand Down Expand Up @@ -1748,7 +1749,7 @@ public void testPersistentTopicExpireMessageOnParitionTopic() throws Exception {
Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/use/ns1/ds1")
.subscriptionName("my-sub").subscribe();

Producer<byte[]> producer = client.newProducer()
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/use/ns1/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
Expand Down Expand Up @@ -1939,7 +1940,7 @@ public void testTriggerCompaction() throws Exception {
String topicName = "persistent://prop-xyz/use/ns1/topic1";

// create a topic by creating a producer
pulsarClient.newProducer().topic(topicName).create().close();
pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));

// mock actual compaction, we don't need to really run it
Expand Down Expand Up @@ -1973,7 +1974,7 @@ public void testCompactionStatus() throws Exception {
String topicName = "persistent://prop-xyz/use/ns1/topic1";

// create a topic by creating a producer
pulsarClient.newProducer().topic(topicName).create().close();
pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));

assertEquals(admin.topics().compactionStatus(topicName).status,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Message;
Expand Down
Loading

0 comments on commit 0da4e4a

Please sign in to comment.