Skip to content

Commit

Permalink
Option to not upload builtin connectors to BK for k8s runtime (apache…
Browse files Browse the repository at this point in the history
…#12947)

### Motivation

When the k8s runtime is used, the builtin connectors get uploaded to BK, and the uploaded version keeps being used even after a Pulsar upgrade. This means that after an upgrade one needs to delete and re-create the connectors.

### Modifications

Added an option to skip the upload to BK and use the connector package from the connectors directory instead.

### Verifying this change

Tested locally (minikube, k8s runtime).
I don't see any unit/integration tests that cover the k8s runtime e2e.
  • Loading branch information
dlg99 authored Dec 15, 2021
1 parent 3e55b4f commit 7576a65
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 15 deletions.
3 changes: 3 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ functionRuntimeFactoryConfigs:
# narExtractionDirectory:
# # The classpath where function instance files stored
# functionInstanceClassPath:
# # Upload the builtin sources/sinks to BookKeeper.
# # True by default.
# uploadBuiltinSinksSources: true
# # the directory for dropping extra function dependencies
# # if it is not an absolute path, it is relative to `pulsarRootDir`
# extraFunctionDependenciesDir:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
category = CATEGORY_FUNCTIONS,
doc = "The path to the location to locate builtin functions"
)
private String functionsDirectory = "./functions";
// Declared after functionsDirectory so that each @FieldContext documents the field
// it is actually attached to.
@FieldContext(
    category = CATEGORY_FUNCTIONS,
    doc = "Should the builtin sources/sinks be uploaded for the externally managed runtimes?"
)
private Boolean uploadBuiltinSinksSources = true;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ public static FunctionDefinition getFunctionDefinition(String narPath) throws IO
return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class);
}
}

/**
 * Searches {@code functionsDirectory} for function archives using the default
 * behaviour, i.e. without forcing every discovered archive path to be recorded.
 *
 * @param functionsDirectory directory to scan for function archives
 * @return the functions discovered by the two-argument overload
 * @throws IOException if the two-argument overload fails
 */
public static Functions searchForFunctions(String functionsDirectory) throws IOException {
    final boolean alwaysPopulatePath = false;
    return searchForFunctions(functionsDirectory, alwaysPopulatePath);
}

