Skip to content

Commit

Permalink
Broker should not start replicator for root partitioned-topic (apache…
Browse files Browse the repository at this point in the history
…#1262)

* Broker should not start replicator for root partitioned-topic

* address comment
  • Loading branch information
rdhabalia authored Feb 23, 2018
1 parent 68cd115 commit f38a003
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.web.PulsarWebResource.path;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.AbstractReplicator.State;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.DestinationName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,8 +61,9 @@ protected enum State {
Stopped, Starting, Started, Stopping
}

public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster,
String remoteCluster, BrokerService brokerService) {
public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
validatePartitionedTopic(topicName, brokerService);
this.brokerService = brokerService;
this.topicName = topicName;
this.replicatorPrefix = replicatorPrefix;
Expand All @@ -69,8 +74,7 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

this.producerBuilder = client.newProducer() //
.topic(topicName)
.sendTimeout(0, TimeUnit.SECONDS) //
.topic(topicName).sendTimeout(0, TimeUnit.SECONDS) //
.maxPendingMessages(producerQueueSize) //
.producerName(getReplicatorName(replicatorPrefix, localCluster));
STATE_UPDATER.set(this, State.Stopped);
Expand Down Expand Up @@ -211,5 +215,42 @@ public static String getReplicatorName(String replicatorPrefix, String cluster)
return (replicatorPrefix + "." + cluster).intern();
}

/**
* Replication can't be started on root-partitioned-topic to avoid producer startup conflict.
*
* <pre>
* eg:
* if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then
* broker explicitly creates replicator producer for: "my-topic-partition-1" and "my-topic-partition-2".
*
* However, if broker tries to start producer with root topic "my-topic" then client-lib internally creates individual
* producers for "my-topic-partition-1" and "my-topic-partition-2" which creates conflict with existing
* replicator producers.
* </pre>
*
* Therefore, replicator can't be started on root-partition topic which can internally create multiple partitioned
* producers.
*
* @param topicName
* @param brokerService
*/
private void validatePartitionedTopic(String topicName, BrokerService brokerService) throws NamingException {
DestinationName destination = DestinationName.get(topicName);
String partitionedTopicPath = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
destination.getNamespace().toString(), destination.getDomain().toString(),
destination.getEncodedLocalName());
boolean isPartitionedTopic = false;
try {
isPartitionedTopic = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(partitionedTopicPath).isPresent();
} catch (Exception e) {
log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage());
}
if (isPartitionedTopic) {
throw new NamingException(
topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ");
}
}

private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
Expand Down Expand Up @@ -583,29 +584,37 @@ private void createPersistentTopic(final String topic, CompletableFuture<Topic>
new OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this);

CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
replicationFuture.thenCompose(v -> {
// Also check dedup status
return persistentTopic.checkDeduplicationStatus();
}).thenRun(() -> {
log.info("Created topic {} - dedup is {}", topic,
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(destinationName, persistentTopic);
topicFuture.complete(persistentTopic);
}).exceptionally((ex) -> {
log.warn("Replication or dedup check failed. Removing topic from topics list {}, {}", topic, ex);
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
try {
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger,
BrokerService.this);
CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
replicationFuture.thenCompose(v -> {
// Also check dedup status
return persistentTopic.checkDeduplicationStatus();
}).thenRun(() -> {
log.info("Created topic {} - dedup is {}", topic,
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(destinationName, persistentTopic);
topicFuture.complete(persistentTopic);
}).exceptionally((ex) -> {
log.warn(
"Replication or dedup check failed. Removing topic from topics list {}, {}",
topic, ex);
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
});

return null;
});

return null;
});
} catch (NamingException e) {
log.warn("Failed to create topic {}-{}", topic, e.getMessage());
pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.client.api.MessageId;
Expand All @@ -49,7 +50,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
private final NonPersistentReplicatorStats stats = new NonPersistentReplicatorStats();

public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
BrokerService brokerService) {
BrokerService brokerService) throws NamingException {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);

producerBuilder.blockIfQueueFull(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
Expand All @@ -49,6 +50,8 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
Expand Down Expand Up @@ -535,7 +538,12 @@ public CompletableFuture<Void> checkReplication() {
}

if (!replicators.containsKey(cluster)) {
startReplicator(cluster);
if (!startReplicator(cluster)) {
// it happens when global topic is a partitioned topic and replicator can't start on original
// non partitioned-topic (topic without partition prefix)
return FutureUtil
.failedFuture(new NamingException(topic + " failed to start replicator for " + cluster));
}
}
}

Expand All @@ -550,13 +558,30 @@ public CompletableFuture<Void> checkReplication() {
return FutureUtil.waitForAll(futures);
}

void startReplicator(String remoteCluster) {
boolean startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
replicators.computeIfAbsent(remoteCluster,
r -> new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService));
return addReplicationCluster(remoteCluster,NonPersistentTopic.this, localCluster);
}

protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
} catch (NamingException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
return null;
});
// clean up replicator if startup is failed
if (!isReplicatorStarted.get()) {
replicators.remove(remoteCluster);
}
return isReplicatorStarted.get();
}

CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down Expand Up @@ -941,5 +966,7 @@ public void markBatchMessagePublished() {
this.hasBatchMessagePublished = true;
}



private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.Backoff;
Expand Down Expand Up @@ -89,7 +90,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
private final ReplicatorStats stats = new ReplicatorStats();

public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService) {
BrokerService brokerService) throws NamingException {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
this.topic = topic;
this.cursor = cursor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void reset() {
}
}

public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws NamingException {
this.topic = topic;
this.ledger = ledger;
this.brokerService = brokerService;
Expand All @@ -213,8 +213,11 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
replicators.put(remoteCluster,
new PersistentReplicator(this, cursor, localCluster, remoteCluster, brokerService));
boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor, localCluster);
if (!isReplicatorStarted) {
throw new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
}
} else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) {
// This is not a regular subscription, we are going to ignore it for now and let the message dedup logic
// to take care of it
Expand Down Expand Up @@ -896,9 +899,13 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
replicators.computeIfAbsent(remoteCluster, r -> new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
remoteCluster, brokerService));
future.complete(null);
boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster);
if (isReplicatorStarted) {
future.complete(null);
} else {
future.completeExceptionally(new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
}
}

@Override
Expand All @@ -911,6 +918,26 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, ManagedCursor cursor,
String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
brokerService);
} catch (NamingException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
return null;
});
// clean up replicator if startup is failed
if (!isReplicatorStarted.get()) {
replicators.remove(remoteCluster);
}
return isReplicatorStarted.get();
}

CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down
Loading

0 comments on commit f38a003

Please sign in to comment.