Skip to content

Commit

Permalink
Differentiate authorization between source/sink/function operations (a…
Browse files Browse the repository at this point in the history
…pache#7466)

* Differentiate between source/sink/function operations

* Added release notes

Co-authored-by: Sanjeev Kulkarni <[email protected]>
  • Loading branch information
srkukarni and Sanjeev Kulkarni authored Jul 7, 2020
1 parent ee1b810 commit 77dccd2
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,26 @@ CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData);

/**
* Allow all source operations with in this namespace
* @param namespaceName The namespace that the sources operations can be executed in
* @param role The role to check
* @param authenticationData authentication data related to the role
* @return a boolean to determine whether authorized or not
*/
CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData);

/**
* Allow all sink operations with in this namespace
* @param namespaceName The namespace that the sink operations can be executed in
* @param role The role to check
* @param authenticationData authentication data related to the role
* @return a boolean to determine whether authorized or not
*/
CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData);

/**
*
* Grant authorization-action permission on a namespace to the given client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,16 @@ public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceN
return provider.allowFunctionOpsAsync(namespaceName, role, authenticationData);
}

public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData) {
return provider.allowSourceOpsAsync(namespaceName, role, authenticationData);
}

public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData) {
return provider.allowSinkOpsAsync(namespaceName, role, authenticationData);
}

/**
* Grant authorization-action permission on a tenant to the given client
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,22 @@ public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String rol

@Override
public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return allowFunctionSourceSinkOpsAsync(namespaceName, role, authenticationData, AuthAction.functions);
}

@Override
public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return allowFunctionSourceSinkOpsAsync(namespaceName, role, authenticationData, AuthAction.sources);
}

@Override
public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return allowFunctionSourceSinkOpsAsync(namespaceName, role, authenticationData, AuthAction.sinks);
}

private CompletableFuture<Boolean> allowFunctionSourceSinkOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData,
AuthAction authAction) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
configCache.policiesCache().getAsync(POLICY_ROOT + namespaceName.toString()).thenAccept(policies -> {
Expand All @@ -231,15 +247,15 @@ public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceN
} else {
Map<String, Set<AuthAction>> namespaceRoles = policies.get().auth_policies.namespace_auth;
Set<AuthAction> namespaceActions = namespaceRoles.get(role);
if (namespaceActions != null && namespaceActions.contains(AuthAction.functions)) {
if (namespaceActions != null && namespaceActions.contains(authAction)) {
// The role has namespace level permission
permissionFuture.complete(true);
return;
}

// Using wildcard
if (conf.isAuthorizationAllowWildcardsMatching()) {
if (checkWildcardPermission(role, AuthAction.functions, namespaceRoles)) {
if (checkWildcardPermission(role, authAction, namespaceRoles)) {
// The role has namespace level permission by wildcard match
permissionFuture.complete(true);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,16 @@ public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceN
return null;
}

@Override
public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return null;
}

@Override
public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return null;
}

@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
String role, String authenticationData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,10 @@ public enum AuthAction {

/** Permissions for functions ops. **/
functions,

/** Permissions for sources ops. **/
sources,

/** Permissions for sinks ops. **/
sinks,
}
Original file line number Diff line number Diff line change
Expand Up @@ -1518,8 +1518,18 @@ && worker().getWorkerConfig().getSuperUserRoles() != null
public boolean allowFunctionOps(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData) {
try {
return worker().getAuthorizationService().allowFunctionOpsAsync(
namespaceName, role, authenticationData).get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS);
switch (componentType) {
case SINK:
return worker().getAuthorizationService().allowSinkOpsAsync(
namespaceName, role, authenticationData).get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS);
case SOURCE:
return worker().getAuthorizationService().allowSourceOpsAsync(
namespaceName, role, authenticationData).get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS);
case FUNCTION:
default:
return worker().getAuthorizationService().allowFunctionOpsAsync(
namespaceName, role, authenticationData).get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}
} catch (InterruptedException e) {
log.warn("Time-out {} sec while checking function authorization on {} ", worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), namespaceName);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
Expand Down
6 changes: 6 additions & 0 deletions site2/website/release-notes.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@

## Apache Pulsar Release Notes

### 2.7.0 &mdash; Not Yet Released <a id=“2.7.0”></a>

##### Upgrade notes

* [IO] If Function Authorization is enabled, users have to be given the source/sink entitlement to run them. See https://github.com/apache/pulsar/pull/7466

### 2.6.0 &mdash; 2020-06-17 <a id=“2.6.0”></a>

#### Features
Expand Down

0 comments on commit 77dccd2

Please sign in to comment.