public static Functions searchForFunctions(String functionsDirectory, boolean alwaysPopulatePath) throws IOException {
Path path = Paths.get(functionsDirectory).toAbsolutePath();
log.info("Searching for functions in {}", path);

Expand All @@ -96,7 +99,7 @@ public static Functions searchForFunctions(String functionsDirectory) throws IOE
log.info("Found function {} from {}", cntDef, archive);
log.error(cntDef.getName());
log.error(cntDef.getFunctionClass());
if (!StringUtils.isEmpty(cntDef.getFunctionClass())) {
if (alwaysPopulatePath || !StringUtils.isEmpty(cntDef.getFunctionClass())) {
functions.functions.put(cntDef.getName(), archive);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;

import java.io.FileInputStream;
import java.nio.file.Files;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.StorageClient;
Expand Down Expand Up @@ -78,6 +80,8 @@
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.functions.Functions;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
Expand All @@ -92,6 +96,7 @@
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
Expand Down Expand Up @@ -282,21 +287,27 @@ PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaDat
boolean isBuiltin = isFunctionCodeBuiltin(functionDetails);
boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
// For externally managed schedulers, the pkgUrl/builtin stuff should be copied to bk
// For externally managed schedulers, the pkgUrl/builtin stuff can be copied to bk
// if the function worker image does not include connectors
if (isBuiltin) {
File sinkOrSource;
if (componentType == FunctionDetails.ComponentType.SOURCE) {
String archiveName = functionDetails.getSource().getBuiltin();
sinkOrSource = worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
if (worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
File sinkOrSource;
if (componentType == FunctionDetails.ComponentType.SOURCE) {
String archiveName = functionDetails.getSource().getBuiltin();
sinkOrSource = worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
} else {
String archiveName = functionDetails.getSink().getBuiltin();
sinkOrSource = worker().getConnectorsManager().getSinkArchive(archiveName).toFile();
}
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
sinkOrSource.getName()));
packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource, worker().getDlogNamespace());
} else {
String archiveName = functionDetails.getSink().getBuiltin();
sinkOrSource = worker().getConnectorsManager().getSinkArchive(archiveName).toFile();
log.info("Skipping upload for the built-in package {}", ComponentTypeUtils.toString(componentType));
packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
}
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
sinkOrSource.getName()));
packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource, worker().getDlogNamespace());
} else if (isPkgUrlProvided) {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
uploadedInputStreamAsFile.getName()));
Expand Down Expand Up @@ -1272,6 +1283,20 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
URI url = URI.create(pkgPath);
File file = new File(url.getPath());
Files.copy(file.toPath(), output);
} else if(pkgPath.startsWith(Utils.BUILTIN) && !worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
String sType = pkgPath.replaceFirst("^builtin://", "");
final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory();
log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir);
Functions sinksOrSources = FunctionUtils.searchForFunctions(connectorsDir, true);
Path narPath = sinksOrSources.getFunctions().get(sType);
if (narPath == null) {
throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir);
}
log.info("Loading {} from {}", pkgPath, narPath);
try (InputStream in = new FileInputStream(narPath.toString())) {
IOUtils.copy(in, output, 1024);
output.flush();
}
} else {
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, pkgPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker.rest.api.v3;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
Expand All @@ -29,6 +30,7 @@
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;

Expand All @@ -38,11 +40,13 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Consumer;

Expand Down Expand Up @@ -75,6 +79,7 @@
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.LeaderService;
Expand All @@ -99,7 +104,7 @@
/**
* Unit test of {@link FunctionsApiV2Resource}.
*/
@PrepareForTest({WorkerUtils.class, InstanceUtils.class})
@PrepareForTest({WorkerUtils.class, InstanceUtils.class, FunctionUtils.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" })
public class FunctionApiV3ResourceTest {

Expand Down Expand Up @@ -1542,6 +1547,51 @@ public void testDownloadFunctionFile() throws Exception {
}
}

/**
 * Downloads a {@code builtin://} package while {@code uploadBuiltinSinksSources} is
 * disabled and checks that the streamed bytes match the archive that the (mocked)
 * connectors-directory lookup resolves for that name.
 */
@Test
public void testDownloadFunctionBuiltin() throws Exception {
    mockStatic(WorkerUtils.class);

    // Any existing test resource works as a stand-in for the connector archive;
    // only its length is compared below.
    URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
    File file = Paths.get(fileUrl.toURI()).toFile();
    String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();

    PulsarWorkerService worker = mock(PulsarWorkerService.class);
    doReturn(true).when(worker).isInitialized();

    WorkerConfig config = mock(WorkerConfig.class);
    when(config.isAuthorizationEnabled()).thenReturn(false);
    when(config.getUploadBuiltinSinksSources()).thenReturn(false);
    when(config.getConnectorsDirectory()).thenReturn("/connectors");

    when(worker.getDlogNamespace()).thenReturn(mock(Namespace.class));
    when(worker.getWorkerConfig()).thenReturn(config);
    FunctionsImpl function = new FunctionsImpl(() -> worker);

    // The directory scan is mocked to return a single "cassandra" entry pointing at the stand-in file.
    Map<String, Path> functionsMap = new TreeMap<>();
    functionsMap.put("cassandra", file.toPath());
    org.apache.pulsar.functions.utils.functions.Functions mockedFunctions =
            mock(org.apache.pulsar.functions.utils.functions.Functions.class);
    when(mockedFunctions.getFunctions()).thenReturn(functionsMap);

    mockStatic(FunctionUtils.class);
    PowerMockito.when(FunctionUtils.searchForFunctions(anyString(), anyBoolean())).thenReturn(mockedFunctions);

    StreamingOutput streamOutput = function.downloadFunction("builtin://cassandra", null, null);

    File pkgFile = new File(testDir, UUID.randomUUID().toString());
    try {
        // try-with-resources closes the stream even when write() throws.
        try (OutputStream output = new FileOutputStream(pkgFile)) {
            streamOutput.write(output);
            output.flush();
        }
        if (!pkgFile.exists()) {
            fail("expected file");
        }
        // TestNG argument order is (actual, expected).
        Assert.assertEquals(pkgFile.length(), file.length());
    } finally {
        // Best-effort cleanup so a failed assertion does not leave the temp file behind.
        pkgFile.delete();
    }
}

@Test
public void testRegisterFunctionFileUrlWithValidSinkClass() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.LeaderService;
Expand All @@ -87,6 +89,7 @@
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.IObjectFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -1600,4 +1603,125 @@ private FunctionDetails createDefaultFunctionDetails() throws IOException {
return SinkConfigUtils.convert(createDefaultSinkConfig(),
new SinkConfigUtils.ExtractedSinkDetails(null, null));
}

/*
Externally managed runtime,
uploadBuiltinSinksSources == false
Make sure uploadFileToBookkeeper is not called.

WorkerUtils.uploadFileToBookkeeper is stubbed to throw, so the test passes only
if registerSink completes without invoking it.
*/
@Test
public void testRegisterSinkSuccessK8sNoUpload() throws Exception {
mockedWorkerService.getWorkerConfig().setUploadBuiltinSinksSources(false);

// Static stub: any upload attempt raises a RuntimeException and fails the test.
// The doThrow(...).when(Class) line and the static call that follows it form one stubbing.
mockStatic(WorkerUtils.class);
doThrow(new RuntimeException("uploadFileToBookkeeper triggered")).when(WorkerUtils.class);
WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));

// FunctionCommon is mocked statically; selected methods fall through to the real implementation.
mockStatic(FunctionCommon.class);
doReturn(String.class).when(FunctionCommon.class);
FunctionCommon.getSinkType(any());
PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
PowerMockito.when(FunctionCommon.class, "getClassLoaderFromPackage", any(), any(), any(), any())
.thenCallRealMethod();

// Force the sink to be treated as builtin code.
doReturn(true).when(FunctionCommon.class);
FunctionCommon.isFunctionCodeBuiltin(any());

doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
FunctionCommon.extractNarClassLoader(any(), any());

// Connectors manager that knows a "cassandra" connector backed by a mocked class loader.
NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
ConnectorsManager mockedConnManager = mock(ConnectorsManager.class);
Connector connector = Connector.builder()
.classLoader(mockedClassLoader)
.build();
when(mockedConnManager.getConnector("cassandra")).thenReturn(connector);
when(mockedWorkerService.getConnectorsManager()).thenReturn(mockedConnManager);

// Externally managed runtime, and the sink is not registered yet.
when(mockedRuntimeFactory.externallyManaged()).thenReturn(true);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);

SinkConfig sinkConfig = createDefaultSinkConfig();
sinkConfig.setArchive("builtin://cassandra");

try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) {
resource.registerSink(
tenant,
namespace,
sink,
inputStream,
mockedFormData,
null,
sinkConfig,
null, null);
}
}

