Skip to content

Commit

Permalink
Fix deduplication cursor does not delete after disabling message dedu…
Browse files Browse the repository at this point in the history
…plication (apache#7656)

### Motivation

Fix deduplication cursor does not delete after disabling message deduplication. The issue occurs when enabling the message deduplication at the broker.conf and then disable it and restart the broker. The dedup cursor will not be deleted.
  • Loading branch information
codelipenghui authored Jul 30, 2020
1 parent 767c0d9 commit 2bba620
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ public OffloadInProgressException(String msg) {
}
}

public static class CursorNotFoundException extends ManagedLedgerException {
public CursorNotFoundException(String msg) {
super(msg);
}
}

@Override
public synchronized Throwable fillInStackTrace() {
// Disable stack traces to be filled in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,8 @@ public synchronized void asyncDeleteCursor(final String consumerName, final Dele
final Object ctx) {
final ManagedCursorImpl cursor = (ManagedCursorImpl) cursors.get(consumerName);
if (cursor == null) {
callback.deleteCursorFailed(new ManagedLedgerException("ManagedCursor not found: " + consumerName), ctx);
callback.deleteCursorFailed(new ManagedLedgerException.CursorNotFoundException("ManagedCursor not found: "
+ consumerName), ctx);
return;
} else if (!cursor.isDurable()) {
cursors.removeCursor(consumerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public class MessageDeduplication {
private ManagedCursor managedCursor;

enum Status {

// Deduplication is initialized
Initialized,

// Deduplication is disabled
Disabled,

Expand Down Expand Up @@ -122,7 +126,7 @@ public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, Managed
this.pulsar = pulsar;
this.topic = topic;
this.managedLedger = managedLedger;
this.status = Status.Disabled;
this.status = Status.Initialized;
this.snapshotInterval = pulsar.getConfiguration().getBrokerDeduplicationEntriesInterval();
this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers();
this.snapshotCounter = 0;
Expand Down Expand Up @@ -200,6 +204,25 @@ public CompletableFuture<Void> checkStatus() {
pulsar.getExecutor().schedule(this::checkStatus, 1, TimeUnit.MINUTES);
return CompletableFuture.completedFuture(null);
}
if (status == Status.Initialized && !shouldBeEnabled) {
status = Status.Removing;
managedLedger.asyncDeleteCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
status = Status.Disabled;
log.info("[{}] Deleted deduplication cursor", topic.getName());
}

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
status = Status.Disabled;
} else {
log.error("[{}] Deleted deduplication cursor error", topic.getName(), exception);
}
}
}, null);
}

if (status == Status.Enabled && !shouldBeEnabled) {
// Disabled deduping
Expand All @@ -220,15 +243,24 @@ public void deleteCursorComplete(Object ctx) {

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to disable deduplication: {}", topic.getName(),
exception.getMessage());
status = Status.Failed;
future.completeExceptionally(exception);
// It's ok for disable message deduplication.
if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
status = Status.Disabled;
managedCursor = null;
highestSequencedPushed.clear();
highestSequencedPersisted.clear();
future.complete(null);
} else {
log.warn("[{}] Failed to disable deduplication: {}", topic.getName(),
exception.getMessage());
status = Status.Failed;
future.completeExceptionally(exception);
}
}
}, null);

return future;
} else if (status == Status.Disabled && shouldBeEnabled) {
} else if ((status == Status.Disabled || status == Status.Initialized) && shouldBeEnabled) {
// Enable deduping
CompletableFuture<Void> future = new CompletableFuture<>();
managedLedger.asyncOpenCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new OpenCursorCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.matches;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -78,7 +77,6 @@
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
Expand Down Expand Up @@ -143,7 +141,6 @@ public class ServerCnxTest {

private ManagedLedger ledgerMock = mock(ManagedLedger.class);
private ManagedCursor cursorMock = mock(ManagedCursor.class);

private OrderedExecutor executor;

@BeforeMethod
Expand Down

0 comments on commit 2bba620

Please sign in to comment.