Skip to content

Commit

Permalink
[fix][functions] Handle uploading of builtin Functions to BK if uploa…
Browse files Browse the repository at this point in the history
…dBuiltinSinksSources is true (apache#16111)

* Don't upload builtin Functions to BK if uploadBuiltinSinksSources is false

* Fix builtin Function package location path
  • Loading branch information
cbornet authored Jun 24, 2022
1 parent 35917e6 commit b236899
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 16 deletions.
2 changes: 1 addition & 1 deletion conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ functionRuntimeFactoryConfigs:
#### Kubernetes Runtime ####
# Pulsar function are deployed to Kubernetes

# Upload the builtin sources/sinks to BookKeeper.
# Upload the builtin sources/sinks/functions to BookKeeper.
# True by default.
# uploadBuiltinSinksSources: true
#functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private Boolean validateConnectorConfig = false;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Should the builtin sources/sinks be uploaded for the externally managed runtimes?"
doc = "Should the builtin sources/sinks/functions be uploaded for the externally managed runtimes?"
)
private Boolean uploadBuiltinSinksSources = true;
@FieldContext(
Expand Down
18 changes: 18 additions & 0 deletions pulsar-functions/worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-api-examples-builtin</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down Expand Up @@ -217,6 +225,15 @@
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-functions-api-examples.jar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-api-examples-builtin</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-functions-api-examples.nar</destFileName>
</artifactItem>
</artifactItems>
</configuration>
</execution>
Expand All @@ -230,6 +247,7 @@
<systemPropertyVariables>
<pulsar-io-data-generator.nar.path>${project.build.directory}/pulsar-io-data-generator.nar</pulsar-io-data-generator.nar.path>
<pulsar-functions-api-examples.jar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-functions-api-examples.jar.path>
<pulsar-functions-api-examples.nar.path>${project.build.directory}/pulsar-functions-api-examples.nar</pulsar-functions-api-examples.nar.path>
<pulsar-io-cassandra.nar.path>${project.build.directory}/pulsar-io-cassandra.nar</pulsar-io-cassandra.nar.path>
<pulsar-io-twitter.nar.path>${project.build.directory}/pulsar-io-twitter.nar</pulsar-io-twitter.nar.path>
<!-- valid jar file that is not a valid nar file -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,24 +312,32 @@ PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaDat
// if the function worker image does not include connectors
if (isBuiltin) {
if (worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
File sinkOrSource;
if (componentType == FunctionDetails.ComponentType.SOURCE) {
String archiveName = functionDetails.getSource().getBuiltin();
sinkOrSource = worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
} else {
String archiveName = functionDetails.getSink().getBuiltin();
sinkOrSource = worker().getConnectorsManager().getSinkArchive(archiveName).toFile();
}
File component;
String archiveName;
switch (componentType) {
case SOURCE:
archiveName = functionDetails.getSource().getBuiltin();
component = worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
break;
case SINK:
archiveName = functionDetails.getSink().getBuiltin();
component = worker().getConnectorsManager().getSinkArchive(archiveName).toFile();
break;
default:
archiveName = functionDetails.getBuiltin();
component = worker().getFunctionsManager().getFunctionArchive(archiveName).toFile();
break;
}
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
sinkOrSource.getName()));
packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
component.getName()));
packageLocationMetaDataBuilder.setOriginalFileName(component.getName());
if (isPackageManagementEnabled) {
packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
worker().getBrokerAdmin().packages().upload(metadata,
packageName.toString(), sinkOrSource.getAbsolutePath());
packageName.toString(), component.getAbsolutePath());
} else {
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
sinkOrSource, worker().getDlogNamespace());
component, worker().getDlogNamespace());
}
log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType),
packageLocationMetaDataBuilder.getPackagePath());
Expand Down Expand Up @@ -1611,6 +1619,10 @@ private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
}
}

