Skip to content

Commit

Permalink
[ISSUE 3763] - Implementing Function authorization (apache#3874)
Browse files Browse the repository at this point in the history
* Implementing Function Authorization
  • Loading branch information
jerrypeng authored Mar 28, 2019
1 parent bda6a9c commit 14d1eaa
Show file tree
Hide file tree
Showing 30 changed files with 1,091 additions and 179 deletions.
13 changes: 11 additions & 2 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ workerHostname: localhost
workerPort: 6750
workerPortTls: 6751

# Configuration Store connection string
configurationStoreServers: localhost:2181
# ZooKeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis: 30000
# ZooKeeper operation timeout in seconds
zooKeeperOperationTimeoutSeconds: 30

################################
# Function package management
################################
Expand Down Expand Up @@ -129,8 +136,10 @@ processContainerFactory:
authenticationEnabled: false
# Enforce authorization on accessing functions api
authorizationEnabled: false
# Set of autentication provider name list, which is a list of class names
authenticationProviders:
# Set of authentication provider name list, which is a list of class names
authenticationProviders:
# Authorization provider fully qualified class-name
authorizationProvider: org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
# Set of role names that are treated as "super-user", meaning they will be able to access any admin-api
superUserRoles:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData);

/**
* Allow all function operations with in this namespace
* @param namespaceName The namespace that the function operations can be executed in
* @param role The role to check
* @param authenticationData authentication data related to the role
* @return a boolean to determine whether authorized or not
*/
CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData);

