Skip to content

Commit

Permalink
Creating a topic does not wait for creating cursor of replicators (ap…
Browse files Browse the repository at this point in the history
…ache#6364)

### Motivation

Creating a topic does not wait for creating cursor of replicators

## Verifying this change

The exists unit test can cover this change
  • Loading branch information
codelipenghui authored Feb 23, 2020
1 parent af4773b commit 336e971
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,30 +122,38 @@ public synchronized void startProducer() {
log.info("[{}][{} -> {}] Replicator already being started. Replicator state: {}", topicName,
localCluster, remoteCluster, state);
}

return;
}

log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
producerBuilder.createAsync().thenAccept(producer -> {
readEntries(producer);
}).exceptionally(ex -> {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
long waitTimeMs = backOff.next();
log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);

// BackOff before retrying
brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
} else {
log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
}
openCursorAsync().thenAccept(v ->
producerBuilder.createAsync()
.thenAccept(this::readEntries)
.exceptionally(ex -> {
retryCreateProducer(ex);
return null;
})).exceptionally(ex -> {
retryCreateProducer(ex);
return null;
});
}

private void retryCreateProducer(Throwable ex) {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
long waitTimeMs = backOff.next();
log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);

// BackOff before retrying
brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
} else {
log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
}
}

protected abstract CompletableFuture<Void> openCursorAsync();

protected synchronized CompletableFuture<Void> closeProducerAsync() {
if (producer == null) {
STATE_UPDATER.set(this, State.Stopped);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ protected void disableReplicatorRead() {
// No-op
}

@Override
protected CompletableFuture<Void> openCursorAsync() {
return CompletableFuture.completedFuture(null);
}

@Override
public boolean isConnected() {
ProducerImpl<?> producer = this.producer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
Expand All @@ -33,11 +34,13 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
Expand All @@ -46,6 +49,7 @@
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
Expand All @@ -55,6 +59,7 @@
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
Expand All @@ -65,7 +70,9 @@
public class PersistentReplicator extends AbstractReplicator implements Replicator, ReadEntriesCallback, DeleteCallback {

private final PersistentTopic topic;
private final ManagedCursor cursor;
private final String replicatorName;
private final ManagedLedger ledger;
protected ManagedCursor cursor;

private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();

Expand Down Expand Up @@ -97,11 +104,14 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat

private final ReplicatorStats stats = new ReplicatorStats();

// Only for test
public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
this.topic = topic;
this.replicatorName = cursor.getName();
this.ledger = cursor.getManagedLedger();
this.cursor = cursor;
this.topic = topic;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);
Expand All @@ -116,6 +126,25 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String
startProducer();
}

public PersistentReplicator(PersistentTopic topic, String replicatorName, String localCluster, String remoteCluster,
BrokerService brokerService, ManagedLedger ledger) throws NamingException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
this.replicatorName = replicatorName;
this.ledger = ledger;
this.topic = topic;
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);

readBatchSize = Math.min(
producerQueueSize,
topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
producerQueueThreshold = (int) (producerQueueSize * 0.9);

this.initializeDispatchRateLimiterIfNeeded(Optional.empty());

startProducer();
}

@Override
protected void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer) {
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting
Expand Down Expand Up @@ -158,6 +187,36 @@ protected void disableReplicatorRead() {
this.cursor.setInactive();
}

@Override
protected synchronized CompletableFuture<Void> openCursorAsync() {
log.info("[{}][{} -> {}] Starting open cursor for replicator", topicName, localCluster, remoteCluster);
if (cursor != null) {
log.info("[{}][{} -> {}] Using the exists cursor for replicator", topicName, localCluster, remoteCluster);
if (expiryMonitor == null) {
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
}
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> res = new CompletableFuture<>();
ledger.asyncOpenCursor(replicatorName, InitialPosition.Earliest, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
log.info("[{}][{} -> {}] Open cursor succeed for replicator", topicName, localCluster, remoteCluster);
PersistentReplicator.this.cursor = cursor;
PersistentReplicator.this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
res.complete(null);
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}][{} -> {}] Open cursor failed for replicator", topicName, localCluster, remoteCluster, exception);
res.completeExceptionally(new PersistenceException(exception));
}

}, null);
return res;
}


/**
* Calculate available permits for read entries.
Expand Down Expand Up @@ -601,7 +660,9 @@ public void updateRates() {
msgExpired.calculateRate();
stats.msgRateOut = msgOut.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
if (expiryMonitor != null) {
stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
}
}

public ReplicatorStats getStats() {
Expand Down Expand Up @@ -639,7 +700,9 @@ public void expireMessages(int messageTTLInSeconds) {
// don't do anything for almost caught-up connected subscriptions
return;
}
expiryMonitor.expireMessages(messageTTLInSeconds);
if (expiryMonitor != null) {
expiryMonitor.expireMessages(messageTTLInSeconds);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor, localCluster);
boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor.getName(), localCluster);
if (!isReplicatorStarted) {
throw new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
Expand Down Expand Up @@ -1189,37 +1189,26 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();

String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
ledger.asyncOpenCursor(name, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster);
if (isReplicatorStarted) {
future.complete(null);
} else {
future.completeExceptionally(new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
}
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(new PersistenceException(exception));
}

}, null);
String replicatorName = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, replicatorName, localCluster);
if (isReplicatorStarted) {
future.complete(null);
} else {
future.completeExceptionally(new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
}

return future;
}

protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, ManagedCursor cursor,
protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, String replicatorName,
String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
brokerService);
return new PersistentReplicator(PersistentTopic.this, replicatorName, localCluster, remoteCluster,
brokerService, ledger);
} catch (NamingException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
Expand Down

0 comments on commit 336e971

Please sign in to comment.