From 3db8b7392b1be22544117ea336dd46cc85c61ba0 Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Tue, 23 Oct 2018 17:28:04 -0700 Subject: [PATCH] Allow the ability to specify which artifactory to download dep from (#2824) * Allow the ability to specify which artifactory to download dep from * reverted earlier changes * Made python dependency repository a config of k8runtime * Fixed unittest * Added extra dependency * Fixed cmd lines --- .../src/main/python/python_instance_main.py | 11 ++++++++++- .../pulsar/functions/runtime/KubernetesRuntime.java | 5 ++++- .../functions/runtime/KubernetesRuntimeFactory.java | 8 ++++++++ .../pulsar/functions/runtime/ProcessRuntime.java | 2 +- .../pulsar/functions/runtime/RuntimeUtils.java | 13 ++++++++++++- .../functions/runtime/KubernetesRuntimeTest.java | 8 +++++--- .../functions/worker/FunctionRuntimeManager.java | 2 ++ .../pulsar/functions/worker/WorkerConfig.java | 2 ++ 8 files changed, 44 insertions(+), 7 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 5fc899ad84f1e..1bfc7936e5b3a 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -72,6 +72,9 @@ def main(): parser.add_argument('--logging_file', required=True, help='Log file name') parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int) parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool) + parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from') + parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from') + args = parser.parse_args() function_details = Function_pb2.FunctionDetails() @@ -84,7 +87,13 @@ def main(): if os.path.splitext(str(args.py))[1] == '.whl': if args.install_usercode_dependencies: - os.system("pip install -t %s %s" % (os.path.dirname(str(args.py)), str(args.py))) + cmd = "pip install -t %s" % os.path.dirname(str(args.py)) + if args.dependency_repository: + cmd = cmd + " -i %s" % str(args.dependency_repository) + if args.extra_dependency_repository: + cmd = cmd + " --extra-index-url %s" % str(args.extra_dependency_repository) + cmd = cmd + " %s" % str(args.py) + os.system(cmd) else: zpfile = zipfile.ZipFile(str(args.py), 'r') zpfile.extractall(os.path.dirname(str(args.py))) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index ddfe36fd805bf..ee7125fbbe7ca 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -105,6 +105,8 @@ class KubernetesRuntime implements Runtime { String jobNamespace, Map customLabels, Boolean installUserCodeDependencies, + String pythonDependencyRepository, + String pythonExtraDependencyRepository, String pulsarDockerImageName, String pulsarRootDir, InstanceConfig instanceConfig, @@ -129,7 +131,8 @@ class KubernetesRuntime implements Runtime { this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName; this.pulsarAdminUrl = pulsarAdminUrl; this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl, - authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "kubernetes_instance_log4j2.yml", installUserCodeDependencies); + authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "kubernetes_instance_log4j2.yml", installUserCodeDependencies, + pythonDependencyRepository, pythonExtraDependencyRepository); this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval); running = false; doChecks(instanceConfig.getFunctionDetails()); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index b257cbfe084a4..1a180aee0c706 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -46,6 +46,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { private final String pulsarRootDir; private final Boolean submittingInsidePod; private final Boolean installUserCodeDependencies; + private final String pythonDependencyRepository; + private final String pythonExtraDependencyRepository; private final Map customLabels; private final String pulsarAdminUri; private final String pulsarServiceUri; @@ -66,6 +68,8 @@ public KubernetesRuntimeFactory(String k8Uri, String pulsarRootDir, Boolean submittingInsidePod, Boolean installUserCodeDependencies, + String pythonDependencyRepository, + String pythonExtraDependencyRepository, Map customLabels, String pulsarServiceUri, String pulsarAdminUri, @@ -90,6 +94,8 @@ public KubernetesRuntimeFactory(String k8Uri, } this.submittingInsidePod = submittingInsidePod; this.installUserCodeDependencies = installUserCodeDependencies; + this.pythonDependencyRepository = pythonDependencyRepository; + this.pythonExtraDependencyRepository = pythonExtraDependencyRepository; this.customLabels = customLabels; this.pulsarServiceUri = pulsarServiceUri; this.pulsarAdminUri = pulsarAdminUri; @@ -128,6 +134,8 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c jobNamespace, customLabels, installUserCodeDependencies, + pythonDependencyRepository, + pythonExtraDependencyRepository, pulsarDockerImageName, pulsarRootDir, instanceConfig, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index bb777689664e2..1cf0db9fa783b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -75,7 +75,7 @@ class ProcessRuntime implements Runtime { this.expectedHealthCheckInterval = expectedHealthCheckInterval; this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl, authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval, - "java_instance_log4j2.yml", false); + "java_instance_log4j2.yml", false, null, null); } /** diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 0572ed6fb81bc..00a04b5455274 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -29,6 +29,7 @@ import java.util.*; +import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; /** @@ -48,7 +49,9 @@ public static List composeArgs(InstanceConfig instanceConfig, Integer grpcPort, Long expectedHealthCheckInterval, String javaLog4jFileName, - Boolean installUserCodeDepdendencies) throws Exception { + Boolean installUserCodeDepdendencies, + String pythonDependencyRepository, + String pythonExtraDependencyRepository) throws Exception { List args = new LinkedList<>(); if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { args.add("java"); @@ -90,6 +93,14 @@ public static List composeArgs(InstanceConfig instanceConfig, args.add("--install_usercode_dependencies"); args.add("True"); } + if (!isEmpty(pythonDependencyRepository)) { + args.add("--dependency_repository"); + args.add(pythonDependencyRepository); + } + if (!isEmpty(pythonExtraDependencyRepository)) { + args.add("--extra_dependency_repository"); + args.add(pythonExtraDependencyRepository); + } // TODO:- Find a platform independent way of controlling memory for a python application } args.add("--instance_id"); diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java index 61caccad07c6e..bc2a8236c90cc 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java @@ -72,7 +72,7 @@ public KubernetesRuntimeTest() throws Exception { this.stateStorageServiceUrl = "bk://localhost:4181"; this.logDirectory = "logs/functions"; this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir, - false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null)); + false, true, "myrepo", "anotherrepo", null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null)); doNothing().when(this.factory).setupClient(); } @@ -145,18 +145,20 @@ public void testPythonConstructor() throws Exception { KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l); List args = container.getProcessArgs(); - assertEquals(args.size(), 26); + assertEquals(args.size(), 30); String expectedArgs = "python " + pythonInstanceFile + " --py " + pulsarRootDir + "/" + userJarFile + " --logging_directory " + logDirectory + " --logging_file " + config.getFunctionDetails().getName() + " --install_usercode_dependencies True" + + " --dependency_repository myrepo" + + " --extra_dependency_repository anotherrepo" + " --instance_id " + "$SHARD_ID" + " --function_id " + config.getFunctionId() + " --function_version " + config.getFunctionVersion() + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails()) + "' --pulsar_serviceurl " + pulsarServiceUrl - + " --max_buffered_tuples 1024 --port " + args.get(23) + + " --max_buffered_tuples 1024 --port " + args.get(27) + " --expected_healthcheck_interval -1"; assertEquals(String.join(" ", args), expectedArgs); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index ee57659c2c01f..f7dd4bc07e875 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -133,6 +133,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer workerConfig.getKubernetesContainerFactory().getPulsarRootDir(), workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(), workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(), + workerConfig.getKubernetesContainerFactory().getPythonDependencyRepository(), + workerConfig.getKubernetesContainerFactory().getPythonExtraDependencyRepository(), workerConfig.getKubernetesContainerFactory().getCustomLabels(), StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(), StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(), diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index bc979691166bb..2b3e816a797f8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -140,6 +140,8 @@ public static class KubernetesContainerFactory { private String pulsarServiceUrl; private String pulsarAdminUrl; private Boolean installUserCodeDependencies; + private String pythonDependencyRepository; + private String pythonExtraDependencyRepository; private Map customLabels; private Integer expectedMetricsCollectionInterval; }