Skip to content

Commit

Permalink
Add deleted schema judgment when adding schema (apache#4731)
Browse files Browse the repository at this point in the history
### Motivation
to fix apache#4724

### Verifying this change
Add the tests for it
  • Loading branch information
congbobo184 authored and sijie committed Jul 19, 2019
1 parent 81ab4ee commit 6ec2e16
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static java.util.Objects.nonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.NO_DELETED_VERSION;

import com.google.common.annotations.VisibleForTesting;

Expand Down Expand Up @@ -107,8 +108,8 @@ public void start() throws IOException {
}

@Override
public CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash) {
return putSchemaIfAbsent(key, value, hash).thenApply(LongSchemaVersion::new);
public CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash, long maxDeletedVersion) {
return putSchemaIfAbsent(key, value, hash, maxDeletedVersion).thenApply(LongSchemaVersion::new);
}

@Override
Expand Down Expand Up @@ -139,7 +140,7 @@ public CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String ke
.thenApply(entry -> new StoredSchema
(
entry.getSchemaData().toByteArray(),
new LongSchemaVersion(schemaLocator.getInfo().getVersion())
new LongSchemaVersion(indexEntry.getVersion())
)
)
));
Expand Down Expand Up @@ -279,7 +280,7 @@ private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] h
}

