Skip to content

Commit

Permalink
Broker should not start replicator for root partitioned-topic (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Asher authored May 2, 2018
1 parent 5484585 commit ca7559b
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.web.PulsarWebResource.path;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.DestinationName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,8 +62,9 @@ protected enum State {
Stopped, Starting, Started, Stopping
}

public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster,
String remoteCluster, BrokerService brokerService) {
public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
validatePartitionedTopic(topicName, brokerService);
this.brokerService = brokerService;
this.topicName = topicName;
this.replicatorPrefix = replicatorPrefix;
Expand All @@ -67,7 +73,6 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

this.producerConfiguration = new ProducerConfiguration();
this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
this.producerConfiguration.setMaxPendingMessages(producerQueueSize);
Expand Down Expand Up @@ -214,5 +219,42 @@ public static String getReplicatorName(String replicatorPrefix, String cluster)
return (replicatorPrefix + "." + cluster).intern();
}

/**
 * Rejects replicator creation on the root name of a partitioned topic, to avoid a producer startup conflict.
 *
 * <pre>
 * eg:
 * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then
 * broker explicitly creates replicator producer for: "my-topic-partition-1" and "my-topic-partition-2".
 *
 * However, if broker tries to start producer with root topic "my-topic" then client-lib internally creates individual
 * producers for "my-topic-partition-1" and "my-topic-partition-2" which creates conflict with existing
 * replicator producers.
 * </pre>
 *
 * Therefore, replicator can't be started on root-partition topic which can internally create multiple partitioned
 * producers.
 *
 * <p>
 * The check is best effort: the partitioned-topic path is looked up through the configuration cache, and if that
 * lookup throws, the failure is logged and the topic is treated as non-partitioned.
 *
 * @param topicName
 *            name of the topic the replicator is being created for
 * @param brokerService
 *            broker service used to reach the configuration cache
 * @throws NamingException
 *             if an entry is present at the partitioned-topic path derived from {@code topicName}
 */
private void validatePartitionedTopic(String topicName, BrokerService brokerService) throws NamingException {
    DestinationName destination = DestinationName.get(topicName);
    String partitionedTopicPath = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
            destination.getNamespace().toString(), destination.getDomain().toString(),
            destination.getEncodedLocalName());
    boolean isPartitionedTopic = false;
    try {
        isPartitionedTopic = brokerService.pulsar().getConfigurationCache().policiesCache()
                .get(partitionedTopicPath).isPresent();
    } catch (Exception e) {
        // Deliberately non-fatal: a failed lookup must not block replicator creation. The throwable is passed
        // as the trailing argument so the logger records the cause and stack trace, not just the message.
        log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage(), e);
    }
    if (isPartitionedTopic) {
        throw new NamingException(
                topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ");
    }
}

private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
Expand Down Expand Up @@ -584,29 +585,37 @@ private void createPersistentTopic(final String topic, CompletableFuture<Topic>
new OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this);

CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
replicationFuture.thenCompose(v -> {
// Also check dedup status
return persistentTopic.checkDeduplicationStatus();
}).thenRun(() -> {
log.info("Created topic {} - dedup is {}", topic,
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(destinationName, persistentTopic);
topicFuture.complete(persistentTopic);
}).exceptionally((ex) -> {
log.warn("Replication or dedup check failed. Removing topic from topics list {}, {}", topic, ex);
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
try {
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger,
BrokerService.this);
CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
replicationFuture.thenCompose(v -> {
// Also check dedup status
return persistentTopic.checkDeduplicationStatus();
}).thenRun(() -> {
log.info("Created topic {} - dedup is {}", topic,
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(destinationName, persistentTopic);
topicFuture.complete(persistentTopic);
}).exceptionally((ex) -> {
log.warn(
"Replication or dedup check failed. Removing topic from topics list {}, {}",
topic, ex);
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
});

return null;
});

return null;
});
} catch (NamingException e) {
log.warn("Failed to create topic {}-{}", topic, e.getMessage());
pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.client.api.MessageId;
Expand All @@ -49,7 +50,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
private final NonPersistentReplicatorStats stats = new NonPersistentReplicatorStats();

public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
BrokerService brokerService) {
BrokerService brokerService) throws NamingException {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);

producerConfiguration.setBlockIfQueueFull(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,6 @@ private boolean hasLocalProducers() {
return foundLocal.get();
}

/**
 * Tells whether at least one entry of {@code producers} reports itself as remote.
 *
 * @return {@code true} if {@code isRemote()} returned true for any producer visited, {@code false} otherwise
 */
private boolean hasRemoteProducers() {
    // forEach only accepts a callback, so the outcome is carried out of the lambda through an atomic flag.
    final AtomicBoolean remoteSeen = new AtomicBoolean(false);
    producers.forEach(candidate -> {
        if (!candidate.isRemote()) {
            return;
        }
        remoteSeen.set(true);
    });
    return remoteSeen.get();
}

@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);
Expand Down Expand Up @@ -524,7 +513,12 @@ public CompletableFuture<Void> checkReplication() {
}

