Skip to content

Commit

Permalink
[Issue 12757][broker] add broker config isAllowAutoUpdateSchema (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 authored Nov 18, 2021
1 parent 5c12be7 commit fa7be23
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 17 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ brokerMaxConnections=0
# The maximum number of connections per IP. If it exceeds, new connections are rejected.
brokerMaxConnectionsPerIp=0

# Allow schema to be auto updated at broker level. User can override this by
# 'is_allow_auto_update_schema' of namespace policy.
isAllowAutoUpdateSchemaEnabled=true

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ defaultNumberOfNamespaceBundles=4
# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
maxTopicsPerNamespace=0

# Allow schema to be auto updated at broker level. User can override this by
# 'is_allow_auto_update_schema' of namespace policy.
isAllowAutoUpdateSchemaEnabled=true

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int brokerMaxConnectionsPerIp = 0;

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema'"
+ " of namespace policy. This is enabled by default."
)
private boolean isAllowAutoUpdateSchemaEnabled = true;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public abstract class AbstractTopic implements Topic {
@Getter
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
protected volatile boolean isAllowAutoUpdateSchema = true;
protected volatile Boolean isAllowAutoUpdateSchema;
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;

Expand Down Expand Up @@ -333,20 +333,28 @@ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return isAllowAutoUpdateSchema ? schemaRegistryService
.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
: schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
.thenCompose(schemaVersion -> {
if (schemaVersion == null) {
return FutureUtil
.failedFuture(
new IncompatibleSchemaException(
"Schema not found and schema auto updating is disabled."));
} else {
return CompletableFuture.completedFuture(schemaVersion);
}
}));

if (allowAutoUpdateSchema()) {
return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
} else {
return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
.thenCompose(schemaVersion -> {
if (schemaVersion == null) {
return FutureUtil.failedFuture(new IncompatibleSchemaException(
"Schema not found and schema auto updating is disabled."));
} else {
return CompletableFuture.completedFuture(schemaVersion);
}
}));
}
}

private boolean allowAutoUpdateSchema() {
if (isAllowAutoUpdateSchema == null) {
return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
}
return isAllowAutoUpdateSchema;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,87 @@ public void testConsumerCompatibilityReadAllCheckTest(SchemaCompatibilityStrateg
}
}

@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy schemaCompatibilityStrategy)
throws Exception {

final String tenant = PUBLIC_TENANT;
final String topic = "test-consumer-compatibility";
String namespace = "test-namespace-" + randomName(16);
String fqtn = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topic
).toString();

NamespaceName namespaceName = NamespaceName.get(tenant, namespace);

admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);

assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
SchemaCompatibilityStrategy.FULL);

admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());


pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build()))
.topic(fqtn);
try {
producerThreeBuilder.create();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
}

pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build()))
.subscriptionName("test")
.topic(fqtn);

Producer<Schemas.PersonTwo> producer = producerThreeBuilder.create();
Consumer<Schemas.PersonTwo> consumerTwo = comsumerBuilder.subscribe();

producer.send(new Schemas.PersonTwo(2, "Lucy"));
Message<Schemas.PersonTwo> message = consumerTwo.receive();

Schemas.PersonTwo personTwo = message.getValue();
consumerTwo.acknowledge(message);

assertEquals(personTwo.getId(), 2);
assertEquals(personTwo.getName(), "Lucy");

producer.close();
consumerTwo.close();

pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);

producer = producerThreeBuilder.create();
consumerTwo = comsumerBuilder.subscribe();

producer.send(new Schemas.PersonTwo(2, "Lucy"));
message = consumerTwo.receive();

personTwo = message.getValue();
consumerTwo.acknowledge(message);

assertEquals(personTwo.getId(), 2);
assertEquals(personTwo.getName(), "Lucy");

consumerTwo.close();
producer.close();
}

@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public class Policies {
public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED;

@SuppressWarnings("checkstyle:MemberName")
public boolean is_allow_auto_update_schema = true;
public Boolean is_allow_auto_update_schema = null;

@SuppressWarnings("checkstyle:MemberName")
public boolean schema_validation_enforced = false;
Expand Down
3 changes: 2 additions & 1 deletion site2/website-next/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater
| managedLedgerInfoCompressionType | Compression type of managed ledger information. <br /><br />Available options are `NONE`, `LZ4`, `ZLIB`, `ZSTD`, and `SNAPPY`). <br /><br />If this value is `NONE` or invalid, the `managedLedgerInfo` is not compressed. <br /><br />**Note** that after enabling this configuration, if you want to degrade a broker, you need to change the value to `NONE` and make sure all ledger metadata is saved without compression. | None |
| additionalServlets | Additional servlet name. <br /><br />If you have multiple additional servlets, separate them by commas. <br /><br />For example, additionalServlet_1, additionalServlet_2 | N/A |
| additionalServletDirectory | Location of broker additional servlet NAR directory | ./brokerAdditionalServlet |
| isAllowAutoUpdateSchemaEnabled | Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema' of namespace policy. |true|

## Client

Expand Down Expand Up @@ -480,7 +481,6 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
| dispatchThrottlingRatePerTopicInMsg | Default messages (per second) dispatch throttling-limit for every topic. When the value is set to 0, default message dispatch throttling-limit is disabled. |0 |
| dispatchThrottlingRatePerTopicInByte | Default byte (per second) dispatch throttling-limit for every topic. When the value is set to 0, default byte dispatch throttling-limit is disabled. | 0|
| dispatchThrottlingOnBatchMessageEnabled |Apply dispatch rate limiting on batch message instead individual messages with in batch message. (Default is disabled). | false|

| dispatchThrottlingRateRelativeToPublishRate | Enable dispatch rate-limiting relative to publish rate. | false |
|dispatchThrottlingRatePerSubscriptionInMsg|The defaulted number of message dispatching throttling-limit for a subscription. The value of 0 disables message dispatch-throttling.|0|
|dispatchThrottlingRatePerSubscriptionInByte|The default number of message-bytes dispatching throttling-limit for a subscription. The value of 0 disables message-byte dispatch-throttling.|0|
Expand Down Expand Up @@ -650,6 +650,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
|haProxyProtocolEnabled | Enable or disable the [HAProxy](http://www.haproxy.org/) protocol. |false|
|bookieId | If you want to custom a bookie ID or use a dynamic network address for a bookie, you can set the `bookieId`. <br /><br />Bookie advertises itself using the `bookieId` rather than the `BookieSocketAddress` (`hostname:port` or `IP:port`).<br /><br /> The `bookieId` is a non-empty string that can contain ASCII digits and letters ([a-zA-Z9-0]), colons, dashes, and dots. <br /><br />For more information about `bookieId`, see [here](http://bookkeeper.apache.org/bps/BP-41-bookieid/).|/|
| maxTopicsPerNamespace | The maximum number of persistent topics that can be created in the namespace. When the number of topics reaches this threshold, the broker rejects the request of creating a new topic, including the auto-created topics by the producer or consumer, until the number of connected consumers decreases. The default value 0 disables the check. | 0 |
| isAllowAutoUpdateSchemaEnabled | Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema' of namespace policy. |true|

## WebSocket

Expand Down

0 comments on commit fa7be23

Please sign in to comment.