@NotNull
private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[] data, byte[] hash) {
private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[] data, byte[] hash, long maxDeletedVersion) {
return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {

if (optLocatorEntry.isPresent()) {
Expand All @@ -294,7 +295,7 @@ private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[] data,
log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash);
}

return findSchemaEntryByHash(locator.getIndexList(), hash).thenCompose(version -> {
return findSchemaEntryByHash(locator.getIndexList(), hash, maxDeletedVersion).thenCompose(version -> {
if (isNull(version)) {
return addNewSchemaEntryToStore(schemaId, locator.getIndexList(), data).thenCompose(
position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash));
Expand All @@ -312,7 +313,7 @@ private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[] data,
// There was a race condition on the schema creation. Since it has now been created,
// retry the whole operation so that we have a chance to recover without bubbling error
// back to producer/consumer
putSchemaIfAbsent(schemaId, data, hash)
putSchemaIfAbsent(schemaId, data, hash, NO_DELETED_VERSION)
.thenAccept(version -> future.complete(version))
.exceptionally(ex2 -> {
future.completeExceptionally(ex2);
Expand Down Expand Up @@ -441,7 +442,8 @@ private CompletableFuture<SchemaStorageFormat.SchemaEntry> findSchemaEntryByVers
@NotNull
private CompletableFuture<Long> findSchemaEntryByHash(
List<SchemaStorageFormat.IndexEntry> index,
byte[] hash
byte[] hash,
long maxDeletedVersion
) {

if (index.isEmpty()) {
Expand All @@ -450,15 +452,19 @@ private CompletableFuture<Long> findSchemaEntryByHash(

for (SchemaStorageFormat.IndexEntry entry : index) {
if (Arrays.equals(entry.getHash().toByteArray(), hash)) {
return completedFuture(entry.getVersion());
if (entry.getVersion() > maxDeletedVersion) {
return completedFuture(entry.getVersion());
} else {
return completedFuture(null);
}
}
}

if (index.get(0).getPosition().getLedgerId() == -1) {
return completedFuture(null);
} else {
return readSchemaEntry(index.get(0).getPosition())
.thenCompose(entry -> findSchemaEntryByHash(entry.getIndexList(), hash));
.thenCompose(entry -> findSchemaEntryByHash(entry.getIndexList(), hash, maxDeletedVersion));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import static java.util.Objects.isNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE;
import static org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE;
import static org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.FULL_TRANSITIVE;
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toMap;
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;

Expand All @@ -43,6 +46,7 @@
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
Expand All @@ -52,6 +56,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
private final Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks;
private final SchemaStorage schemaStorage;
private final Clock clock;
protected static final long NO_DELETED_VERSION = -1L;

@VisibleForTesting
SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks, Clock clock) {
Expand Down Expand Up @@ -106,19 +111,33 @@ public CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> getAllSchem
@NotNull
public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
return getSchema(schemaId)
return getSchema(schemaId, SchemaVersion.Latest)
.thenCompose(
(existingSchema) ->
{
if (existingSchema == null || existingSchema.schema.isDeleted()) {
return completedFuture(true);
CompletableFuture<KeyValue<Long, Boolean>> keyValue;
if (existingSchema == null) {
keyValue = completedFuture(new KeyValue<>(NO_DELETED_VERSION, true));
} else if (existingSchema.schema.isDeleted()) {
keyValue = completedFuture(new KeyValue<>(((LongSchemaVersion)schemaStorage
.versionFromBytes(existingSchema.version.bytes())).getVersion(), true));
} else {
return isCompatible(schemaId, schema, strategy);
if (isTransitiveStrategy(strategy)) {
keyValue = checkCompatibilityWithAll(schemaId, schema, strategy);

} else {
keyValue = trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
completedFuture(((LongSchemaVersion)schemaStorage
.versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() - 1L))
.thenCompose(maxDeleteVersion -> isCompatible(schemaId, schema, strategy)
.thenCompose(isCompatible -> completedFuture(new KeyValue<>(maxDeleteVersion, isCompatible))));
}
}
return keyValue;
}
)
.thenCompose(isCompatible -> {
if (isCompatible) {
.thenCompose(keyValue -> {
if (keyValue.getValue()) {
byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
.setType(Functions.convertFromDomainType(schema.getType()))
Expand All @@ -129,7 +148,9 @@ public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, Schem
.setTimestamp(clock.millis())
.addAllProps(toPairs(schema.getProps()))
.build();
return schemaStorage.put(schemaId, info.toByteArray(), context);

return schemaStorage.put(schemaId, info.toByteArray(), context, keyValue.getKey());

} else {
return FutureUtil.failedFuture(new IncompatibleSchemaException());
}
Expand All @@ -140,7 +161,19 @@ public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, Schem
@NotNull
public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user) {
byte[] deletedEntry = deleted(schemaId, user).toByteArray();
return schemaStorage.put(schemaId, deletedEntry, new byte[]{});
return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
schemaStorage.put(schemaId, deletedEntry, new byte[]{}, ((LongSchemaVersion)schemaStorage
.versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() - 1L));

}

private static boolean isTransitiveStrategy(SchemaCompatibilityStrategy strategy) {
if (FORWARD_TRANSITIVE.equals(strategy)
|| BACKWARD_TRANSITIVE.equals(strategy)
|| FULL_TRANSITIVE.equals(strategy)) {
return true;
}
return false;
}

@Override
Expand All @@ -150,7 +183,8 @@ public CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schem
case FORWARD_TRANSITIVE:
case BACKWARD_TRANSITIVE:
case FULL_TRANSITIVE:
return checkCompatibilityWithAll(schemaId, schema, strategy);
return checkCompatibilityWithAll(schemaId, schema, strategy)
.thenCompose(keyValue -> completedFuture(keyValue.getValue()));
default:
return checkCompatibilityWithLatest(schemaId, schema, strategy);
}
Expand Down Expand Up @@ -195,30 +229,33 @@ private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId,
&& isCompatible(existingSchema, schema, strategy));
}

private CompletableFuture<Boolean> checkCompatibilityWithAll(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
return getAllSchemas(schemaId)
.thenCompose(FutureUtils::collect)
.thenApply(list -> {
// Trim the prefix of schemas before the latest delete.
int lastIndex = list.size() - 1;
for (int i = lastIndex; i >= 0; i--) {
if (list.get(i).schema.isDeleted()) {
if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare
return Collections.<SchemaAndMetadata>emptyList();
} else {
return list.subList(i + 1, list.size());
}
}
private CompletableFuture<KeyValue<Long, Boolean>> checkCompatibilityWithAll(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
completedFuture(new KeyValue<>(((LongSchemaVersion)schemaStorage.versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() - 1L,
compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
.isCompatible(schemaAndMetadataList
.stream()
.map(schemaAndMetadata -> schemaAndMetadata.schema)
.collect(Collectors.toList()), schema, strategy)))
);
}

private CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
return getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
// Trim the prefix of schemas before the latest delete.
int lastIndex = list.size() - 1;
for (int i = lastIndex; i >= 0; i--) {
if (list.get(i).schema.isDeleted()) {
if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare
return Collections.<SchemaAndMetadata>emptyList();
} else {
return list.subList(i + 1, list.size());
}
return list;
})
.thenApply(schemaAndMetadataList -> schemaAndMetadataList
.stream()
.map(schemaAndMetadata -> schemaAndMetadata.schema)
.collect(Collectors.toList()))
.thenApply(schemas -> compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
.isCompatible(schemas, schema, strategy));
}
}
return list;
});
}

interface Functions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public interface SchemaStorage {

CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash, long maxDeletedVersion);

CompletableFuture<StoredSchema> get(String key, SchemaVersion version);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,23 @@ public void writeReadBackDeleteSchemaEntry() throws Exception {
assertNull(schemaRegistryService.getSchema(schemaId1).get());
}

@Test
public void deleteSchemaAndAddSchema() throws Exception {
putSchema(schemaId1, schema1, version(0));
SchemaData latest = getLatestSchema(schemaId1, version(0));
assertEquals(schema1, latest);

deleteSchema(schemaId1, version(1));

assertNull(schemaRegistryService.getSchema(schemaId1).get());

putSchema(schemaId1, schema1, version(2));

latest = getLatestSchema(schemaId1, version(2));
assertEquals(schema1, latest);

}

@Test
public void getReturnsTheLastWrittenEntry() throws Exception {
putSchema(schemaId1, schema1, version(0));
Expand Down

0 comments on commit 6ec2e16

Please sign in to comment.