Skip to content

Commit

Permalink
Optimize TopicPolicies#replicationClusters with HierarchyTopicPolicies (
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 authored Dec 28, 2021
1 parent 892948a commit ef42af2
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -151,6 +152,10 @@ public AbstractTopic(String topic, BrokerService brokerService) {
}

protected void updateTopicPolicy(TopicPolicies data) {
if (!isSystemTopic()) {
// Only use namespace level setting for system topic.
topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters());
}
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
Expand All @@ -173,6 +178,8 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (namespacePolicies.deleted) {
return;
}
topicPolicies.getReplicationClusters().updateNamespaceValue(
Lists.newArrayList(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -46,7 +47,6 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
Expand Down Expand Up @@ -528,50 +528,32 @@ public CompletableFuture<Void> checkReplication() {
log.debug("[{}] Checking replication status", name);
}

return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenCompose(optPolicies -> {
if (!optPolicies.isPresent()) {
return FutureUtil.failedFuture(
new ServerMetadataException(
new MetadataStoreException.NotFoundException()));
}

Policies policies = optPolicies.get();
Set<String> configuredClusters;
if (policies.replication_clusters != null) {
configuredClusters = policies.replication_clusters;
} else {
configuredClusters = Collections.emptySet();
}
Set<String> configuredClusters = new HashSet<>(topicPolicies.getReplicationClusters().get());

String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

List<CompletableFuture<Void>> futures = Lists.newArrayList();

// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}
List<CompletableFuture<Void>> futures = Lists.newArrayList();

// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});
return FutureUtil.waitForAll(futures);
});
// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}

if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}

// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});
return FutureUtil.waitForAll(futures);
}

CompletableFuture<Void> startReplicator(String remoteCluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
Expand Down Expand Up @@ -1377,82 +1376,43 @@ public CompletableFuture<Void> checkReplication() {
log.debug("[{}] Checking replication status", name);
}

CompletableFuture<List<String>> replicationClustersFuture = getReplicationClusters(name);
List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
int newMessageTTLinSeconds = topicPolicies.getMessageTTLInSeconds().get();

return CompletableFuture.allOf(replicationClustersFuture)
.thenCompose(__ -> {
List<String> configuredClusters = replicationClustersFuture.join();
int newMessageTTLinSeconds = topicPolicies.getMessageTTLInSeconds().get();
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
// because pulsar doesn't serve global topic without local repl-cluster configured.
if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
log.info("Deleting topic [{}] because local cluster is not part of "
+ " global namespace repl list {}", topic, configuredClusters);
return deleteForcefully();
}

List<CompletableFuture<Void>> futures = Lists.newArrayList();

// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}

if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}

// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);

if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
// because pulsar doesn't serve global topic without local repl-cluster configured.
if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
log.info("Deleting topic [{}] because local cluster is not part of "
+ " global namespace repl list {}", topic, configuredClusters);
return deleteForcefully();
}

});
List<CompletableFuture<Void>> futures = Lists.newArrayList();

return FutureUtil.waitForAll(futures);
});
}
// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}

@VisibleForTesting
public CompletableFuture<List<String>> getReplicationClusters(TopicName topicName) {
CompletableFuture<Optional<TopicPolicies>> future = new CompletableFuture<>();
if (isSystemTopic()) {
//Topic Policies System Topic have to skip the topic policy.
future.complete(Optional.empty());
} else {
future = brokerService.pulsar().getTopicPoliciesService()
.getTopicPoliciesAsyncWithRetry(topicName, null, brokerService.pulsar().getExecutor(), false);
}
return future.thenCompose(topicPolicies -> {
if (!topicPolicies.isPresent() || topicPolicies.get().getReplicationClusters() == null) {
return brokerService.pulsar().getPulsarResources()
.getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenCompose(optPolicies -> {
if (!optPolicies.isPresent()) {
return FutureUtil.failedFuture(
new ServerMetadataException(
new MetadataStoreException.NotFoundException()));
}
// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});

return CompletableFuture.completedFuture(
Lists.newArrayList(optPolicies.get().replication_clusters));
});
} else {
return CompletableFuture.completedFuture(topicPolicies.get().getReplicationClusters());
}
});
return FutureUtil.waitForAll(futures);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.atLeast;
Expand All @@ -41,7 +40,15 @@
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
Expand All @@ -66,22 +73,16 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.Cleanup;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -145,12 +146,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

@Test(groups = "broker")
public class PersistentTopicTest extends MockedBookKeeperTestCase {
protected PulsarService pulsar;
Expand Down Expand Up @@ -2259,14 +2254,8 @@ private ByteBuf getMessageWithMetadata(byte[] data) {
@Test
public void testGetReplicationClusters() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
TopicName name = TopicName.get(successTopicName);
CompletableFuture<List<String>> replicationClusters = topic.getReplicationClusters(name);
try {
replicationClusters.get();
fail("Should have failed");
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof BrokerServiceException.ServerMetadataException);
}
topic.initialize();
assertNull(topic.getHierarchyTopicPolicies().getReplicationClusters().get());

PulsarResources pulsarResources = spy(new PulsarResources(store, store));
NamespaceResources nsr = spy(new NamespaceResources(store, store, 30));
Expand All @@ -2282,9 +2271,8 @@ public void testGetReplicationClusters() throws Exception {
doReturn(policiesFuture).when(nsr).getPoliciesAsync(any());

topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
replicationClusters = topic.getReplicationClusters(name);

assertEquals(replicationClusters.get(), namespaceClusters);
topic.initialize();
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), namespaceClusters);

TopicPoliciesService topicPoliciesService = mock(TopicPoliciesService.class);
doReturn(topicPoliciesService).when(pulsar).getTopicPoliciesService();
Expand All @@ -2295,11 +2283,10 @@ public void testGetReplicationClusters() throws Exception {
topicPolicies.setReplicationClusters(topicClusters);
Optional<TopicPolicies> optionalTopicPolicies = Optional.of(topicPolicies);
topicPoliciesFuture.complete(optionalTopicPolicies);
when(topicPoliciesService.getTopicPoliciesAsyncWithRetry(any(), any(), any(), anyBoolean())).thenReturn(topicPoliciesFuture);
when(topicPoliciesService.getTopicPoliciesIfExists(any())).thenReturn(topicPolicies);

topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
replicationClusters = topic.getReplicationClusters(name);

assertEquals(replicationClusters.get(), topicClusters);
topic.initialize();
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), namespaceClusters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
Expand All @@ -31,6 +32,7 @@
*/
@Getter
public class HierarchyTopicPolicies {
final PolicyHierarchyValue<List<String>> replicationClusters;
final PolicyHierarchyValue<Boolean> deduplicationEnabled;
final PolicyHierarchyValue<InactiveTopicPolicies> inactiveTopicPolicies;
final PolicyHierarchyValue<EnumSet<SubType>> subscriptionTypesEnabled;
Expand All @@ -42,6 +44,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Integer> maxConsumerPerTopic;

public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
deduplicationEnabled = new PolicyHierarchyValue<>();
inactiveTopicPolicies = new PolicyHierarchyValue<>();
subscriptionTypesEnabled = new PolicyHierarchyValue<>();
Expand Down

0 comments on commit ef42af2

Please sign in to comment.