Skip to content

Commit

Permalink
Clarify and add tests for schema change scenarios (apache#2669)
Browse files Browse the repository at this point in the history
A producer with a schema can attach to a topic if
- the topic does not exist
- the topic exists but currently has no schema
- the topic exists and the schema is compatible with the producer
  schema

A consumer with a schema can attach to a topic if
- the topic does not exist (the schema from the consumer is added to
  the topic)
- the topic exists and the schema is compatible with the consumer
  schema

A producer without a schema can attach if
- the topic does not exist
- the topic exists and has no schema

A consumer without a schema can attach if
- the topic exists, either with or without a schema
  • Loading branch information
ivankelly authored and sijie committed Sep 30, 2018
1 parent f5d8b9a commit ff6de4a
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) {
try {
TopicName topicName = TopicName.get(topic);
if (bundle.includes(topicName)) {
CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic, null);
CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic);
if (future != null) {
persistentTopics.add(future);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ private Topic getTopicReference(TopicName topicName) {
}

private Topic getOrCreateTopic(TopicName topicName) {
return pulsar().getBrokerService().getOrCreateTopic(topicName.toString(), null).join();
return pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -451,19 +450,14 @@ public void unloadNamespaceBundlesGracefully() {
}

public CompletableFuture<Optional<Topic>> getTopicIfExists(final String topic) {
return getTopic(topic, false /* createIfMissing */, null /* schemaData */ );
return getTopic(topic, false /* createIfMissing */);
}

public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
return getOrCreateTopic(topic, null);
return getTopic(topic, true /* createIfMissing */).thenApply(Optional::get);
}

public CompletableFuture<Topic> getOrCreateTopic(final String topic, SchemaData schemaData) {
return getTopic(topic, true /* createIfMissing */, schemaData ).thenApply(Optional::get);
}

private CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing,
SchemaData schemaData) {
private CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) {
try {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
if (topicFuture != null) {
Expand All @@ -477,8 +471,8 @@ private CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean
}
final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
return topics.computeIfAbsent(topic, (topicName) -> {
return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing, schemaData)
: createNonPersistentTopic(topicName, schemaData);
return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing)
: createNonPersistentTopic(topicName);
});
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic", topic, e);
Expand All @@ -495,7 +489,7 @@ private CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean
}
}

private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic, SchemaData schemaData) {
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();

if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
Expand Down Expand Up @@ -526,15 +520,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
return null;
});

return topicFuture.thenCompose(ot -> {
if (ot.isPresent()) {
// If a schema is provided, add or validate it before the
// topic is "visible"
return ot.get().addSchema(schemaData).thenApply(schemaVersion -> ot);
} else {
return CompletableFuture.completedFuture(ot);
}
});
return topicFuture;
}

private static <T> CompletableFuture<T> failedFuture(Throwable t) {
Expand Down Expand Up @@ -592,7 +578,7 @@ public PulsarClient getReplicationClient(String cluster) {
* @throws RuntimeException
*/
protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic,
boolean createIfMissing, SchemaData schemaData) throws RuntimeException {
boolean createIfMissing) throws RuntimeException {
checkTopicNsOwnership(topic);

final CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -620,15 +606,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
log.debug("topic-loading for {} added into pending queue", topic);
}
}
return topicFuture.thenCompose(ot -> {
if (ot.isPresent()) {
// If a schema is provided, add or validate it before the
// topic is "visible"
return ot.get().addSchema(schemaData).thenApply(schemaVersion -> ot);
} else {
return CompletableFuture.completedFuture(ot);
}
});
return topicFuture;
}

private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -585,20 +586,23 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}
}

service.getOrCreateTopic(topicName.toString(), schema)
service.getOrCreateTopic(topicName.toString())
.thenCompose(topic -> {
if (schema != null) {
return topic.isSchemaCompatible(schema).thenCompose(isCompatible -> {
if (isCompatible) {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted, initialPosition);
} else {
return FutureUtil.failedFuture(new BrokerServiceException(
"Trying to subscribe with incompatible schema"
));
}
});
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(isCompatible -> {
if (isCompatible) {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
readCompacted, initialPosition);
} else {
return FutureUtil.failedFuture(
new BrokerServiceException(
"Trying to subscribe with incompatible schema"
));
}
});
} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
Expand Down Expand Up @@ -792,7 +796,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {

log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);

