Skip to content

Commit

Permalink
[fix][broker] PulsarLedgerManager to pass correct error code to BK cl…
Browse files Browse the repository at this point in the history
…ient (apache#16857)
  • Loading branch information
dlg99 authored Aug 3, 2022
1 parent d95f6cf commit 2e8bd3d
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,19 @@ private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, M
DeleteLedgerCallback callback, Object ctx) {
Futures.waitForAll(info.ledgers.stream()
.filter(li -> !li.isOffloaded)
.map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute())
.map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
.handle((result, ex) -> {
if (ex != null) {
int rc = BKException.getExceptionCode(ex);
if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
|| rc == BKException.Code.NoSuchLedgerExistsException) {
log.info("Ledger {} does not exist, ignoring", li.ledgerId);
return null;
}
throw new CompletionException(ex);
}
return result;
}))
.collect(Collectors.toList()))
.thenRun(() -> {
// Delete the metadata
Expand Down Expand Up @@ -904,7 +916,20 @@ private CompletableFuture<Void> deleteCursor(BookKeeper bkc, String managedLedge

// Delete the cursor ledger if present
if (cursor.cursorsLedgerId != -1) {
cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId).execute();
cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId)
.execute()
.handle((result, ex) -> {
if (ex != null) {
int rc = BKException.getExceptionCode(ex);
if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
|| rc == BKException.Code.NoSuchLedgerExistsException) {
log.info("Ledger {} does not exist, ignoring", cursor.cursorsLedgerId);
return null;
}
throw new CompletionException(ex);
}
return result;
});
} else {
cursorLedgerDeleteFuture = CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
Expand All @@ -31,9 +32,13 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;
import lombok.Cleanup;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
Expand All @@ -47,12 +52,14 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = "broker")
@Slf4j
public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {

public BrokerBkEnsemblesTests() {
Expand Down Expand Up @@ -238,7 +245,7 @@ public void testSkipCorruptDataLedger() throws Exception {
try {
bookKeeper.deleteLedger(entry.getKey());
} catch (Exception e) {
e.printStackTrace();
log.warn("failed to delete ledger {}", entry.getKey(), e);
}
}
});
Expand Down Expand Up @@ -279,6 +286,147 @@ public void testSkipCorruptDataLedger() throws Exception {
consumer.close();
}

/**
 * Publishes messages across several data ledgers, deletes all but the last of those ledgers
 * directly through the BookKeeper client, and then checks that the topic can still be truncated
 * through the admin API and that deleting an already-absent ledger does not fail unexpectedly.
 */
