Skip to content

Commit

Permalink
[revert] "[fix][broker] Fix tenant admin authorization bug. (apache#2…
Browse files Browse the repository at this point in the history
…0068)" (apache#20143)

This reverts commit fc17c1d.

### Motivation

In apache#20068 we changed the way that the `AuthorizationService` is implemented. I think this approach could have unintended consequences. Instead, I think we should change the `Consumer` and the `Producer` logic to call the correct `AuthorizationService` method. I propose an update to the `Consumer` and `Producer` logic here apache#20142.

Given that our goal is to deprecate the `AuthorizationService` methods for `canProduce` and `canConsume`, I think we should not change their implementations.

### Modifications

* Revert apache#20068

### Verifying this change

This change is trivial. It removes certain test changes that were only made to make the previous PR work.

### Documentation

- [x] `doc-not-needed`

### Matching PR in forked repository

PR in forked repository: Skipping PR as I ran tests locally.
  • Loading branch information
michaeljmarshall authored Apr 20, 2023
1 parent dc5e497 commit 00dc7a0
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -183,7 +182,13 @@ public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String ro
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
return provider.allowTopicOperationAsync(topicName, role, TopicOperation.PRODUCE, authenticationData);
return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> {
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
return provider.canProduceAsync(topicName, role, authenticationData);
}
});
}

/**
Expand All @@ -202,9 +207,13 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}

return provider.allowTopicOperationAsync(topicName, role, TopicOperation.CONSUME,
new AuthenticationDataSubscription(authenticationData, subscription));
return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> {
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
return provider.canConsumeAsync(topicName, role, authenticationData, subscription);
}
});
}

public boolean canProduce(TopicName topicName, String role, AuthenticationDataSource authenticationData)
Expand Down Expand Up @@ -280,7 +289,13 @@ public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String rol
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
return provider.allowTopicOperationAsync(topicName, role, TopicOperation.LOOKUP, authenticationData);
return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> {
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
return provider.canLookupAsync(topicName, role, authenticationData);
}
});
}

public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,10 @@ public void cleanup() throws Exception {
public void simple() throws Exception {
AuthorizationService auth = pulsar.getBrokerService().getAuthorizationService();

try {
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
fail("Should throw exception when tenant not exist");
} catch (Exception ignored) {}
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

admin.clusters().createCluster("c1", ClusterData.builder().build());
String tenantAdmin = "role1";
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1")));
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
waitForChange();
admin.namespaces().createNamespace("p1/c1/ns1");
waitForChange();
Expand Down Expand Up @@ -218,22 +215,21 @@ public void simple() throws Exception {
SubscriptionAuthMode.Prefix);
waitForChange();

assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null));
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null));
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null));
// tenant admin can consume all topics, even if SubscriptionAuthMode.Prefix mode
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1"));
try {
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "sub1"));
fail();
} catch (Exception ignored) {}
try {
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "sub2"));
fail();
} catch (Exception ignored) {}

assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "role1-sub1"));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "role1-sub1"));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2"));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "pulsar.super_user", null, "role3-sub1"));

// tenant admin can produce all topics
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null));

admin.namespaces().deleteNamespace("p1/c1/ns1");
admin.tenants().deleteTenant("p1");
admin.clusters().deleteCluster("c1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1035,22 +1035,6 @@ public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String rol
return CompletableFuture.completedFuture(grantRoles.contains(role));
}

@Override
public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role,
TopicOperation operation,
AuthenticationDataSource authData) {
switch (operation) {

case PRODUCE:
return canProduceAsync(topic, role, authData);
case CONSUME:
return canConsumeAsync(topic, role, authData, authData.getSubscription());
case LOOKUP:
return canLookupAsync(topic, role, authData);
}
return super.allowTopicOperationAsync(topic, role, operation, authData);
}

@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
String role, String authData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.EnumSet;
import java.util.Optional;
Expand Down Expand Up @@ -84,13 +83,10 @@ protected void cleanup() throws Exception {
public void test() throws Exception {
AuthorizationService auth = service.getAuthorizationService();

try {
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
fail("Should throw exception when tenant not exist");
} catch (Exception ignored) {}
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

admin.clusters().createCluster(configClusterName, ClusterData.builder().build());
String tenantAdmin = "role1";
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1")));
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
waitForChange();
admin.namespaces().createNamespace("p1/c1/ns1");
waitForChange();
Expand Down Expand Up @@ -121,10 +117,6 @@ public void test() throws Exception {
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null));

// tenant admin can produce/consume all topics, even if SubscriptionAuthMode.Prefix mode
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1"));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null));

admin.namespaces().deleteNamespace("p1/c1/ns1");
admin.tenants().deleteTenant("p1");
admin.clusters().deleteCluster("c1");
Expand Down

0 comments on commit 00dc7a0

Please sign in to comment.