Skip to content

Commit

Permalink
Add authorization support on function apis (apache#2213)
Browse files Browse the repository at this point in the history
* Add authorization support on function apis

* fix authorization enable check
  • Loading branch information
rdhabalia authored Jul 24, 2018
1 parent 2543ccf commit ce6fe8b
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 29 deletions.
8 changes: 8 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,11 @@ initialBrokerReconnectMaxRetries: 60
assignmentWriteMaxRetries: 60
instanceLivenessCheckFreqMs: 30000
metricsSamplingPeriodSec: 60
# Enforce authentication
authenticationEnabled: false
# Enforce authorization on accessing functions api
authorizationEnabled: false
# Set of autentication provider name list, which is a list of class names
authenticationProviders:
# Set of role names that are treated as "super-user", meaning they will be able to access any admin-api
superUserRoles:
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("functionDetails") String functionDetailsJson) {

return functions.registerFunction(
tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson);
return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson, clientAppId());
}

@PUT
Expand All @@ -103,8 +103,8 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("functionDetails") String functionDetailsJson) {

return functions.updateFunction(
tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson);
return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson, clientAppId());

}

Expand All @@ -122,8 +122,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
public Response deregisterFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
return functions.deregisterFunction(
tenant, namespace, functionName);
return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.google.common.collect.Maps;
Expand All @@ -99,7 +100,6 @@ public class PulsarSinkE2ETest {
PulsarAdmin admin;
PulsarClient pulsarClient;
BrokerStats brokerStatsClient;
WorkerServer functionsWorkerServer;
WorkerService functionsWorkerService;
final String tenant = "external-repl-prop";
String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
Expand All @@ -120,6 +120,11 @@ public class PulsarSinkE2ETest {

private static final Logger log = LoggerFactory.getLogger(PulsarSinkE2ETest.class);

@DataProvider(name = "validRoleName")
public Object[][] validRoleName() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

@BeforeMethod
void setup(Method method) throws Exception {

Expand Down Expand Up @@ -187,6 +192,7 @@ && isNotBlank(workerConfig.getClientAuthenticationParameters())) {
pulsarClient = clientBuilder.build();

TenantInfo propAdmin = new TenantInfo();
propAdmin.getAdminRoles().add("superUser");
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
admin.tenants().updateTenant(tenant, propAdmin);

Expand Down Expand Up @@ -231,6 +237,9 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);

workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

return new WorkerService(workerConfig);
}
Expand Down Expand Up @@ -416,4 +425,34 @@ protected FunctionDetails createSinkConfig(String jarFile, String tenant, String

return functionDetailsBuilder.build();
}

@Test(dataProvider = "validRoleName")
public void testAuthorization(boolean validRoleName) throws Exception {

final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String functionName = "PulsarSink-test";
final String subscriptionName = "test-sub";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

String roleName = validRoleName ? "superUser" : "invalid";
TenantInfo propAdmin = new TenantInfo();
propAdmin.getAdminRoles().add(roleName);
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
admin.tenants().updateTenant(tenant, propAdmin);

String jarFilePathUrl = Utils.FILE + ":"
+ PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName,
sinkTopic, subscriptionName);
try {
admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
Assert.assertTrue(validRoleName);
} catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) {
Assert.assertFalse(validRoleName);
}
}
}
6 changes: 6 additions & 0 deletions pulsar-functions/worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@

<dependencies>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.collect.Sets;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.configuration.PulsarConfiguration;

import lombok.Data;
import lombok.EqualsAndHashCode;
Expand All @@ -42,7 +46,7 @@
@EqualsAndHashCode
@ToString
@Accessors(chain = true)
public class WorkerConfig implements Serializable {
public class WorkerConfig implements Serializable, PulsarConfiguration {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -74,6 +78,17 @@ public class WorkerConfig implements Serializable {
private boolean tlsAllowInsecureConnection = false;
private boolean tlsHostnameVerificationEnable = false;
private int metricsSamplingPeriodSec = 60;
// Enforce authentication
private boolean authenticationEnabled = false;
// Autentication provider name list, which is a list of class names
private Set<String> authenticationProviders = Sets.newTreeSet();
// Enforce authorization on accessing functions admin-api
private boolean authorizationEnabled = false;
// Role names that are treated as "super-user", meaning they will be able to access any admin-api
private Set<String> superUserRoles = Sets.newTreeSet();

private Properties properties = new Properties();


@Data
@Setter
Expand Down Expand Up @@ -135,4 +150,9 @@ public static String unsafeLocalhostResolve() {
throw new IllegalStateException("Failed to resolve localhost name.", ex);
}
}

@Override
public void setProperties(Properties properties) {
this.properties = properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;

/**
* A service component contains everything to run a worker except rest server.
Expand All @@ -57,8 +60,9 @@ public class WorkerService {
private SchedulerManager schedulerManager;
private boolean isInitialized = false;
private final ScheduledExecutorService statsUpdater;

private AuthenticationService authenticationService;
private ConnectorsManager connectorsManager;
private PulsarAdmin admin;

public WorkerService(WorkerConfig workerConfig) {
this.workerConfig = workerConfig;
Expand All @@ -68,6 +72,11 @@ public WorkerService(WorkerConfig workerConfig) {

public void start(URI dlogUri) throws InterruptedException {
log.info("Starting worker {}...", workerConfig.getWorkerId());

this.admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());

try {
log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter()
.writeValueAsString(workerConfig));
Expand Down Expand Up @@ -128,6 +137,8 @@ && isNotBlank(workerConfig.getClientAuthenticationParameters())) {

// initialize function metadata manager
this.functionMetaDataManager.initialize();

authenticationService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));

// Starting cluster services
log.info("Start cluster services...");
Expand Down Expand Up @@ -200,6 +211,10 @@ public void stop() {
if (null != schedulerManager) {
schedulerManager.close();
}

if (null != this.admin) {
this.admin.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
*/
package org.apache.pulsar.functions.worker.rest;

import java.util.Optional;
import java.util.function.Supplier;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Context;

import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;

Expand All @@ -32,6 +36,8 @@ public class FunctionApiResource implements Supplier<WorkerService> {
private WorkerService workerService;
@Context
protected ServletContext servletContext;
@Context
protected HttpServletRequest httpRequest;

public FunctionApiResource() {
this.functions = new FunctionsImpl(this);
Expand All @@ -44,4 +50,10 @@ public synchronized WorkerService get() {
}
return this.workerService;
}

public String clientAppId() {
return httpRequest != null
? (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
: null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,27 @@
package org.apache.pulsar.functions.worker.rest;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;

import javax.servlet.DispatcherType;

import lombok.extern.slf4j.Slf4j;

import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;

import java.net.BindException;
import java.net.URI;

import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.glassfish.jersey.server.ResourceConfig;
Expand All @@ -41,6 +50,7 @@ public class WorkerServer implements Runnable {

private final WorkerConfig workerConfig;
private final WorkerService workerService;
private static final String MATCH_ALL = "/*";

private static String getErrorMessage(Server server, int port, Exception ex) {
if (ex instanceof BindException) {
Expand Down Expand Up @@ -106,7 +116,12 @@ public static ServletContextHandler newServletContextHandler(String contextPath,
final ServletHolder apiServlet =
new ServletHolder(new ServletContainer(config));
contextHandler.addServlet(apiServlet, "/*");
if (workerService.getWorkerConfig().isAuthenticationEnabled()) {
FilterHolder filter = new FilterHolder(new AuthenticationFilter(workerService.getAuthenticationService()));
contextHandler.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
}

return contextHandler;
}

}
Loading

0 comments on commit ce6fe8b

Please sign in to comment.