Skip to content

Commit

Permalink
[pulsar-broker] Fix: handle topic loading failure due to broken schema ledger (apache#9212)
Browse files Browse the repository at this point in the history

add more error log

fix list assignment
  • Loading branch information
rdhabalia authored Feb 7, 2021
1 parent 7916666 commit 3d5d6f6
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand All @@ -47,6 +49,7 @@
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
Expand Down Expand Up @@ -129,7 +132,7 @@ public CompletableFuture<StoredSchema> get(String key, SchemaVersion version) {
@Override
public CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key) {
CompletableFuture<List<CompletableFuture<StoredSchema>>> result = new CompletableFuture<>();
getSchemaLocator(getSchemaPath(key)).thenAccept(locator -> {
getLocator(key).thenAccept(locator -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Get all schemas - locator: {}", key, locator);
}
Expand All @@ -154,9 +157,42 @@ public CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String ke
return result;
}

/**
 * Looks up the schema locator for a schema key.
 *
 * @param key schema key; it is converted to a storage path with {@code getSchemaPath}
 * @return the future produced by {@code getSchemaLocator} for that path
 */
private CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
    final String schemaPath = getSchemaPath(key);
    return getSchemaLocator(schemaPath);
}

/**
 * Invalidates the {@code localZkCache} entry of a schema locator.
 *
 * @param key schema key (not a path): the path to invalidate is derived here with
 *            {@code getSchemaPath}, so callers must not pass an already-resolved path
 */
public void clearLocatorCache(String key) {
    final String schemaPath = getSchemaPath(key);
    localZkCache.invalidate(schemaPath);
}

/**
 * Returns the ledger ids referenced by the index of the schema locator stored for {@code key}.
 *
 * <p>This call blocks the calling thread until the locator lookup finishes.
 *
 * @param key schema key whose locator is inspected
 * @return the ledger id of every index entry, in index order, or {@code null} when no locator
 *         entry exists for the key
 * @throws IOException if the locator lookup fails; the underlying failure is attached as cause
 */
@VisibleForTesting
List<Long> getSchemaLedgerList(String key) throws IOException {
    final Optional<LocatorEntry> locatorEntry;
    try {
        locatorEntry = getLocator(key).get();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // keep the interruption visible to the caller instead of silently consuming it
            Thread.currentThread().interrupt();
        }
        // report the real failure rather than the ExecutionException wrapper
        Throwable cause = (e instanceof ExecutionException && e.getCause() != null) ? e.getCause() : e;
        log.warn("Failed to get list of schema-storage ledger for {}", key, cause);
        throw new IOException("Failed to get schema ledger for " + key, cause);
    }
    return locatorEntry
            .map(entry -> entry.locator.getIndexList().stream()
                    .map(i -> i.getPosition().getLedgerId())
                    .collect(Collectors.toList()))
            .orElse(null);
}

/**
 * Exposes the BookKeeper client held by this storage so tests can manipulate ledgers directly.
 *
 * @return the {@code bookKeeper} field of this instance
 */
@VisibleForTesting
BookKeeper getBookKeeper() {
    return this.bookKeeper;
}

/**
 * Deletes the schema stored under {@code key}.
 *
 * @param key        schema key
 * @param forcefully forwarded unchanged to {@code deleteSchema}
 * @return a future with the version reported by {@code deleteSchema}, wrapped in a
 *         {@link LongSchemaVersion}
 */
@Override
public CompletableFuture<SchemaVersion> delete(String key, boolean forcefully) {
    CompletableFuture<Long> deletedVersion = deleteSchema(key, forcefully);
    return deletedVersion.thenApply(version -> new LongSchemaVersion(version));
}

/**
 * Deletes the schema stored under {@code key} without forcing.
 *
 * <p>Equivalent to {@code delete(key, false)}.
 *
 * @param key schema key
 * @return the future returned by {@code delete(key, false)}
 */
@Override
public CompletableFuture<SchemaVersion> delete(String key) {
    return delete(key, false);
}

@NotNull
Expand Down Expand Up @@ -350,9 +386,10 @@ private CompletableFuture<Long> createNewSchema(String schemaId, byte[] data, by
}

@NotNull
private CompletableFuture<Long> deleteSchema(String schemaId) {
return getSchema(schemaId).thenCompose(schemaAndVersion -> {
if (isNull(schemaAndVersion)) {
private CompletableFuture<Long> deleteSchema(String schemaId, boolean forceFully) {
return (forceFully ? CompletableFuture.completedFuture(null) : getSchema(schemaId))
.thenCompose(schemaAndVersion -> {
if (!forceFully && isNull(schemaAndVersion)) {
return completedFuture(null);
} else {
// The version is only for the compatibility of the current interface
Expand All @@ -373,6 +410,7 @@ private CompletableFuture<Long> deleteSchema(String schemaId) {
} catch (InterruptedException | KeeperException e) {
future.completeExceptionally(e);
}
clearLocatorCache(getSchemaPath(schemaId));
future.complete(version);
}
}, null);
Expand Down Expand Up @@ -657,6 +695,8 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon
if (entryId != -1) {
message += " - entry=" + entryId;
}
return new IOException(message);
boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException
&& rc != BKException.Code.NoSuchEntryException;
return new SchemaException(recoverable, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId) {
return completedFuture(null);
}

/**
 * No-op variant of schema storage deletion: nothing is deleted and both arguments are ignored.
 *
 * @param schemaId   ignored
 * @param forcefully ignored
 * @return an already-completed future holding {@code null}
 */
@Override
public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boolean forcefully) {
    final SchemaVersion noVersion = null;
    return completedFuture(noVersion);
}

@Override
public CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData s

CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId);

/**
 * Deletes the schema storage of {@code schemaId}.
 *
 * @param schemaId   id of the schema whose storage is deleted
 * @param forcefully when {@code true} the deletion is forced; NOTE(review): in the BookKeeper
 *                   storage touched by this change a forced delete appears to skip reading the
 *                   current schema first - confirm against the storage implementation
 * @return a future completed with the schema version reported by the deletion
 */
CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boolean forcefully);

CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static java.util.Objects.isNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toMap;
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE;
import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE;
Expand All @@ -40,9 +39,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
Expand All @@ -54,6 +55,7 @@
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
public class SchemaRegistryServiceImpl implements SchemaRegistryService {
private static HashFunction hashFunction = Hashing.sha256();
private final Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks;
Expand Down Expand Up @@ -176,7 +178,12 @@ public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String use

/**
 * Deletes the schema storage of {@code schemaId} without forcing.
 *
 * <p>Equivalent to {@code deleteSchemaStorage(schemaId, false)}.
 *
 * @param schemaId id of the schema whose storage is deleted
 * @return the future returned by the two-argument overload
 */
@Override
public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId) {
    return deleteSchemaStorage(schemaId, false);
}

/**
 * Deletes the schema storage of {@code schemaId} by delegating to {@code schemaStorage.delete}.
 *
 * @param schemaId   id of the schema whose storage is deleted
 * @param forcefully forwarded unchanged to the storage
 * @return the future returned by the storage
 */
@Override
public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boolean forcefully) {
    final CompletableFuture<SchemaVersion> deletion = schemaStorage.delete(schemaId, forcefully);
    return deletion;
}

@Override
Expand Down Expand Up @@ -345,20 +352,58 @@ private CompletableFuture<Void> checkCompatibilityWithAll(SchemaData schema,
}

public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
return getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
// Trim the prefix of schemas before the latest delete.
int lastIndex = list.size() - 1;
for (int i = lastIndex; i >= 0; i--) {
if (list.get(i).schema.isDeleted()) {
if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare
return Collections.emptyList();
} else {
return list.subList(i + 1, list.size());
}

CompletableFuture<List<SchemaAndMetadata>> schemaResult = new CompletableFuture<>();
CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList = getAllSchemas(schemaId);
schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, ex) -> {
List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() : schemaList;
if (ex != null) {
boolean recoverable = ex.getCause() != null && (ex.getCause() instanceof SchemaException)
? ((SchemaException) ex.getCause()).isRecoverable()
: true;
// if error is recoverable then fail the request.
if (recoverable) {
schemaResult.completeExceptionally(ex.getCause());
return null;
}
// clean the schema list for recoverable and delete the schema from zk
schemaFutureList.getNow(Collections.emptyList()).forEach(schemaFuture -> {
if (!schemaFuture.isCompletedExceptionally()) {
list.add(schemaFuture.getNow(null));
return;
}
});
trimDeletedSchemaAndGetList(list);
// clean up the broken schema from zk
deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
log.info("Clean up non-recoverable schema {}. Deletion of schema {} {}", ex.getCause().getMessage(),
schemaId, (th == null ? "successful" : "failed, " + th.getCause().getMessage()));
schemaResult.complete(list);
return null;
});
return null;
}
return list;
// trim the deleted schema and return the result if schema is retrieved successfully
List<SchemaAndMetadata> trimmed = trimDeletedSchemaAndGetList(list);
schemaResult.complete(trimmed);
return null;
});
return schemaResult;
}