/*
Externally managed runtime,
uploadBuiltinSinksSources == true
Make sure uploadFileToBookkeeper is called.

WorkerUtils.uploadFileToBookkeeper is stubbed to throw a RuntimeException with a
known message; the test passes only if registerSink surfaces exactly that
exception, which shows the upload path was taken.
*/
@Test
public void testRegisterSinkSuccessK8sWithUpload() throws Exception {
final String injectedErrMsg = "uploadFileToBookkeeper triggered";
mockedWorkerService.getWorkerConfig().setUploadBuiltinSinksSources(true);

// Static stub: the upload call raises the marker exception asserted at the end of the test.
// The doThrow(...).when(Class) line and the static call that follows it form one stubbing.
mockStatic(WorkerUtils.class);
doThrow(new RuntimeException(injectedErrMsg)).when(WorkerUtils.class);
WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));

// FunctionCommon is mocked statically; selected methods fall through to the real implementation.
mockStatic(FunctionCommon.class);
doReturn(String.class).when(FunctionCommon.class);
FunctionCommon.getSinkType(any());
PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
PowerMockito.when(FunctionCommon.class, "getClassLoaderFromPackage", any(), any(), any(), any())
.thenCallRealMethod();

// Force the sink to be treated as builtin code.
doReturn(true).when(FunctionCommon.class);
FunctionCommon.isFunctionCodeBuiltin(any());

doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
FunctionCommon.extractNarClassLoader(any(), any());

// Connectors manager that knows a "cassandra" connector and resolves any sink archive
// to the Cassandra NAR used by this test.
NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
ConnectorsManager mockedConnManager = mock(ConnectorsManager.class);
Connector connector = Connector.builder()
.classLoader(mockedClassLoader)
.build();
when(mockedConnManager.getConnector("cassandra")).thenReturn(connector);
when(mockedConnManager.getSinkArchive(any())).thenReturn(getPulsarIOCassandraNar().toPath());

when(mockedWorkerService.getConnectorsManager()).thenReturn(mockedConnManager);


// Externally managed runtime, and the sink is not registered yet.
when(mockedRuntimeFactory.externallyManaged()).thenReturn(true);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);

SinkConfig sinkConfig = createDefaultSinkConfig();
sinkConfig.setArchive("builtin://cassandra");

try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) {
try {
resource.registerSink(
tenant,
namespace,
sink,
inputStream,
mockedFormData,
null,
sinkConfig,
null, null);
// Reaching this line means the upload stub was never hit.
Assert.fail();
} catch (RuntimeException e) {
Assert.assertEquals(e.getMessage(), injectedErrMsg);
}
}
}
}
3 changes: 3 additions & 0 deletions site2/docs/functions-runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ functionRuntimeFactoryConfigs:
narExtractionDirectory:
# The classpath where function instance files stored
functionInstanceClassPath:
# Upload the builtin sources/sinks to BookKeeper.
# True by default.
uploadBuiltinSinksSources: true
# the directory for dropping extra function dependencies
# if it is not an absolute path, it is relative to `pulsarRootDir`
extraFunctionDependenciesDir:
Expand Down

0 comments on commit 7576a65

Please sign in to comment.