Skip to content

Commit

Permalink
[improve][broker] Make some method on permission async (apache#16324)
Browse files Browse the repository at this point in the history
Co-authored-by: gavingaozhangmin <[email protected]>
  • Loading branch information
gaozhangmin and gavingaozhangmin authored Jul 12, 2022
1 parent b4ef4a3 commit b30c53e
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -701,97 +700,109 @@ protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boole
}
}

protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> actions) {
validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
checkNotNull(role, "Role should not be null");
checkNotNull(actions, "Actions should not be null");

try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
authService.grantPermissionAsync(namespaceName, actions, role, null/*additional auth-data json*/)
.get();
} else {
throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
}
} catch (InterruptedException e) {
log.error("[{}] Failed to get permissions for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
} catch (ExecutionException e) {
// The IllegalArgumentException and the IllegalStateException were historically thrown by the
// grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
if (e.getCause() instanceof MetadataStoreException.NotFoundException
|| e.getCause() instanceof IllegalArgumentException) {
log.warn("[{}] Failed to set permissions for namespace {}: does not exist", clientAppId(),
namespaceName, e);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} else if (e.getCause() instanceof MetadataStoreException.BadVersionException
|| e.getCause() instanceof IllegalStateException) {
log.warn("[{}] Failed to set permissions for namespace {}: {}",
clientAppId(), namespaceName, e.getCause().getMessage(), e);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} else {
log.error("[{}] Failed to get permissions for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
protected CompletableFuture<Void> internalGrantPermissionOnNamespaceAsync(String role, Set<AuthAction> actions) {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GRANT_PERMISSION)
.thenAccept(__ -> {
checkNotNull(role, "Role should not be null");
checkNotNull(actions, "Actions should not be null");
}).thenCompose(__ ->
authService.grantPermissionAsync(namespaceName, actions, role, null))
.thenAccept(unused ->
log.info("[{}] Successfully granted access for role {}: {} - namespaceName {}",
clientAppId(), role, actions, namespaceName))
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
//The IllegalArgumentException and the IllegalStateException were historically thrown by the
// grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
if (realCause instanceof MetadataStoreException.NotFoundException
|| realCause instanceof IllegalArgumentException) {
log.warn("[{}] Failed to set permissions for namespace {}: does not exist", clientAppId(),
namespaceName, ex);
throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
} else if (realCause instanceof MetadataStoreException.BadVersionException
|| realCause instanceof IllegalStateException) {
log.warn("[{}] Failed to set permissions for namespace {}: {}",
clientAppId(), namespaceName, ex.getCause().getMessage(), ex);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} else {
log.error("[{}] Failed to get permissions for namespace {}",
clientAppId(), namespaceName, ex);
throw new RestException(realCause);
}
});
} else {
String msg = "Authorization is not enabled";
return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
}
}


protected void internalGrantPermissionOnSubscription(String subscription, Set<String> roles) {
validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(roles, "Roles should not be null");

try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
authService.grantSubscriptionPermissionAsync(namespaceName, subscription, roles,
null/* additional auth-data json */).get();
} else {
throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
}
} catch (InterruptedException e) {
log.error("[{}] Failed to get permissions for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof IllegalArgumentException) {
log.warn("[{}] Failed to set permissions for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} else if (e.getCause() instanceof IllegalStateException) {
log.warn("[{}] Failed to set permissions for namespace {}: concurrent modification", clientAppId(),
namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} else {
log.error("[{}] Failed to get permissions for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
protected CompletableFuture<Void> internalGrantPermissionOnSubscriptionAsync(String subscription,
Set<String> roles) {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GRANT_PERMISSION)
.thenAccept(__ -> {
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(roles, "Roles should not be null");
})
.thenCompose(__ -> authService.grantSubscriptionPermissionAsync(namespaceName, subscription,
roles, null))
.thenAccept(unused -> {
log.info("[{}] Successfully granted permssion on subscription for role {}:{} - "
+ "namespaceName {}", clientAppId(), roles, subscription, namespaceName);
})
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
//The IllegalArgumentException and the IllegalStateException were historically thrown by the
// grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
if (realCause.getCause() instanceof IllegalArgumentException) {
log.warn("[{}] Failed to set permissions for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} else if (realCause.getCause() instanceof IllegalStateException) {
log.warn("[{}] Failed to set permissions for namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} else {
log.error("[{}] Failed to get permissions for namespace {}",
clientAppId(), namespaceName, realCause);
throw new RestException(realCause);
}
});
} else {
String msg = "Authorization is not enabled";
return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
}
}

protected void internalRevokePermissionsOnNamespace(String role) {
validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
validatePoliciesReadOnlyAccess();
checkNotNull(role, "Role should not be null");
updatePolicies(namespaceName, policies ->{
policies.auth_policies.getNamespaceAuthentication().remove(role);
return policies;
});
protected CompletableFuture<Void> internalRevokePermissionsOnNamespaceAsync(String role) {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.REVOKE_PERMISSION)
.thenAccept(__ -> checkNotNull(role, "Role should not be null"))
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.auth_policies.getNamespaceAuthentication().remove(role);
return policies;
}));
}

protected void internalRevokePermissionsOnSubscription(String subscriptionName, String role) {
validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
validatePoliciesReadOnlyAccess();
checkNotNull(subscriptionName, "SubscriptionName should not be null");
checkNotNull(role, "Role should not be null");

protected CompletableFuture<Void> internalRevokePermissionsOnSubscriptionAsync(String subscriptionName,
String role) {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
authService.revokeSubscriptionPermissionAsync(namespaceName, subscriptionName, role,
null/* additional auth-data json */);
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.REVOKE_PERMISSION)
.thenAccept(__ -> {
checkNotNull(subscriptionName, "SubscriptionName should not be null");
checkNotNull(role, "Role should not be null");
})
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> authService.revokeSubscriptionPermissionAsync(namespaceName,
subscriptionName, role, null/* additional auth-data json */));
} else {
throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
String msg = "Authorization is not enabled";
return FutureUtil.failedFuture(new RestException(Status.NOT_IMPLEMENTED, msg));
}
}

Expand Down
Loading

0 comments on commit b30c53e

Please sign in to comment.