/**
*
* Grant authorization-action permission on a namespace to the given client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -307,4 +305,8 @@ public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String rol
return finalResult;
}

public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData) {
return provider.allowFunctionOpsAsync(namespaceName, role, authenticationData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,48 @@ public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String rol
return finalResult;
}

@Override
public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
configCache.policiesCache().getAsync(POLICY_ROOT + namespaceName.toString()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for namespace : {}", namespaceName);
}
} else {
Map<String, Set<AuthAction>> namespaceRoles = policies.get().auth_policies.namespace_auth;
Set<AuthAction> namespaceActions = namespaceRoles.get(role);
if (namespaceActions != null && namespaceActions.contains(AuthAction.functions)) {
// The role has namespace level permission
permissionFuture.complete(true);
return;
}

// Using wildcard
if (conf.isAuthorizationAllowWildcardsMatching()) {
if (checkWildcardPermission(role, AuthAction.functions, namespaceRoles)) {
// The role has namespace level permission by wildcard match
permissionFuture.complete(true);
return;
}
}
}
permissionFuture.complete(false);
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for namespace - {}. {}", role, namespaceName,
ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
log.warn("Client with Role - {} failed to get permissions for namespace - {}. {}", role, namespaceName,
e.getMessage());
permissionFuture.completeExceptionally(e);
}
return permissionFuture;
}

@Override
public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions,
String role, String authDataJson) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ private static class BrokerStarter {
"c-" + brokerConfig.getClusterName()
+ "-fw-" + hostname
+ "-" + workerConfig.getWorkerPort());
// inherit broker authorization setting
workerConfig.setAuthorizationEnabled(brokerConfig.isAuthorizationEnabled());
workerConfig.setAuthorizationProvider(brokerConfig.getAuthorizationProvider());
workerConfig.setConfigurationStoreServers(brokerConfig.getConfigurationStoreServers());
workerConfig.setZooKeeperSessionTimeoutMillis(brokerConfig.getZooKeeperSessionTimeoutMillis());
workerConfig.setZooKeeperOperationTimeoutSeconds(brokerConfig.getZooKeeperOperationTimeoutSeconds());
functionsWorkerService = new WorkerService(workerConfig);
} else {
functionsWorkerService = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,13 @@ public void start() throws Exception {
"c-" + config.getClusterName()
+ "-fw-" + hostname
+ "-" + workerConfig.getWorkerPort());
// inherit broker authorization setting
workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
workerConfig.setAuthorizationProvider(config.getAuthorizationProvider());
workerConfig.setConfigurationStoreServers(config.getConfigurationStoreServers());
workerConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis());
workerConfig.setZooKeeperOperationTimeoutSeconds(config.getZooKeeperOperationTimeoutSeconds());

fnWorkerService = new WorkerService(workerConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
Expand Down Expand Up @@ -438,7 +440,7 @@ public Boolean get() {
acquireSLANamespace();

// start function worker service if necessary
this.startWorkerService();
this.startWorkerService(brokerService.getAuthenticationService(), brokerService.getAuthorizationService());

LOG.info("messaging service is ready, bootstrap service on port={}, broker url={}, cluster={}, configs={}",
config.getWebServicePort().get(), brokerServiceUrl, config.getClusterName(),
Expand Down Expand Up @@ -945,7 +947,9 @@ public SchemaRegistryService getSchemaRegistryService() {
return schemaRegistryService;
}

private void startWorkerService() throws InterruptedException, IOException, KeeperException {
private void startWorkerService(AuthenticationService authenticationService,
AuthorizationService authorizationService)
throws InterruptedException, IOException, KeeperException {
if (functionWorkerService.isPresent()) {
LOG.info("Starting function worker service");
String namespace = functionWorkerService.get()
Expand Down Expand Up @@ -1041,7 +1045,7 @@ private void startWorkerService() throws InterruptedException, IOException, Keep
throw ioe;
}
LOG.info("Function worker service setup completed");
functionWorkerService.get().start(dlogURI);
functionWorkerService.get().start(dlogURI, authenticationService, authorizationService);
LOG.info("Function worker service started");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void updateFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("functionConfig") String functionConfigJson) {

functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, null, functionConfigJson, clientAppId());
functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
}


Expand All @@ -120,7 +120,7 @@ public void updateFunction(final @PathParam("tenant") String tenant,
public void deregisterFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
functions.deregisterFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
}

@GET
Expand All @@ -138,7 +138,7 @@ public void deregisterFunction(final @PathParam("tenant") String tenant,
public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionInfo(tenant, namespace, functionName);
return functions.getFunctionInfo(tenant, namespace, functionName, clientAppId(), clientAuthData());
}

@GET
Expand All @@ -158,7 +158,7 @@ public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunct
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}

@GET
Expand All @@ -177,7 +177,7 @@ public FunctionStatus getFunctionStatus(
final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStatus(tenant, namespace, functionName, uri.getRequestUri());
return functions.getFunctionStatus(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
}

@GET
Expand All @@ -195,7 +195,7 @@ public FunctionStatus getFunctionStatus(
public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri());
return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
}

@GET
Expand All @@ -215,7 +215,7 @@ public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunction
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionsInstanceStats(tenant, namespace, functionName, instanceId, uri.getRequestUri());
return functions.getFunctionsInstanceStats(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}

@GET
Expand All @@ -231,7 +231,7 @@ public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunction
@Path("/{tenant}/{namespace}")
public List<String> listFunctions(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
return functions.listFunctions(tenant, namespace);
return functions.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
}

@POST
Expand All @@ -253,7 +253,7 @@ public String triggerFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("data") String triggerValue,
final @FormDataParam("dataStream") InputStream triggerStream,
final @FormDataParam("topic") String topic) {
return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, clientAppId(), clientAuthData());
}

@GET
Expand All @@ -272,7 +272,7 @@ public FunctionState getFunctionState(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("key") String key) {
return functions.getFunctionState(tenant, namespace, functionName, key);
return functions.getFunctionState(tenant, namespace, functionName, key, clientAppId(), clientAuthData());
}

@POST
Expand All @@ -288,7 +288,7 @@ public void restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}

@POST
Expand All @@ -303,7 +303,7 @@ public void restartFunction(final @PathParam("tenant") String tenant,
public void restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
functions.restartFunctionInstances(tenant, namespace, functionName);
functions.restartFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}

@POST
Expand All @@ -319,7 +319,7 @@ public void stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}

@POST
Expand All @@ -334,7 +334,7 @@ public void stopFunction(final @PathParam("tenant") String tenant,
public void stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
functions.stopFunctionInstances(tenant, namespace, functionName);
functions.stopFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}

@POST
Expand All @@ -350,7 +350,7 @@ public void startFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
functions.startFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
functions.startFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}

@POST
Expand All @@ -365,7 +365,7 @@ public void startFunction(final @PathParam("tenant") String tenant,
public void startFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
functions.startFunctionInstances(tenant, namespace, functionName);
functions.startFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}

@POST
Expand Down
Loading

0 comments on commit 14d1eaa

Please sign in to comment.