Skip to content

Commit

Permalink
[pulsar-broker] Handle multiple topic creation for same topic-name in…
Browse files Browse the repository at this point in the history
… broker (apache#10847)

### Motivation

When the broker takes a longer time to load the topic and times out before completing the topic future, then the broker keeps multiple topics opened , doesn't clean up timed out topic, fail to create replicator producer on successfully created topic with error: `repl-producer is already connected to topic`, builds replication backlog.

```
19:16:10.107 [pulsar-ordered-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger myProp/global/myNs/persistent/myTopic
:
9:17:10.953 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - [myProp/global/myNs/persistent/myTopic] Successfully initialize managed ledger
:
19:17:10.065 [pulsar-io-23-30] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.196.133.62:47278] Failed to create topic persistent://myProp/global/myNs/myTopic, producerId=382
:
19:17:10.954 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator
:
19:17:10.955 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled
:
19:17:51.532 [pulsar-ordered-OrderedExecutor-5-0] INFO  org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled
:
19:17:51.530 [pulsar-ordered-OrderedExecutor-5-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator
:
07:25:51.377 [pulsar-io-23-5] ERROR org.apache.pulsar.client.impl.ProducerImpl - [persistent://myProp/global/myNs/myTopic] [pulsar.repl.west1] Failed to create producer: Producer with name 'pulsarrepl.west1' is already connected to topic
```

### Modification
- Stopped replicator for failed and timed-out topic
- Clean up failed topic

### Result
- Successfully create replicator producer for the topic and avoid creating replication backlog
  • Loading branch information
rdhabalia authored Jun 18, 2021
1 parent 43f4e44 commit 1447e6b
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
}

private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline();
CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.futureWithDeadline(executor());

if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1233,8 +1233,16 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(topicName, persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
if (topicFuture.isCompletedExceptionally()) {
log.warn("{} future is already completed with failure {}, closing the topic",
topic, FutureUtil.getException(topicFuture));
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
});
} else {
addTopicToStatsMaps(topicName, persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
}
}).exceptionally((ex) -> {
log.warn(
"Replication or dedup check failed."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand All @@ -34,11 +36,13 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -52,6 +56,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
Expand All @@ -65,6 +70,7 @@
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand Down Expand Up @@ -1041,6 +1047,80 @@ public void testTopicReplicatedAndProducerCreate(String topicPrefix, String topi
nonPersistentProducer2.close();
}

@Test
public void testCleanupTopic() throws Exception {

final String cluster1 = pulsar1.getConfig().getClusterName();
final String cluster2 = pulsar2.getConfig().getClusterName();
final String namespace = "pulsar/ns-" + System.nanoTime();
final String topicName = "persistent://" + namespace + "/cleanTopic";
final String topicMlName = namespace + "/persistent/cleanTopic";
admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2));

PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

long topicLoadTimeoutSeconds = 3;
config1.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
config2.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);

ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar1.getManagedLedgerClientFactory()
.getManagedLedgerFactory();
Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
ledgersField.setAccessible(true);
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgersField
.get(mlFactory);
CompletableFuture<ManagedLedgerImpl> mlFuture = new CompletableFuture<>();
ledgers.put(topicMlName, mlFuture);

try {
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
fail("consumer should fail due to topic loading failure");
} catch (Exception e) {
// Ok
}

CompletableFuture<Optional<Topic>> topicFuture = null;
for (int i = 0; i < 5; i++) {
topicFuture = pulsar1.getBrokerService().getTopics().get(topicName);
if (topicFuture != null) {
break;
}
Thread.sleep(i * 1000);
}

try {
topicFuture.get();
fail("topic creation should fail");
} catch (Exception e) {
// Ok
}

final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture;
// timeout topic future should be removed from cache
retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5,
1000);

assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName));

try {
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
fail("consumer should fail due to topic loading failure");
} catch (Exception e) {
// Ok
}

ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2");
mlFuture.complete(ml);

Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Shared).subscribeAsync()
.get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);

consumer.close();
}
private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -162,4 +164,33 @@ public synchronized Throwable fillInStackTrace() {
return this;
}
}

public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor, Long delay,
TimeUnit unit, Exception exp) {
CompletableFuture<T> future = new CompletableFuture<T>();
executor.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(exp);
}
}, delay, unit);
return future;
}

public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor) {
return futureWithDeadline(executor, 60000L, TimeUnit.MILLISECONDS,
new TimeoutException("Future didn't finish within deadline"));
}

public static <T> Optional<Throwable> getException(CompletableFuture<T> future) {
if (future != null && future.isCompletedExceptionally()) {
try {
future.get();
} catch (InterruptedException e) {
return Optional.ofNullable(e);
} catch (ExecutionException e) {
return Optional.ofNullable(e.getCause());
}
}
return Optional.empty();
}
}

0 comments on commit 1447e6b

Please sign in to comment.