@Test
public void testTruncateCorruptDataLedger() throws Exception {
    // Ensure intended state for autoSkipNonRecoverableData
    admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");

    @Cleanup
    PulsarClient client = PulsarClient.builder()
            .serviceUrl(pulsar.getWebServiceAddress())
            .statsInterval(0, TimeUnit.SECONDS)
            .build();

    final int totalMessages = 100;
    final int totalDataLedgers = 5;
    final int entriesPerLedger = totalMessages / totalDataLedgers;

    final String tenant = "prop";
    try {
        admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
                Sets.newHashSet(config.getClusterName())));
    } catch (Exception e) {
        // Best effort: presumably the tenant may already exist (TODO confirm); keep going,
        // but leave a trace instead of silently swallowing the failure.
        log.info("Could not create tenant {}, continuing", tenant, e);
    }
    final String ns1 = tenant + "/crash-broker";
    try {
        admin.namespaces().createNamespace(ns1, Sets.newHashSet(config.getClusterName()));
    } catch (Exception e) {
        // Best effort, same reasoning as for the tenant above.
        log.info("Could not create namespace {}, continuing", ns1, e);
    }

    final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis();

    // Create subscription
    Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
            .receiverQueueSize(5).subscribe();

    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
    ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
    ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();

    // Reach the managed-ledger config through the cursor and shrink the ledger size so that
    // publishing totalMessages rolls over into multiple data ledgers.
    // The local is named mlConfig so it does not shadow the "config" used above.
    Field configField = ManagedCursorImpl.class.getDeclaredField("config");
    configField.setAccessible(true);
    ManagedLedgerConfig mlConfig = (ManagedLedgerConfig) configField.get(cursor);
    mlConfig.setMaxEntriesPerLedger(entriesPerLedger);
    mlConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);

    // Grab the BookKeeper client used by the managed ledger
    Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
    bookKeeperField.setAccessible(true);
    BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);

    // (1) publish totalMessages messages: totalDataLedgers data-ledgers, each with
    // entriesPerLedger entries, under the managed-ledger
    Producer<byte[]> producer = client.newProducer().topic(topic1).create();
    for (int i = 0; i < totalMessages; i++) {
        String message = "my-message-" + i;
        producer.send(message.getBytes());
    }

    // validate: consumer is able to consume msg and close consumer after reading 1 entry
    Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
    consumer.close();

    NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
    Assert.assertEquals(ledgerInfo.size(), totalDataLedgers);
    Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
    // Despite its name, this is the id of the LAST ledger at this point, i.e. the only one
    // that is not removed from BookKeeper in step (2).
    long firstLedgerToDelete = lastLedger.getKey();

    // (2) delete every data-ledger except the last one directly in BookKeeper
    ledgerInfo.entrySet().forEach(entry -> {
        if (!entry.equals(lastLedger)) {
            assertEquals(entry.getValue().getEntries(), entriesPerLedger);
            try {
                bookKeeper.deleteLedger(entry.getKey());
            } catch (Exception e) {
                log.warn("failed to delete ledger {}", entry.getKey(), e);
            }
        }
    });

    // publish another totalMessages messages, which rolls over into more ledgers
    for (int i = 0; i < totalMessages; i++) {
        String message = "my-message2-" + i;
        producer.send(message.getBytes());
    }

    ml.delete();

    // Admin should be able to truncate the topic
    admin.topics().truncate(topic1);

    ledgerInfo.entrySet().forEach(entry -> {
        log.warn("found ledger: {}", entry.getKey());
        assertNotEquals(firstLedgerToDelete, entry.getKey());
    });

    // Currently, ledger deletion is async and failed deletion
    // does not actually fail truncation but logs an exception
    // and creates scheduled task to retry
    Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
        LedgerMetadata meta = bookKeeper
                .getLedgerMetadata(firstLedgerToDelete)
                .exceptionally(e -> null)
                .get();
        Assert.assertNull(meta, "ledger should be deleted " + firstLedgerToDelete);
    });

    // Should not throw, deleting absent ledger must be a noop
    // unless PulsarManager returned a wrong error which
    // got translated to BKUnexpectedConditionException
    try {
        bookKeeper.deleteLedger(firstLedgerToDelete);
    } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException bke) {
        // pass
    }

    producer.close();
    // NOTE(review): the consumer was already closed after the first receive; this second
    // close is presumably a no-op — confirm before removing.
    consumer.close();
}

/**
 * Opens a managed ledger, writes one entry, removes its newest BookKeeper ledger behind the
 * managed ledger's back, and then expects deleting the managed ledger through the factory
 * to complete without throwing.
 */
@Test
public void testDeleteLedgerFactoryCorruptLedger() throws Exception {
    final String mlName = "test";

    ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
    ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) mlFactory.open(mlName);

    // Pull the BookKeeper client out of the managed ledger via reflection
    Field bkClientField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
    bkClientField.setAccessible(true);
    BookKeeper bkClient = (BookKeeper) bkClientField.get(managedLedger);

    managedLedger.addEntry("dummy-entry-1".getBytes());

    // Remember the id of the newest ledger known to the managed ledger
    NavigableMap<Long, LedgerInfo> knownLedgers = managedLedger.getLedgersInfo();
    long newestLedgerId = knownLedgers.lastEntry().getKey();

    // Close first, then delete the ledger directly in BookKeeper
    managedLedger.close();
    bkClient.deleteLedger(newestLedgerId);

    // The BK ledger is already gone; the factory-level delete must still not throw
    mlFactory.delete(mlName);
}

