From 3d5d6f6a1681cf842a2e3334d412fa449c553fd0 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Sat, 6 Feb 2021 20:58:32 -0800 Subject: [PATCH] [pulsar-broker] Fix: handle topic loading failure due to broken schema ledger (#9212) add more error log fix list assignment --- .../schema/BookkeeperSchemaStorage.java | 52 ++++++++++++-- .../schema/DefaultSchemaRegistryService.java | 5 ++ .../broker/service/schema/SchemaRegistry.java | 2 + .../schema/SchemaRegistryServiceImpl.java | 71 +++++++++++++++---- .../schema/exceptions/SchemaException.java | 10 +++ ...egistryServiceWithSchemaDataValidator.java | 7 +- .../service/schema/ClientGetSchemaTest.java | 67 +++++++++++++++++ .../org/apache/pulsar/schema/SchemaTest.java | 16 +++-- .../common/protocol/schema/SchemaStorage.java | 2 + 9 files changed, 206 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 475e6f9f2d450..3c29da8e729ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -37,7 +37,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import javax.validation.constraints.NotNull; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -47,6 +49,7 @@ import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; import org.apache.pulsar.common.protocol.schema.SchemaStorage; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.protocol.schema.StoredSchema; @@ -129,7 +132,7 @@ public CompletableFuture get(String key, SchemaVersion version) { @Override public CompletableFuture>> getAll(String key) { CompletableFuture>> result = new CompletableFuture<>(); - getSchemaLocator(getSchemaPath(key)).thenAccept(locator -> { + getLocator(key).thenAccept(locator -> { if (log.isDebugEnabled()) { log.debug("[{}] Get all schemas - locator: {}", key, locator); } @@ -154,9 +157,42 @@ public CompletableFuture>> getAll(String ke return result; } + private CompletableFuture> getLocator(String key) { + return getSchemaLocator(getSchemaPath(key)); + } + + public void clearLocatorCache(String key) { + localZkCache.invalidate(getSchemaPath(key)); + } + + @VisibleForTesting + List getSchemaLedgerList(String key) throws IOException { + Optional locatorEntry = null; + try { + locatorEntry = getLocator(key).get(); + } catch (Exception e) { + log.warn("Failed to get list of schema-storage ledger for {}", key, + (e instanceof ExecutionException ? e.getCause() : e)); + throw new IOException("Failed to get schema ledger for" + key); + } + LocatorEntry entry = locatorEntry.orElse(null); + return entry != null ? entry.locator.getIndexList().stream().map(i -> i.getPosition().getLedgerId()) + .collect(Collectors.toList()) : null; + } + + @VisibleForTesting + BookKeeper getBookKeeper() { + return bookKeeper; + } + + @Override + public CompletableFuture delete(String key, boolean forcefully) { + return deleteSchema(key, forcefully).thenApply(LongSchemaVersion::new); + } + @Override public CompletableFuture delete(String key) { - return deleteSchema(key).thenApply(LongSchemaVersion::new); + return delete(key, false); } @NotNull @@ -350,9 +386,10 @@ private CompletableFuture createNewSchema(String schemaId, byte[] data, by } @NotNull - private CompletableFuture deleteSchema(String schemaId) { - return getSchema(schemaId).thenCompose(schemaAndVersion -> { - if (isNull(schemaAndVersion)) { + private CompletableFuture deleteSchema(String schemaId, boolean forceFully) { + return (forceFully ? CompletableFuture.completedFuture(null) : getSchema(schemaId)) + .thenCompose(schemaAndVersion -> { + if (!forceFully && isNull(schemaAndVersion)) { return completedFuture(null); } else { // The version is only for the compatibility of the current interface @@ -373,6 +410,7 @@ private CompletableFuture deleteSchema(String schemaId) { } catch (InterruptedException | KeeperException e) { future.completeExceptionally(e); } + clearLocatorCache(getSchemaPath(schemaId)); future.complete(version); } }, null); @@ -657,6 +695,8 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon if (entryId != -1) { message += " - entry=" + entryId; } - return new IOException(message); + boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException + && rc != BKException.Code.NoSuchEntryException; + return new SchemaException(recoverable, message); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java index 56ccb68cd1b9e..8f9894793daf1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java @@ -80,6 +80,11 @@ public CompletableFuture deleteSchemaStorage(String schemaId) { return completedFuture(null); } + @Override + public CompletableFuture deleteSchemaStorage(String schemaId, boolean forcefully) { + return completedFuture(null); + } + @Override public CompletableFuture isCompatible(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java index b75686937fa2e..f99e8f6e7223f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java @@ -41,6 +41,8 @@ CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData s CompletableFuture deleteSchemaStorage(String schemaId); + CompletableFuture deleteSchemaStorage(String schemaId, boolean forcefully); + CompletableFuture isCompatible(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index d16f36243b9f4..364628f2af022 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -20,7 +20,6 @@ import static java.util.Objects.isNull; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toMap; import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs; import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE; import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE; @@ -40,9 +39,11 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import javax.validation.constraints.NotNull; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.commons.collections.CollectionUtils; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.SchemaData; @@ -54,6 +55,7 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; +@Slf4j public class SchemaRegistryServiceImpl implements SchemaRegistryService { private static HashFunction hashFunction = Hashing.sha256(); private final Map compatibilityChecks; @@ -176,7 +178,12 @@ public CompletableFuture deleteSchema(String schemaId, String use @Override public CompletableFuture deleteSchemaStorage(String schemaId) { - return schemaStorage.delete(schemaId); + return deleteSchemaStorage(schemaId, false); + } + + @Override + public CompletableFuture deleteSchemaStorage(String schemaId, boolean forcefully) { + return schemaStorage.delete(schemaId, forcefully); } @Override @@ -345,20 +352,58 @@ private CompletableFuture checkCompatibilityWithAll(SchemaData schema, } public CompletableFuture> trimDeletedSchemaAndGetList(String schemaId) { - return getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> { - // Trim the prefix of schemas before the latest delete. - int lastIndex = list.size() - 1; - for (int i = lastIndex; i >= 0; i--) { - if (list.get(i).schema.isDeleted()) { - if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare - return Collections.emptyList(); - } else { - return list.subList(i + 1, list.size()); - } + + CompletableFuture> schemaResult = new CompletableFuture<>(); + CompletableFuture>> schemaFutureList = getAllSchemas(schemaId); + schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, ex) -> { + List list = ex != null ? new ArrayList<>() : schemaList; + if (ex != null) { + boolean recoverable = ex.getCause() != null && (ex.getCause() instanceof SchemaException) + ? ((SchemaException) ex.getCause()).isRecoverable() + : true; + // if error is recoverable then fail the request. + if (recoverable) { + schemaResult.completeExceptionally(ex.getCause()); + return null; } + // clean the schema list for recoverable and delete the schema from zk + schemaFutureList.getNow(Collections.emptyList()).forEach(schemaFuture -> { + if (!schemaFuture.isCompletedExceptionally()) { + list.add(schemaFuture.getNow(null)); + return; + } + }); + trimDeletedSchemaAndGetList(list); + // clean up the broken schema from zk + deleteSchemaStorage(schemaId, true).handle((sv, th) -> { + log.info("Clean up non-recoverable schema {}. Deletion of schema {} {}", ex.getCause().getMessage(), + schemaId, (th == null ? "successful" : "failed, " + th.getCause().getMessage())); + schemaResult.complete(list); + return null; + }); + return null; } - return list; + // trim the deleted schema and return the result if schema is retrieved successfully + List trimmed = trimDeletedSchemaAndGetList(list); + schemaResult.complete(trimmed); + return null; }); + return schemaResult; + } + + private List trimDeletedSchemaAndGetList(List list) { + // Trim the prefix of schemas before the latest delete. + int lastIndex = list.size() - 1; + for (int i = lastIndex; i >= 0; i--) { + if (list.get(i).schema.isDeleted()) { + if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare + return Collections.emptyList(); + } else { + return list.subList(i + 1, list.size()); + } + } + } + return list; } interface Functions { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java index 8d85b398bc0ff..e9267d6128281 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java @@ -26,6 +26,12 @@ public class SchemaException extends BrokerServiceException { private static final long serialVersionUID = -6587520779026691815L; + private boolean recoverable; + + public SchemaException(boolean recoverable, String message) { + super(message); + this.recoverable = recoverable; + } public SchemaException(String message) { super(message); @@ -38,4 +44,8 @@ public SchemaException(Throwable cause) { public SchemaException(String message, Throwable cause) { super(message, cause); } + + public boolean isRecoverable() { + return recoverable; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java index 99a33e01abb2e..25704ccccfcd4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java @@ -103,7 +103,12 @@ public CompletableFuture deleteSchema(String schemaId, String use @Override public CompletableFuture deleteSchemaStorage(String schemaId) { - return service.deleteSchemaStorage(schemaId); + return deleteSchemaStorage(schemaId, false); + } + + @Override + public CompletableFuture deleteSchemaStorage(String schemaId, boolean forcefully) { + return service.deleteSchemaStorage(schemaId, forcefully); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java index dc35286141ad5..8ce8552a107cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.broker.service.schema; +import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; +import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertEquals; import java.util.ArrayList; @@ -26,6 +29,7 @@ import lombok.Cleanup; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; @@ -33,11 +37,16 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.schema.Schemas; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import com.google.common.collect.Sets; + public class ClientGetSchemaTest extends ProducerConsumerBase { private static final String topicBytes = "my-property/my-ns/topic-bytes"; @@ -104,4 +113,62 @@ public void testGetSchema(String serviceUrl) throws Exception { assertEquals(client.getSchema(topicAvro).join(), Optional.of(Schema.AVRO(MyClass.class).getSchemaInfo())); } + /** + * It validates if schema ledger is deleted or non recoverable then it will clean up schema storage for the topic + * and make the topic available. + * + * @throws Exception + */ + @Test + public void testSchemaFailure() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicOne = "test-broken-schema-storage"; + final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString(); + + admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet("test")); + + // (1) create topic with schema + Producer producer = pulsarClient + .newProducer(Schema.AVRO(SchemaDefinition. builder().withAlwaysAllowNull(false) + .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())) + .topic(fqtnOne).create(); + + producer.close(); + + String key = TopicName.get(fqtnOne).getSchemaName(); + BookkeeperSchemaStorage schemaStrogate = (BookkeeperSchemaStorage) pulsar.getSchemaStorage(); + long schemaLedgerId = schemaStrogate.getSchemaLedgerList(key).get(0); + + // (2) break schema locator by deleting schema-ledger + schemaStrogate.getBookKeeper().deleteLedger(schemaLedgerId); + + admin.topics().unload(fqtnOne); + + // (3) create topic again: broker should handle broken schema and load the topic successfully + producer = pulsarClient + .newProducer(Schema.AVRO(SchemaDefinition. builder().withAlwaysAllowNull(false) + .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())) + .topic(fqtnOne).create(); + + assertNotEquals(schemaLedgerId, schemaStrogate.getSchemaLedgerList(key).get(0).longValue()); + + Schemas.PersonTwo personTwo = new Schemas.PersonTwo(); + personTwo.setId(1); + personTwo.setName("Tom"); + + Consumer consumer = pulsarClient + .newConsumer(Schema.AVRO(SchemaDefinition. builder().withAlwaysAllowNull(false) + .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())) + .subscriptionName("test").topic(fqtnOne).subscribe(); + + producer.send(personTwo); + + Schemas.PersonTwo personConsume = consumer.receive().getValue(); + assertEquals("Tom", personConsume.getName()); + assertEquals(1, personConsume.getId()); + + producer.close(); + consumer.close(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 2da840156ce6e..b8580fd48b29c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -18,9 +18,15 @@ */ package org.apache.pulsar.schema; -import com.google.common.collect.Sets; -import lombok.extern.slf4j.Slf4j; +import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; +import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.util.Collections; + import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -35,11 +41,9 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.Collections; +import com.google.common.collect.Sets; -import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName; -import static org.junit.Assert.assertEquals; +import lombok.extern.slf4j.Slf4j; @Slf4j public class SchemaTest extends MockedPulsarServiceBaseTest { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java index 9f007aa680991..c34154bc82733 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java @@ -32,6 +32,8 @@ public interface SchemaStorage { CompletableFuture>> getAll(String key); + CompletableFuture delete(String key, boolean forcefully); + CompletableFuture delete(String key); SchemaVersion versionFromBytes(byte[] version);