Skip to content

Commit

Permalink
Make Function Authentication Provider pluggable (apache#5404)
Browse files Browse the repository at this point in the history
* Make Function Authentication Provider pluaggable

* add config comments

* cleaning up

* cleaning up

* fix test
  • Loading branch information
jerrypeng authored and merlimat committed Oct 23, 2019
1 parent 7b0b550 commit 8b7245c
Show file tree
Hide file tree
Showing 20 changed files with 377 additions and 237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
Expand Down Expand Up @@ -345,7 +346,7 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio
null, /* python instance file */
null, /* log directory */
null, /* extra dependencies dir */
new DefaultSecretsProviderConfigurator(), false)) {
new DefaultSecretsProviderConfigurator(), false, Optional.empty())) {

for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.utils.Reflections;

import java.util.Optional;

Expand Down Expand Up @@ -61,4 +62,8 @@ public interface FunctionAuthProvider {
* @throws Exception
*/
void cleanUpAuthData(String tenant, String namespace, String name, Optional<FunctionAuthData> functionAuthData) throws Exception;

static FunctionAuthProvider getAuthProvider(String className) {
return Reflections.createInstance(className, FunctionAuthProvider.class, Thread.currentThread().getContextClassLoader());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.pulsar.functions.auth;

import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1ServiceAccount;
import io.kubernetes.client.models.V1StatefulSet;
import org.apache.pulsar.functions.utils.Reflections;

import java.util.Optional;

Expand All @@ -28,10 +30,16 @@
*/
public interface KubernetesFunctionAuthProvider extends FunctionAuthProvider {

void initialize(CoreV1Api coreClient, String kubeNamespace);

/**
* Configure function statefulset spec based on function auth data
* @param statefulSet statefulset spec for function
* @param functionAuthData function auth data
*/
void configureAuthDataStatefulSet(V1StatefulSet statefulSet, Optional<FunctionAuthData> functionAuthData);

static KubernetesFunctionAuthProvider getAuthProvider(String className) {
return Reflections.createInstance(className, KubernetesFunctionAuthProvider.class, Thread.currentThread().getContextClassLoader());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
private static final String DEFAULT_SECRET_MOUNT_DIR = "/etc/auth";
private static final String FUNCTION_AUTH_TOKEN = "token";

private CoreV1Api coreClient;
private String kubeNamespace;

private final CoreV1Api coreClient;
private final String kubeNamespace;

public KubernetesSecretsTokenAuthProvider(CoreV1Api coreClient, String kubeNamespace) {
@Override
public void initialize(CoreV1Api coreClient, String kubeNamespace) {
this.coreClient = coreClient;
this.kubeNamespace = kubeNamespace;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public class KubernetesRuntime implements Runtime {
private int percentMemoryPadding;
private double cpuOverCommitRatio;
private double memoryOverCommitRatio;
private final KubernetesFunctionAuthProvider functionAuthDataCacheProvider;
private final Optional<KubernetesFunctionAuthProvider> functionAuthDataCacheProvider;
private final AuthenticationConfig authConfig;

KubernetesRuntime(AppsV1Api appsClient,
Expand Down Expand Up @@ -167,7 +167,7 @@ public class KubernetesRuntime implements Runtime {
int percentMemoryPadding,
double cpuOverCommitRatio,
double memoryOverCommitRatio,
KubernetesFunctionAuthProvider functionAuthDataCacheProvider,
Optional<KubernetesFunctionAuthProvider> functionAuthDataCacheProvider,
boolean authenticationEnabled) throws Exception {
this.appsClient = appsClient;
this.coreClient = coreClient;
Expand Down Expand Up @@ -463,8 +463,8 @@ private void submitStatefulSet() throws Exception {
final V1StatefulSet statefulSet = createStatefulSet();
// Configure function authentication if needed
if (authenticationEnabled) {
functionAuthDataCacheProvider.configureAuthDataStatefulSet(
statefulSet, Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(instanceConfig.getFunctionAuthenticationSpec()))));
functionAuthDataCacheProvider.ifPresent(kubernetesFunctionAuthProvider -> kubernetesFunctionAuthProvider.configureAuthDataStatefulSet(
statefulSet, Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(instanceConfig.getFunctionAuthenticationSpec())))));
}

log.info("Submitting the following spec to k8 {}", appsClient.getApiClient().getJSON().serialize(statefulSet));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
Expand Down Expand Up @@ -96,6 +96,7 @@ class KubernetesInfo {
private CoreV1Api coreClient;
private Resources functionInstanceMinResources;
private final boolean authenticationEnabled;
private Optional<KubernetesFunctionAuthProvider> authProvider;

@VisibleForTesting
public KubernetesRuntimeFactory(String k8Uri,
Expand All @@ -121,7 +122,8 @@ public KubernetesRuntimeFactory(String k8Uri,
String changeConfigMapNamespace,
Resources functionInstanceMinResources,
SecretsProviderConfigurator secretsProviderConfigurator,
boolean authenticationEnabled) {
boolean authenticationEnabled,
Optional<FunctionAuthProvider> functionAuthProvider) {
this.kubernetesInfo = new KubernetesInfo();
this.kubernetesInfo.setK8Uri(k8Uri);
if (!isEmpty(jobNamespace)) {
Expand Down Expand Up @@ -181,6 +183,21 @@ public KubernetesRuntimeFactory(String k8Uri,
log.error("Failed to setup client", e);
throw new RuntimeException(e);
}

if (functionAuthProvider.isPresent()) {
if (!(functionAuthProvider.get() instanceof KubernetesFunctionAuthProvider)) {
throw new IllegalArgumentException("Function authentication provider "
+ functionAuthProvider.get().getClass().getName() + " must implement KubernetesFunctionAuthProvider");
} else {
KubernetesFunctionAuthProvider kubernetesFunctionAuthProvider = (KubernetesFunctionAuthProvider) functionAuthProvider.get();
kubernetesFunctionAuthProvider.initialize(coreClient, jobNamespace);
this.authProvider = Optional.of(kubernetesFunctionAuthProvider);
}
} else {
this.authProvider = Optional.empty();
}


}

@Override
Expand All @@ -207,9 +224,11 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
}

// adjust the auth config to support auth
if (authenticationEnabled) {
getAuthProvider().configureAuthenticationConfig(authConfig,
Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(instanceConfig.getFunctionAuthenticationSpec()))));
if (authenticationEnabled ) {
authProvider.ifPresent(kubernetesFunctionAuthProvider ->
kubernetesFunctionAuthProvider.configureAuthenticationConfig(authConfig,
Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(instanceConfig.getFunctionAuthenticationSpec())))));

}

return new KubernetesRuntime(
Expand Down Expand Up @@ -238,7 +257,7 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
this.kubernetesInfo.getPercentMemoryPadding(),
this.kubernetesInfo.getCpuOverCommitRatio(),
this.kubernetesInfo.getMemoryOverCommitRatio(),
getAuthProvider(),
authProvider,
authenticationEnabled);
}

Expand Down Expand Up @@ -341,7 +360,7 @@ void validateMinResourcesRequired(Function.FunctionDetails functionDetails) {
}

@Override
public KubernetesFunctionAuthProvider getAuthProvider() {
return new KubernetesSecretsTokenAuthProvider(coreClient, kubernetesInfo.jobNamespace);
public Optional<FunctionAuthProvider> getAuthProvider() {
return Optional.ofNullable(authProvider.orElse(null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;

import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;

import java.nio.file.Paths;
import java.util.Optional;
import java.util.function.Consumer;

import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;

Expand All @@ -47,6 +50,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
private String pythonInstanceFile;
private String logDirectory;
private String extraDependenciesDir;
private Optional<FunctionAuthProvider> authProvider;

@VisibleForTesting
public ProcessRuntimeFactory(String pulsarServiceUrl,
Expand All @@ -57,7 +61,8 @@ public ProcessRuntimeFactory(String pulsarServiceUrl,
String logDirectory,
String extraDependenciesDir,
SecretsProviderConfigurator secretsProviderConfigurator,
boolean authenticationEnabled) {
boolean authenticationEnabled,
Optional<FunctionAuthProvider> functionAuthProvider) {
this.pulsarServiceUrl = pulsarServiceUrl;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.authConfig = authConfig;
Expand Down Expand Up @@ -114,6 +119,8 @@ public ProcessRuntimeFactory(String pulsarServiceUrl,
+ " function worker config or system environment");
}
}

authProvider = functionAuthProvider;
}

@Override
Expand All @@ -136,8 +143,8 @@ public ProcessRuntime createContainer(InstanceConfig instanceConfig, String code

// configure auth if necessary
if (authenticationEnabled) {
getAuthProvider().configureAuthenticationConfig(authConfig,
Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(instanceConfig.getFunctionAuthenticationSpec()))));
authProvider.ifPresent(functionAuthProvider -> functionAuthProvider.configureAuthenticationConfig(authConfig,
Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(instanceConfig.getFunctionAuthenticationSpec())))));
}

return new ProcessRuntime(
Expand All @@ -153,6 +160,11 @@ public ProcessRuntime createContainer(InstanceConfig instanceConfig, String code
expectedHealthCheckInterval);
}

@Override
public Optional<FunctionAuthProvider> getAuthProvider() {
return authProvider;
}

@Override
public void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
package org.apache.pulsar.functions.runtime;

import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.auth.NoOpFunctionAuthProvider;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;

import java.util.Optional;

/**
* A factory to create {@link Runtime}s to invoke functions.
*/
Expand All @@ -45,8 +46,8 @@ Runtime createContainer(

default void doAdmissionChecks(Function.FunctionDetails functionDetails) { }

default FunctionAuthProvider getAuthProvider() throws IllegalAccessException, InstantiationException {
return NoOpFunctionAuthProvider.class.newInstance();
default Optional<FunctionAuthProvider> getAuthProvider() {
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public class KubernetesSecretsTokenAuthProviderTest {
public void testConfigureAuthDataStatefulSet() {

CoreV1Api coreV1Api = mock(CoreV1Api.class);
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default");


V1StatefulSet statefulSet = new V1StatefulSet();
Expand All @@ -75,7 +76,8 @@ public void testConfigureAuthDataStatefulSet() {
public void testCacheAuthData() throws ApiException {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), any(), anyString());
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default");
Optional<FunctionAuthData> functionAuthData = kubernetesSecretsTokenAuthProvider.cacheAuthData("test-tenant",
"test-ns", "test-func", new AuthenticationDataSource() {
@Override
Expand All @@ -96,7 +98,8 @@ public String getCommandData() {
@Test
public void configureAuthenticationConfig() {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default");
AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
FunctionAuthData functionAuthData = FunctionAuthData.builder().data("foo".getBytes()).build();
kubernetesSecretsTokenAuthProvider.configureAuthenticationConfig(authenticationConfig, Optional.of(functionAuthData));
Expand All @@ -108,8 +111,8 @@ public void configureAuthenticationConfig() {
@Test
public void testUpdateAuthData() throws Exception {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");

KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default");
// test when existingFunctionAuthData is empty
Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
Optional<FunctionAuthData> functionAuthData = kubernetesSecretsTokenAuthProvider.updateAuthData("test-tenant",
Expand Down
Loading

0 comments on commit 8b7245c

Please sign in to comment.