@Test(timeOut = 20000)
public void testTopicWithWildCardChar() throws Exception {
@Cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -89,6 +91,32 @@ public LedgerMetadata deserialize(String path, byte[] content, Stat stat) throws
store.registerListener(this::handleDataNotification);
}

/**
 * Translates a metadata-store failure into the corresponding BookKeeper exception.
 *
 * <p>{@link CompletionException} and {@link ExecutionException} wrappers are unwrapped
 * recursively. The recognized {@code MetadataStoreException} subtypes are mapped to a
 * {@link BKException} created from the matching {@code BKException.Code}, with the original
 * exception attached as its cause. Anything else is returned unchanged.
 *
 * @param ex the failure to translate
 * @return the mapped {@link BKException}, or {@code ex} itself when no mapping applies;
 *         a wrapper exception without a cause is returned as-is rather than as {@code null}
 */
private static Throwable mapToBkException(Throwable ex) {
    if (ex instanceof CompletionException || ex instanceof ExecutionException) {
        Throwable cause = ex.getCause();
        // A wrapper without a cause has nothing to unwrap. Returning null here would make
        // the callers pass null to completeExceptionally, so keep the wrapper itself.
        return cause == null ? ex : mapToBkException(cause);
    }

    final int bkCode;
    if (ex instanceof MetadataStoreException.NotFoundException) {
        bkCode = BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
    } else if (ex instanceof MetadataStoreException.AlreadyExistsException) {
        bkCode = BKException.Code.LedgerExistException;
    } else if (ex instanceof MetadataStoreException.BadVersionException) {
        bkCode = BKException.Code.MetadataVersionException;
    } else if (ex instanceof MetadataStoreException.AlreadyClosedException) {
        bkCode = BKException.Code.LedgerClosedException;
    } else {
        // Not a metadata-store error we know how to translate
        return ex;
    }

    BKException bke = BKException.create(bkCode);
    bke.initCause(ex);
    return bke;
}

@Override
public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId,
LedgerMetadata inputMetadata) {
Expand All @@ -106,14 +134,21 @@ public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long le
return FutureUtil.failedFuture(new BKException.BKMetadataSerializationException(ioe));
}

CompletableFuture<Versioned<LedgerMetadata>> future = store.put(getLedgerPath(ledgerId), data, Optional.of(-1L))
.thenApply(stat -> new Versioned(metadata, new LongVersion(stat.getVersion())));
future.exceptionally(ex -> {
log.error("Failed to create ledger {}: {}", ledgerId, ex.getMessage());
return null;
});
CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();

return future;
store.put(getLedgerPath(ledgerId), data, Optional.of(-1L))
.whenComplete((stat, ex) -> {
if (ex != null) {
log.error("Failed to create ledger {}: {}", ledgerId, ex.getMessage());
promise.completeExceptionally(mapToBkException(ex));
return;
}

Versioned<LedgerMetadata> result = new Versioned(metadata, new LongVersion(stat.getVersion()));
promise.complete(result);
});

return promise;
}

@Override
Expand All @@ -131,9 +166,17 @@ public CompletableFuture<Void> removeLedgerMetadata(long ledgerId, Version versi
}
}

return store.delete(getLedgerPath(ledgerId), existingVersion)
.thenRun(() -> {
// removed listener on ledgerId
CompletableFuture<Void> promise = new CompletableFuture<>();
store.delete(getLedgerPath(ledgerId), existingVersion)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to remove ledger metadata {}: {}", ledgerId, ex.getMessage());
promise.completeExceptionally(mapToBkException(ex));
return;
}

promise.complete(result);
// remove listener on ledgerId
Set<BookkeeperInternalCallbacks.LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
if (null != listenerSet) {
if (log.isDebugEnabled()) {
Expand All @@ -148,6 +191,8 @@ public CompletableFuture<Void> removeLedgerMetadata(long ledgerId, Version versi
}
}
});

return promise;
}

@Override
Expand Down

0 comments on commit 2e8bd3d

Please sign in to comment.