Skip to content

Commit

Permalink
[pulsar-broker] Handle lookup redirect for V1-topics with different c…
Browse files Browse the repository at this point in the history
…luster (apache#12743)
  • Loading branch information
rdhabalia authored Nov 11, 2021
1 parent ba97edf commit 25cbfad
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,20 +367,23 @@ private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName
}

private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, String role, AuthAction action) {
return checkPermission(topicName, role, action)
.thenApply(isPermission -> isPermission && checkCluster(topicName));
return checkPermission(topicName, role, action).
thenApply(isPermission -> isPermission).
thenCompose(permission -> permission ? checkCluster(topicName) :
CompletableFuture.completedFuture(false));
}

private boolean checkCluster(TopicName topicName) {
private CompletableFuture<Boolean> checkCluster(TopicName topicName) {
if (topicName.isGlobal() || conf.getClusterName().equals(topicName.getCluster())) {
return true;
} else {
if (log.isDebugEnabled()) {
log.debug("Topic [{}] does not belong to local cluster [{}]", topicName.toString(),
conf.getClusterName());
}
return false;
return CompletableFuture.completedFuture(true);
}
if (log.isDebugEnabled()) {
log.debug("Topic [{}] does not belong to local cluster [{}]", topicName.toString(), conf.getClusterName());
}
return pulsarResources.getClusterResources().listAsync()
.thenApply(clusters -> {
return clusters.contains(topicName.getCluster());
});
}

public CompletableFuture<Boolean> checkPermission(TopicName topicName, String role, AuthAction action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public ClusterResources(MetadataStore store, int operationTimeoutSec) {
this.failureDomainResources = new FailureDomainResources(store, FailureDomainImpl.class, operationTimeoutSec);
}

public CompletableFuture<Set<String>> listAsync() {
return getChildrenAsync(BASE_CLUSTERS_PATH).thenApply(list -> new HashSet<>(list));
}

public Set<String> list() throws MetadataStoreException {
return new HashSet<>(super.getChildren(BASE_CLUSTERS_PATH));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,27 @@ public void testDoNotReplicateSystemTopic() throws Exception {
});
}

@Test
public void testLookupAnotherCluster() throws Exception {
log.info("--- Starting ReplicatorTest::testLookupAnotherCluster ---");

String namespace = "pulsar/r2/cross-cluster-ns";
admin1.namespaces().createNamespace(namespace);
final TopicName topicName = TopicName
.get("persistent://" + namespace + "/topic");

@Cleanup
PulsarClient client1 = PulsarClient.builder()
.serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Producer<byte[]> producer = client1.newProducer().topic(topicName.toString())
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

producer.close();
}

private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List<String> expectedTopicList) {
// wait non-partitioned topics replicators created finished
final List<String> list = new ArrayList<>();
Expand Down

0 comments on commit 25cbfad

Please sign in to comment.