service.getOrCreateTopic(topicName.toString(), schema).thenAccept((Topic topic) -> {
service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
// Before creating producer, check if backlog quota exceeded
// on topic
if (topic.isBacklogQuotaExceeded(producerName)) {
Expand Down Expand Up @@ -827,7 +831,16 @@ protected void handleProducer(final CommandProducer cmdProducer) {
if (schema != null) {
schemaVersionFuture = topic.addSchema(schema);
} else {
schemaVersionFuture = CompletableFuture.completedFuture(SchemaVersion.Empty);
schemaVersionFuture = topic.hasSchema().thenCompose((hasSchema) -> {
CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
if (hasSchema) {
result.completeExceptionally(new IncompatibleSchemaException(
"Producers cannot connect without a schema to topics with a schema"));
} else {
result.complete(SchemaVersion.Empty);
}
return result;
});
}

schemaVersionFuture.exceptionally(exception -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,28 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats

Position getLastMessageId();

/**
* Whether a topic has had a schema defined for it.
*/
CompletableFuture<Boolean> hasSchema();

/**
* Add a schema to the topic. This will fail if the new schema is incompatible with the current
* schema.
*/
CompletableFuture<SchemaVersion> addSchema(SchemaData schema);

/**
* Check if schema is compatible with current topic schema.
*/
CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema);

/**
* If the topic is idle (no producers, no entries, no subscribers and no existing schema),
* add the passed schema to the topic. Otherwise, check that the passed schema is compatible
* with what the topic already has.
*/
CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema);

CompletableFuture<Void> deleteForcefully();
}
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,15 @@ public void markBatchMessagePublished() {

private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);

@Override
public CompletableFuture<Boolean> hasSchema() {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
return brokerService.pulsar()
.getSchemaRegistryService()
.getSchema(id).thenApply((schema) -> schema != null);
}

@Override
public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
if (schema == null) {
Expand All @@ -1023,4 +1032,16 @@ public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
.getSchemaRegistryService()
.isCompatibleWithLatestVersion(id, schema);
}

@Override
public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema()
.thenCompose((hasSchema) -> {
if (hasSchema || isActive() || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
return isSchemaCompatible(schema);
} else {
return addSchema(schema).thenApply((ignore) -> true);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1790,6 +1790,15 @@ public synchronized OffloadProcessStatus offloadStatus() {

private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);

@Override
public CompletableFuture<Boolean> hasSchema() {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
return brokerService.pulsar()
.getSchemaRegistryService()
.getSchema(id).thenApply((schema) -> schema != null);
}

@Override
public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
if (schema == null) {
Expand All @@ -1811,4 +1820,16 @@ public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
.getSchemaRegistryService()
.isCompatibleWithLatestVersion(id, schema);
}

@Override
public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema()
.thenCompose((hasSchema) -> {
if (hasSchema || isActive() || ledger.getTotalSize() != 0) {
return isSchemaCompatible(schema);
} else {
return addSchema(schema).thenApply((ignore) -> true);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,25 @@ public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVer
@Override
@NotNull
public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema) {
return checkCompatibilityWithLatest(schemaId, schema).thenCompose(isCompatible -> {
if (isCompatible) {
byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
.setType(Functions.convertFromDomainType(schema.getType()))
.setSchema(ByteString.copyFrom(schema.getData()))
.setSchemaId(schemaId)
.setUser(schema.getUser())
.setDeleted(false)
.setTimestamp(clock.millis())
.addAllProps(toPairs(schema.getProps()))
.build();
return schemaStorage.put(schemaId, info.toByteArray(), context);
} else {
return FutureUtil.failedFuture(new IncompatibleSchemaException());
}
});
return getSchema(schemaId).thenApply(
(existingSchema) -> existingSchema == null || isCompatible(existingSchema, schema))
.thenCompose(isCompatible -> {
if (isCompatible) {
byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
.setType(Functions.convertFromDomainType(schema.getType()))
.setSchema(ByteString.copyFrom(schema.getData()))
.setSchemaId(schemaId)
.setUser(schema.getUser())
.setDeleted(false)
.setTimestamp(clock.millis())
.addAllProps(toPairs(schema.getProps()))
.build();
return schemaStorage.put(schemaId, info.toByteArray(), context);
} else {
return FutureUtil.failedFuture(new IncompatibleSchemaException());
}
});
}

@Override
Expand Down Expand Up @@ -135,15 +137,14 @@ private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String user) {
.build();
}

private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema) {
private boolean isCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema) {
return compatibilityChecks.getOrDefault(newSchema.getType(), SchemaCompatibilityCheck.DEFAULT)
.isCompatible(existingSchema.schema, newSchema);
}

return getSchema(schemaId).thenApply(storedSchema ->
(storedSchema == null) ||
compatibilityChecks.getOrDefault(
schema.getType(),
SchemaCompatibilityCheck.DEFAULT
).isCompatible(storedSchema.schema, schema)
);
private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema) {
return getSchema(schemaId).thenApply(
(existingSchema) -> existingSchema != null && isCompatible(existingSchema, schema));
}

interface Functions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
consumer.acknowledge(msg);
}

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
ManagedCursorImpl cursor = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
retryStrategically((test) -> cursor.getState().equals("Open"), 5, 100);

Expand Down Expand Up @@ -206,7 +206,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
}

// (5) Broker should create new cursor-ledger and remove old cursor-ledger
topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
final ManagedCursorImpl cursor1 = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
retryStrategically((test) -> cursor1.getState().equals("Open"), 5, 100);
long newCursorLedgerId = cursor1.getCursorLedger();
Expand Down Expand Up @@ -261,7 +261,7 @@ public void testSkipCorruptDataLedger() throws Exception {
Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
.receiverQueueSize(5).subscribe();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
Field configField = ManagedCursorImpl.class.getDeclaredField("config");
Expand Down
Loading

0 comments on commit ff6de4a

Please sign in to comment.