if (!isEmpty(functionDetails.getBuiltin())) {
return functionDetails.getBuiltin();
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -42,6 +43,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Consumer;
Expand All @@ -57,6 +59,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
Expand All @@ -72,11 +75,13 @@
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.LeaderService;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
Expand Down Expand Up @@ -145,6 +150,16 @@ public void accept(String s) {
private PulsarFunctionTestTemporaryDirectory tempDirectory;
private static Map<String, MockedStatic> mockStaticContexts = new HashMap<>();

private static final String SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH =
"pulsar-functions-api-examples.nar.path";

public static File getPulsarApiExamplesNar() {
return new File(Objects.requireNonNull(
System.getProperty(SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH)
, "pulsar-functions-api-examples.nar file location must be specified with "
+ SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH + " system property"));
}

@BeforeMethod
public void setup() throws Exception {
this.mockedManager = mock(FunctionMetaDataManager.class);
Expand Down Expand Up @@ -730,6 +745,122 @@ public void testRegisterFunctionInterrupted() throws Exception {
}
}

/*
Externally managed runtime,
uploadBuiltinSinksSources == false
Make sure uploadFileToBookkeeper is not called
*/
@Test
public void testRegisterFunctionSuccessK8sNoUpload() throws Exception {
mockedWorkerService.getWorkerConfig().setUploadBuiltinSinksSources(false);

mockStatic(WorkerUtils.class, ctx -> {
ctx.when(() -> WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class)))
.thenThrow(new RuntimeException("uploadFileToBookkeeper triggered"));

});

NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
mockStatic(FunctionCommon.class, ctx -> {
ctx.when(() -> FunctionCommon.getFunctionTypes(any(FunctionConfig.class), any(Class.class))).thenReturn(new Class[]{String.class, String.class});
ctx.when(() -> FunctionCommon.convertRuntime(any(FunctionConfig.Runtime.class))).thenCallRealMethod();
ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);

});

doReturn(Function.class).when(mockedClassLoader).loadClass(anyString());

FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
FunctionArchive functionArchive = FunctionArchive.builder()
.classLoader(mockedClassLoader)
.build();
when(mockedFunctionsManager.getFunction("exclamation")).thenReturn(functionArchive);

when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);

when(mockedRuntimeFactory.externallyManaged()).thenReturn(true);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);

FunctionConfig functionConfig = createDefaultFunctionConfig();
functionConfig.setJar("builtin://exclamation");

try (FileInputStream inputStream = new FileInputStream(getPulsarApiExamplesNar())) {
resource.registerFunction(
tenant,
namespace,
function,
inputStream,
mockedFormData,
null,
functionConfig,
null, null);
}
}

/*
Externally managed runtime,
uploadBuiltinSinksSources == true
Make sure uploadFileToBookkeeper is called
*/
@Test
public void testRegisterFunctionSuccessK8sWithUpload() throws Exception {
final String injectedErrMsg = "uploadFileToBookkeeper triggered";
mockedWorkerService.getWorkerConfig().setUploadBuiltinSinksSources(true);

mockStatic(WorkerUtils.class, ctx -> {
ctx.when(() -> WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class)))
.thenThrow(new RuntimeException(injectedErrMsg));

});

NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
mockStatic(FunctionCommon.class, ctx -> {
ctx.when(() -> FunctionCommon.getFunctionTypes(any(FunctionConfig.class), any(Class.class))).thenReturn(new Class[]{String.class, String.class});
ctx.when(() -> FunctionCommon.convertRuntime(any(FunctionConfig.Runtime.class))).thenCallRealMethod();
ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
});

doReturn(Function.class).when(mockedClassLoader).loadClass(anyString());

FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
FunctionArchive functionArchive = FunctionArchive.builder()
.classLoader(mockedClassLoader)
.build();
when(mockedFunctionsManager.getFunction("exclamation")).thenReturn(functionArchive);
when(mockedFunctionsManager.getFunctionArchive(any())).thenReturn(getPulsarApiExamplesNar().toPath());

when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);

when(mockedRuntimeFactory.externallyManaged()).thenReturn(true);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);

FunctionConfig functionConfig = createDefaultFunctionConfig();
functionConfig.setJar("builtin://exclamation");

try (FileInputStream inputStream = new FileInputStream(getPulsarApiExamplesNar())) {
try {
resource.registerFunction(
tenant,
namespace,
function,
inputStream,
mockedFormData,
null,
functionConfig,
null, null);
Assert.fail();
} catch (RuntimeException e) {
Assert.assertEquals(e.getMessage(), injectedErrMsg);
}
}
}

//
// Update Functions
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1600,7 +1600,6 @@ public void testRegisterSinkSuccessK8sNoUpload() throws Exception {
ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class);
ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mockedClassLoader);

});
Expand Down Expand Up @@ -1655,7 +1654,6 @@ public void testRegisterSinkSuccessK8sWithUpload() throws Exception {
ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class);
ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mockedClassLoader);
});

Expand Down

0 comments on commit b236899

Please sign in to comment.