if (!replicators.containsKey(cluster)) {
startReplicator(cluster);
if (!startReplicator(cluster)) {
// This happens when the global topic is a partitioned topic: a replicator can't be started on the
// root topic name (the topic name without a partition suffix).
return FutureUtil
.failedFuture(new NamingException(topic + " failed to start replicator for " + cluster));
}
}
}

Expand All @@ -539,13 +533,30 @@ public CompletableFuture<Void> checkReplication() {
return FutureUtil.waitForAll(futures);
}

void startReplicator(String remoteCluster) {
boolean startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
replicators.computeIfAbsent(remoteCluster,
r -> new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService));
return addReplicationCluster(remoteCluster,NonPersistentTopic.this, localCluster);
}

/**
 * Registers a {@link NonPersistentReplicator} for {@code remoteCluster} unless one is already present in
 * {@code replicators}.
 *
 * @param remoteCluster
 *            cluster the replicator targets; also the key used in {@code replicators}
 * @param nonPersistentTopic
 *            not read by this method; the replicator is always built for the enclosing topic instance
 * @param localCluster
 *            name of the local cluster, handed to the replicator constructor
 * @return {@code false} if constructing the replicator threw {@link NamingException}, {@code true} otherwise
 *         (including when an entry for {@code remoteCluster} already existed)
 */
protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) {
    // The mapping function cannot throw a checked exception, so a construction failure is reported via this flag.
    final AtomicBoolean started = new AtomicBoolean(true);
    replicators.computeIfAbsent(remoteCluster, cluster -> {
        try {
            return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
        } catch (NamingException e) {
            started.set(false);
            log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
            return null;
        }
    });
    if (started.get()) {
        return true;
    }
    // Startup failed: make sure no entry for this cluster is left in the map.
    replicators.remove(remoteCluster);
    return false;
}

CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down Expand Up @@ -917,6 +928,8 @@ public void markBatchMessagePublished() {
this.hasBatchMessagePublished = true;
}



private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.Backoff;
Expand Down Expand Up @@ -89,7 +90,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
private final ReplicatorStats stats = new ReplicatorStats();

public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService) {
BrokerService brokerService) throws NamingException {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
this.topic = topic;
this.cursor = cursor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void reset() {
}
}

public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws NamingException {
this.topic = topic;
this.ledger = ledger;
this.brokerService = brokerService;
Expand All @@ -212,8 +212,11 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
replicators.put(remoteCluster,
new PersistentReplicator(this, cursor, localCluster, remoteCluster, brokerService));
boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor, localCluster);
if (!isReplicatorStarted) {
throw new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
}
} else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) {
// This is not a regular subscription, we are going to ignore it for now and let the message dedup logic
// to take care of it
Expand Down Expand Up @@ -882,9 +885,13 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
replicators.computeIfAbsent(remoteCluster, r -> new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
remoteCluster, brokerService));
future.complete(null);
boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster);
if (isReplicatorStarted) {
future.complete(null);
} else {
future.completeExceptionally(new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
}
}

@Override
Expand All @@ -897,6 +904,26 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

/**
 * Registers a {@link PersistentReplicator} backed by {@code cursor} for {@code remoteCluster} unless one is
 * already present in {@code replicators}.
 *
 * @param remoteCluster
 *            cluster the replicator targets; also the key used in {@code replicators}
 * @param persistentTopic
 *            not read by this method; the replicator is always built for the enclosing topic instance
 * @param cursor
 *            managed cursor handed to the replicator constructor
 * @param localCluster
 *            name of the local cluster, handed to the replicator constructor
 * @return {@code false} if constructing the replicator threw {@link NamingException}, {@code true} otherwise
 *         (including when an entry for {@code remoteCluster} already existed)
 */
protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, ManagedCursor cursor,
        String localCluster) {
    // The mapping function cannot throw a checked exception, so a construction failure is reported via this flag.
    final AtomicBoolean started = new AtomicBoolean(true);
    replicators.computeIfAbsent(remoteCluster, cluster -> {
        try {
            return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
                    brokerService);
        } catch (NamingException e) {
            started.set(false);
            log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
            return null;
        }
    });
    if (started.get()) {
        return true;
    }
    // Startup failed: make sure no entry for this cluster is left in the map.
    replicators.remove(remoteCluster);
    return false;
}

CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down
Loading

0 comments on commit ca7559b

Please sign in to comment.