From 6ec2e16b6bd68ad51e2841277164a53fcfb1260b Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Sat, 20 Jul 2019 00:52:07 +0800 Subject: [PATCH] Add deleted schema judgment when adding schema (#4731) ### Motivation to fix #4724 ### Verifying this change Add the tests for it --- .../schema/BookkeeperSchemaStorage.java | 24 +++-- .../schema/SchemaRegistryServiceImpl.java | 101 ++++++++++++------ .../broker/service/schema/SchemaStorage.java | 2 +- .../service/schema/SchemaServiceTest.java | 17 +++ 4 files changed, 102 insertions(+), 42 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index c5d67bd5381c1..78d3e8a238088 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -25,6 +25,7 @@ import static java.util.Objects.nonNull; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry; +import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.NO_DELETED_VERSION; import com.google.common.annotations.VisibleForTesting; @@ -107,8 +108,8 @@ public void start() throws IOException { } @Override - public CompletableFuture put(String key, byte[] value, byte[] hash) { - return putSchemaIfAbsent(key, value, hash).thenApply(LongSchemaVersion::new); + public CompletableFuture put(String key, byte[] value, byte[] hash, long maxDeletedVersion) { + return putSchemaIfAbsent(key, value, hash, maxDeletedVersion).thenApply(LongSchemaVersion::new); } @Override @@ -139,7 +140,7 @@ public CompletableFuture>> getAll(String ke .thenApply(entry -> new StoredSchema ( entry.getSchemaData().toByteArray(), - new LongSchemaVersion(schemaLocator.getInfo().getVersion()) + new LongSchemaVersion(indexEntry.getVersion()) ) ) )); @@ -279,7 +280,7 @@ private CompletableFuture putSchema(String schemaId, byte[] data, byte[] h } @NotNull - private CompletableFuture putSchemaIfAbsent(String schemaId, byte[] data, byte[] hash) { + private CompletableFuture putSchemaIfAbsent(String schemaId, byte[] data, byte[] hash, long maxDeletedVersion) { return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> { if (optLocatorEntry.isPresent()) { @@ -294,7 +295,7 @@ private CompletableFuture putSchemaIfAbsent(String schemaId, byte[] data, log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash); } - return findSchemaEntryByHash(locator.getIndexList(), hash).thenCompose(version -> { + return findSchemaEntryByHash(locator.getIndexList(), hash, maxDeletedVersion).thenCompose(version -> { if (isNull(version)) { return addNewSchemaEntryToStore(schemaId, locator.getIndexList(), data).thenCompose( position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)); @@ -312,7 +313,7 @@ private CompletableFuture putSchemaIfAbsent(String schemaId, byte[] data, // There was a race condition on the schema creation. Since it has now been created, // retry the whole operation so that we have a chance to recover without bubbling error // back to producer/consumer - putSchemaIfAbsent(schemaId, data, hash) + putSchemaIfAbsent(schemaId, data, hash, NO_DELETED_VERSION) .thenAccept(version -> future.complete(version)) .exceptionally(ex2 -> { future.completeExceptionally(ex2); @@ -441,7 +442,8 @@ private CompletableFuture findSchemaEntryByVers @NotNull private CompletableFuture findSchemaEntryByHash( List index, - byte[] hash + byte[] hash, + long maxDeletedVersion ) { if (index.isEmpty()) { @@ -450,7 +452,11 @@ private CompletableFuture findSchemaEntryByHash( for (SchemaStorageFormat.IndexEntry entry : index) { if (Arrays.equals(entry.getHash().toByteArray(), hash)) { - return completedFuture(entry.getVersion()); + if (entry.getVersion() > maxDeletedVersion) { + return completedFuture(entry.getVersion()); + } else { + return completedFuture(null); + } } } @@ -458,7 +464,7 @@ private CompletableFuture findSchemaEntryByHash( return completedFuture(null); } else { return readSchemaEntry(index.get(0).getPosition()) - .thenCompose(entry -> findSchemaEntryByHash(entry.getIndexList(), hash)); + .thenCompose(entry -> findSchemaEntryByHash(entry.getIndexList(), hash, maxDeletedVersion)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 401436527e04c..e2975a18c8388 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -20,6 +20,9 @@ import static java.util.Objects.isNull; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE; +import static org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE; +import static org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.FULL_TRANSITIVE; import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toMap; import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs; @@ -43,6 +46,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.FutureUtil; @@ -52,6 +56,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { private final Map compatibilityChecks; private final SchemaStorage schemaStorage; private final Clock clock; + protected static final long NO_DELETED_VERSION = -1L; @VisibleForTesting SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Map compatibilityChecks, Clock clock) { @@ -106,19 +111,33 @@ public CompletableFuture>> getAllSchem @NotNull public CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy) { - return getSchema(schemaId) + return getSchema(schemaId, SchemaVersion.Latest) .thenCompose( (existingSchema) -> { - if (existingSchema == null || existingSchema.schema.isDeleted()) { - return completedFuture(true); + CompletableFuture> keyValue; + if (existingSchema == null) { + keyValue = completedFuture(new KeyValue<>(NO_DELETED_VERSION, true)); + } else if (existingSchema.schema.isDeleted()) { + keyValue = completedFuture(new KeyValue<>(((LongSchemaVersion)schemaStorage + .versionFromBytes(existingSchema.version.bytes())).getVersion(), true)); } else { - return isCompatible(schemaId, schema, strategy); + if (isTransitiveStrategy(strategy)) { + keyValue = checkCompatibilityWithAll(schemaId, schema, strategy); + + } else { + keyValue = trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList -> + completedFuture(((LongSchemaVersion)schemaStorage + .versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() - 1L)) + .thenCompose(maxDeleteVersion -> isCompatible(schemaId, schema, strategy) + .thenCompose(isCompatible -> completedFuture(new KeyValue<>(maxDeleteVersion, isCompatible)))); + } } + return keyValue; } ) - .thenCompose(isCompatible -> { - if (isCompatible) { + .thenCompose(keyValue -> { + if (keyValue.getValue()) { byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder() .setType(Functions.convertFromDomainType(schema.getType())) @@ -129,7 +148,9 @@ public CompletableFuture putSchemaIfAbsent(String schemaId, Schem .setTimestamp(clock.millis()) .addAllProps(toPairs(schema.getProps())) .build(); - return schemaStorage.put(schemaId, info.toByteArray(), context); + + return schemaStorage.put(schemaId, info.toByteArray(), context, keyValue.getKey()); + } else { return FutureUtil.failedFuture(new IncompatibleSchemaException()); } @@ -140,7 +161,19 @@ public CompletableFuture putSchemaIfAbsent(String schemaId, Schem @NotNull public CompletableFuture deleteSchema(String schemaId, String user) { byte[] deletedEntry = deleted(schemaId, user).toByteArray(); - return schemaStorage.put(schemaId, deletedEntry, new byte[]{}); + return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList -> + schemaStorage.put(schemaId, deletedEntry, new byte[]{}, ((LongSchemaVersion)schemaStorage + .versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() - 1L)); + + } + + private static boolean isTransitiveStrategy(SchemaCompatibilityStrategy strategy) { + if (FORWARD_TRANSITIVE.equals(strategy) + || BACKWARD_TRANSITIVE.equals(strategy) + || FULL_TRANSITIVE.equals(strategy)) { + return true; + } + return false; } @Override @@ -150,7 +183,8 @@ public CompletableFuture isCompatible(String schemaId, SchemaData schem case FORWARD_TRANSITIVE: case BACKWARD_TRANSITIVE: case FULL_TRANSITIVE: - return checkCompatibilityWithAll(schemaId, schema, strategy); + return checkCompatibilityWithAll(schemaId, schema, strategy) + .thenCompose(keyValue -> completedFuture(keyValue.getValue())); default: return checkCompatibilityWithLatest(schemaId, schema, strategy); } @@ -195,30 +229,33 @@ private CompletableFuture checkCompatibilityWithLatest(String schemaId, && isCompatible(existingSchema, schema, strategy)); } - private CompletableFuture checkCompatibilityWithAll(String schemaId, SchemaData schema, - SchemaCompatibilityStrategy strategy) { - return getAllSchemas(schemaId) - .thenCompose(FutureUtils::collect) - .thenApply(list -> { - // Trim the prefix of schemas before the latest delete. - int lastIndex = list.size() - 1; - for (int i = lastIndex; i >= 0; i--) { - if (list.get(i).schema.isDeleted()) { - if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare - return Collections.emptyList(); - } else { - return list.subList(i + 1, list.size()); - } - } + private CompletableFuture> checkCompatibilityWithAll(String schemaId, SchemaData schema, + SchemaCompatibilityStrategy strategy) { + return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList -> + completedFuture(new KeyValue<>(((LongSchemaVersion)schemaStorage.versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() - 1L, + compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT) + .isCompatible(schemaAndMetadataList + .stream() + .map(schemaAndMetadata -> schemaAndMetadata.schema) + .collect(Collectors.toList()), schema, strategy))) + ); + } + + private CompletableFuture> trimDeletedSchemaAndGetList(String schemaId) { + return getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> { + // Trim the prefix of schemas before the latest delete. + int lastIndex = list.size() - 1; + for (int i = lastIndex; i >= 0; i--) { + if (list.get(i).schema.isDeleted()) { + if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare + return Collections.emptyList(); + } else { + return list.subList(i + 1, list.size()); } - return list; - }) - .thenApply(schemaAndMetadataList -> schemaAndMetadataList - .stream() - .map(schemaAndMetadata -> schemaAndMetadata.schema) - .collect(Collectors.toList())) - .thenApply(schemas -> compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT) - .isCompatible(schemas, schema, strategy)); + } + } + return list; + }); } interface Functions { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java index f1336667bd5a1..e59fa33231afa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java @@ -24,7 +24,7 @@ public interface SchemaStorage { - CompletableFuture put(String key, byte[] value, byte[] hash); + CompletableFuture put(String key, byte[] value, byte[] hash, long maxDeletedVersion); CompletableFuture get(String key, SchemaVersion version); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index ffe75325c2c2d..d998fdd7d4c19 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -110,6 +110,23 @@ public void writeReadBackDeleteSchemaEntry() throws Exception { assertNull(schemaRegistryService.getSchema(schemaId1).get()); } + @Test + public void deleteSchemaAndAddSchema() throws Exception { + putSchema(schemaId1, schema1, version(0)); + SchemaData latest = getLatestSchema(schemaId1, version(0)); + assertEquals(schema1, latest); + + deleteSchema(schemaId1, version(1)); + + assertNull(schemaRegistryService.getSchema(schemaId1).get()); + + putSchema(schemaId1, schema1, version(2)); + + latest = getLatestSchema(schemaId1, version(2)); + assertEquals(schema1, latest); + + } + @Test public void getReturnsTheLastWrittenEntry() throws Exception { putSchema(schemaId1, schema1, version(0));