/**
 * Drops every schema up to and including the latest delete marker.
 *
 * @param list schema versions, oldest first
 * @return {@code list} itself when it holds no deleted schema, an empty list when the last
 *         element is deleted, otherwise the sub-list view that follows the last deleted element
 */
private List<SchemaAndMetadata> trimDeletedSchemaAndGetList(List<SchemaAndMetadata> list) {
    final int size = list.size();
    // walk backwards to the most recent delete marker, if any
    int lastDeleted = size - 1;
    while (lastDeleted >= 0 && !list.get(lastDeleted).schema.isDeleted()) {
        lastDeleted--;
    }
    if (lastDeleted < 0) {
        return list;
    }
    if (lastDeleted == size - 1) {
        // the latest schema is a delete, so there are no schemas to compare
        return Collections.emptyList();
    }
    return list.subList(lastDeleted + 1, size);
}

interface Functions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
public class SchemaException extends BrokerServiceException {

private static final long serialVersionUID = -6587520779026691815L;
private boolean recoverable;

/**
 * Creates a schema exception that records whether the failure is recoverable.
 *
 * @param recoverable value later returned by {@link #isRecoverable()}
 * @param message     detail message passed to the superclass
 */
public SchemaException(boolean recoverable, String message) {
super(message);
this.recoverable = recoverable;
}

public SchemaException(String message) {
super(message);
Expand All @@ -38,4 +44,8 @@ public SchemaException(Throwable cause) {
public SchemaException(String message, Throwable cause) {
super(message, cause);
}

/**
 * Tells whether the failure was flagged as recoverable.
 *
 * <p>Only the {@code (boolean, String)} constructor assigns the flag; the other constructors
 * visible in this view leave the field at its default of {@code false}.
 *
 * @return the {@code recoverable} flag of this exception
 */
public boolean isRecoverable() {
    return this.recoverable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,12 @@ public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String use

/**
 * Deletes the schema storage of {@code schemaId} without forcing.
 *
 * <p>Equivalent to {@code deleteSchemaStorage(schemaId, false)}.
 *
 * @param schemaId id of the schema whose storage is deleted
 * @return the future returned by the two-argument overload
 */
@Override
public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId) {
    return deleteSchemaStorage(schemaId, false);
}

/**
 * Deletes the schema storage of {@code schemaId} by delegating to the wrapped {@code service}.
 *
 * @param schemaId   id of the schema whose storage is deleted
 * @param forcefully forwarded unchanged to the wrapped service
 * @return the future returned by the wrapped service
 */
@Override
public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boolean forcefully) {
    final CompletableFuture<SchemaVersion> delegated = service.deleteSchemaStorage(schemaId, forcefully);
    return delegated;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.service.schema;

import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertEquals;

import java.util.ArrayList;
Expand All @@ -26,18 +29,24 @@

import lombok.Cleanup;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.schema.Schemas;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;

public class ClientGetSchemaTest extends ProducerConsumerBase {

private static final String topicBytes = "my-property/my-ns/topic-bytes";
Expand Down Expand Up @@ -104,4 +113,62 @@ public void testGetSchema(String serviceUrl) throws Exception {
assertEquals(client.getSchema(topicAvro).join(), Optional.of(Schema.AVRO(MyClass.class).getSchemaInfo()));
}

/**
 * Validates that when the schema ledger of a topic has been deleted (a non-recoverable failure)
 * the broker cleans up the schema storage of the topic and the topic can be loaded and used
 * again.
 *
 * @throws Exception if any admin, client or storage call fails
 */
@Test
public void testSchemaFailure() throws Exception {
    final String tenant = PUBLIC_TENANT;
    final String namespace = "test-namespace-" + randomName(16);
    final String topicOne = "test-broken-schema-storage";
    final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString();

    admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet("test"));

    // (1) create topic with schema
    Producer<Schemas.PersonTwo> initialProducer = pulsarClient
            .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false)
                    .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
            .topic(fqtnOne).create();

    initialProducer.close();

    String key = TopicName.get(fqtnOne).getSchemaName();
    BookkeeperSchemaStorage schemaStorage = (BookkeeperSchemaStorage) pulsar.getSchemaStorage();
    long schemaLedgerId = schemaStorage.getSchemaLedgerList(key).get(0);

    // (2) break schema locator by deleting schema-ledger
    schemaStorage.getBookKeeper().deleteLedger(schemaLedgerId);

    admin.topics().unload(fqtnOne);

    // (3) create topic again: broker should handle broken schema and load the topic successfully
    // @Cleanup closes the client resources even when an assertion below fails
    @Cleanup
    Producer<Schemas.PersonTwo> producer = pulsarClient
            .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false)
                    .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
            .topic(fqtnOne).create();

    // the schema must have been re-created in a new ledger
    assertNotEquals(schemaLedgerId, schemaStorage.getSchemaLedgerList(key).get(0).longValue());

    Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
    personTwo.setId(1);
    personTwo.setName("Tom");

    @Cleanup
    Consumer<Schemas.PersonTwo> consumer = pulsarClient
            .newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false)
                    .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
            .subscriptionName("test").topic(fqtnOne).subscribe();

    producer.send(personTwo);

    Schemas.PersonTwo personConsume = consumer.receive().getValue();
    // TestNG order is (actual, expected)
    assertEquals(personConsume.getName(), "Tom");
    assertEquals(personConsume.getId(), 1);
}
}
Loading

0 comments on commit 3d5d6f6

Please sign in to comment.