Skip to content

Commit

Permalink
[Authorization] Role with namespace produce authz can also get topics (
Browse files Browse the repository at this point in the history
  • Loading branch information
yuruguo authored Mar 2, 2022
1 parent e6656e1 commit 89d60af
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,40 @@ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName,
return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.sinks);
}

private CompletableFuture<Boolean> allowConsumeOrProduceOpsAsync(NamespaceName namespaceName,
String role,
AuthenticationDataSource authenticationData) {
CompletableFuture<Boolean> finalResult = new CompletableFuture<>();
allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.consume)
.whenComplete((consumeAuthorized, e) -> {
if (e == null) {
if (consumeAuthorized) {
finalResult.complete(consumeAuthorized);
return;
}
} else {
if (log.isDebugEnabled()) {
log.debug("Namespace [{}] Role [{}] exception occurred while trying to check Consume "
+ "permission. {}", namespaceName, role, e.getCause());
}
}
allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.produce)
.whenComplete((produceAuthorized, ex) -> {
if (ex == null) {
finalResult.complete(produceAuthorized);
} else {
if (log.isDebugEnabled()) {
log.debug("Namespace [{}] Role [{}] exception occurred while trying to check "
+ "Produce permission. {}", namespaceName, role, ex.getCause());
}
finalResult.completeExceptionally(ex.getCause());
}
});
});

return finalResult;
}

private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData,
AuthAction authAction) {
Expand Down Expand Up @@ -550,6 +584,7 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
namespaceName, role, authData, AuthAction.packages);
case GET_TOPIC:
case GET_TOPICS:
return allowConsumeOrProduceOpsAsync(namespaceName, role, authData);
case UNSUBSCRIBE:
case CLEAR_BACKLOG:
return allowTheSpecifiedActionOpsAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,12 @@ public void testClearBacklogPermission() throws Exception {
assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions()
.get(subscriptionName).getMsgBacklog(), 0);

superAdmin.namespaces().revokePermissionsOnNamespace(namespace, subscriptionRole);
superAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole,
Sets.newHashSet(AuthAction.produce));
assertEquals(sub1Admin.topics().getPartitionedTopicList(namespace),
Lists.newArrayList(topicName));

log.info("-- Exiting {} test --", methodName);
}

Expand Down

0 comments on commit 89d60af

Please sign in to comment.