From cc66b07295382e089d932d02ec1ec74165c635ed Mon Sep 17 00:00:00 2001 From: Chenggang Wu Date: Mon, 1 May 2023 10:50:23 -0700 Subject: [PATCH] Implement `use` argument for LLM for decorators (#1234) --- sdk/aqueduct/__init__.py | 1 + sdk/aqueduct/client.py | 7 +- sdk/aqueduct/constants/enums.py | 1 + sdk/aqueduct/decorator.py | 125 +++++++--- sdk/aqueduct/globals/config.py | 8 +- .../integrations/dynamic_k8s_integration.py | 10 +- sdk/aqueduct/llm_wrapper.py | 220 ++++++++++++++++++ sdk/aqueduct/models/operators.py | 1 + sdk/aqueduct/tests/helpers_test.py | 12 +- sdk/aqueduct/utils/utils.py | 7 +- src/golang/lib/job/constants.go | 8 + src/golang/lib/job/k8s.go | 37 ++- .../lib/models/shared/operator/operator.go | 1 + src/golang/lib/workflow/operator/base.go | 1 + 14 files changed, 396 insertions(+), 43 deletions(-) create mode 100644 sdk/aqueduct/llm_wrapper.py diff --git a/sdk/aqueduct/__init__.py b/sdk/aqueduct/__init__.py index 4e6508221..df8f3212a 100644 --- a/sdk/aqueduct/__init__.py +++ b/sdk/aqueduct/__init__.py @@ -5,6 +5,7 @@ from aqueduct.constants.enums import CheckSeverity, LoadUpdateMode from aqueduct.decorator import check, metric, op, to_operator from aqueduct.flow import Flow +from aqueduct.llm_wrapper import llm_op, supported_llms from aqueduct.schedule import DayOfMonth, DayOfWeek, Hour, Minute, daily, hourly, monthly, weekly diff --git a/sdk/aqueduct/client.py b/sdk/aqueduct/client.py index 4aeb9b48e..a96c57e2d 100644 --- a/sdk/aqueduct/client.py +++ b/sdk/aqueduct/client.py @@ -35,6 +35,7 @@ prepare_integration_config, ) from aqueduct.integrations.databricks_integration import DatabricksIntegration +from aqueduct.integrations.dynamic_k8s_integration import DynamicK8sIntegration from aqueduct.integrations.ecr_integration import ECRIntegration from aqueduct.integrations.google_sheets_integration import GoogleSheetsIntegration from aqueduct.integrations.k8s_integration import K8sIntegration @@ -450,7 +451,7 @@ def publish_flow( name: str, description: str = "", schedule: str = "", - engine: Optional[str] = None, + engine: Optional[Union[str, DynamicK8sIntegration]] = None, artifacts: Optional[Union[BaseArtifact, List[BaseArtifact]]] = None, metrics: Optional[List[NumericArtifact]] = None, checks: Optional[List[BoolArtifact]] = None, @@ -524,7 +525,9 @@ def publish_flow( "A non-empty string must be supplied for the flow's name." ) - if engine is not None and not isinstance(engine, str): + if engine is not None and not ( + isinstance(engine, str) or isinstance(engine, DynamicK8sIntegration) + ): raise InvalidUserArgumentException( "`engine` parameter must be a string, got %s." % type(engine) ) diff --git a/sdk/aqueduct/constants/enums.py b/sdk/aqueduct/constants/enums.py index a348c8746..d2b757289 100644 --- a/sdk/aqueduct/constants/enums.py +++ b/sdk/aqueduct/constants/enums.py @@ -263,3 +263,4 @@ class CustomizableResourceType(str, Enum, metaclass=MetaEnum): MEMORY = "memory" GPU_RESOURCE_NAME = "gpu_resource_name" CUDA_VERSION = "cuda_version" + USE_LLM = "use_llm" diff --git a/sdk/aqueduct/decorator.py b/sdk/aqueduct/decorator.py index 0ee6c5261..a4f787593 100644 --- a/sdk/aqueduct/decorator.py +++ b/sdk/aqueduct/decorator.py @@ -1,6 +1,7 @@ import inspect -import uuid -import warnings +import io +import os +import zipfile from functools import wraps from typing import Any, Callable, Dict, List, Mapping, Optional, Union, cast @@ -36,13 +37,8 @@ get_operator_type, ) from aqueduct.type_annotations import CheckFunction, MetricFunction, Number, UserFunction -from aqueduct.utils.dag_deltas import ( - AddOperatorDelta, - DAGDelta, - RemoveOperatorDelta, - apply_deltas_to_dag, -) -from aqueduct.utils.function_packaging import serialize_function +from aqueduct.utils.dag_deltas import AddOperatorDelta, apply_deltas_to_dag +from aqueduct.utils.function_packaging import REQUIREMENTS_FILE, serialize_function from aqueduct.utils.naming import default_artifact_name_from_op_name, sanitize_artifact_name from aqueduct.utils.utils import generate_engine_config, generate_uuid @@ -184,14 +180,18 @@ def _typecheck_op_decorator_arguments( description: Optional[str], file_dependencies: Optional[List[str]], requirements: Optional[Union[str, List[str]]], - engine: Optional[str], + engine: Optional[Union[str, DynamicK8sIntegration]], num_outputs: int, outputs: Optional[List[str]], ) -> None: _typecheck_common_decorator_arguments(description, file_dependencies, requirements) - if engine is not None and not isinstance(engine, str): - raise InvalidUserArgumentException("`engine` must be a string.") + if engine is not None and not ( + isinstance(engine, str) or isinstance(engine, DynamicK8sIntegration) + ): + raise InvalidUserArgumentException( + "`engine` must be a string or a DynamicK8sIntegration object." + ) if num_outputs is not None: if not isinstance(num_outputs, int) or num_outputs < 1: @@ -365,7 +365,7 @@ def _convert_memory_string_to_mbs(memory_str: str) -> int: if memory_str[-2:].upper() == "MB": multiplier = 1 elif memory_str[-2:].upper() == "GB": - multiplier = 1000 + multiplier = 1024 else: raise InvalidUserArgumentException( "Memory value `%s` is invalid. It must have a suffix that is one of mb/MB/gb/GB." @@ -384,7 +384,7 @@ def _convert_memory_string_to_mbs(memory_str: str) -> int: def _update_operator_spec_with_engine( spec: OperatorSpec, - engine: Optional[str] = None, + engine: Optional[Union[str, DynamicK8sIntegration]] = None, ) -> None: if engine is not None: if globals.__GLOBAL_API_CLIENT__ is None: @@ -400,8 +400,15 @@ def _update_operator_spec_with_engine( def _update_operator_spec_with_resources( spec: OperatorSpec, + llm: bool, resources: Optional[Dict[str, Any]] = None, ) -> None: + if llm: + if resources is None: + resources = {} + + resources[CustomizableResourceType.USE_LLM] = True + if resources is not None: if not isinstance(resources, Dict) or any(not isinstance(k, str) for k in resources): raise InvalidUserArgumentException("`resources` must be a dictionary with string keys.") @@ -410,6 +417,7 @@ def _update_operator_spec_with_resources( memory = resources.get(CustomizableResourceType.MEMORY) gpu_resource_name = resources.get(CustomizableResourceType.GPU_RESOURCE_NAME) cuda_version = resources.get(CustomizableResourceType.CUDA_VERSION) + use_llm = resources.get(CustomizableResourceType.USE_LLM) if num_cpus is not None and (not isinstance(num_cpus, int) or num_cpus < 0): raise InvalidUserArgumentException( @@ -451,9 +459,59 @@ def _update_operator_spec_with_resources( memory_mb=memory, gpu_resource_name=gpu_resource_name, cuda_version=cuda_version, + use_llm=use_llm, ) +def _check_llm_requirements(spec: OperatorSpec) -> None: + assert spec.resources is not None + + min_required_memory = 8192 # 8GB + + if spec.engine_config is not None and spec.engine_config.type is RuntimeType.K8S: + # Check to see if the resources meet the constraints of the LLM. + requested_gpu = False # this is what we default to if gpu_resource_name is not specified. + requested_memory = 4096 # 4GB, this is what we defaults to if memory is not specified. + + if spec.resources.gpu_resource_name is not None: + requested_gpu = True + if spec.resources.memory_mb is not None: + requested_memory = spec.resources.memory_mb + + if not requested_gpu: + raise InvalidUserArgumentException("LLM requires GPU resource but it is not requested.") + + if min_required_memory > requested_memory: + raise InvalidUserArgumentException( + "LLMs require at least %dMB of memory but only %dMB is requested." + % (min_required_memory, requested_memory) + ) + + +def _check_if_requirements_contain_llm(zip_file: bytes) -> bool: + # create a ZipFile instance from the file-like object + with zipfile.ZipFile(io.BytesIO(zip_file), "r") as f: + # check if requirements.txt is in the archive + for filename in f.namelist(): + # check if the file name is requirements.txt + if os.path.basename(filename) == REQUIREMENTS_FILE: + with f.open(filename) as requirements_file: + for line in requirements_file: + package_name = line.decode( + "utf-8" + ).strip() # decode bytes to str and remove whitespace + + # skip lines that are commented out + if package_name.startswith("#"): + continue + + # check if aqueduct-llm is one of the requirements + if "aqueduct-llm" in package_name: + return True + + return False + + def _update_operator_spec_with_image( spec: OperatorSpec, image: Optional[Dict[str, str]] = None, @@ -507,7 +565,7 @@ def _update_operator_spec_with_image( def op( name: Optional[Union[str, UserFunction]] = None, description: Optional[str] = None, - engine: Optional[str] = None, + engine: Optional[Union[str, DynamicK8sIntegration]] = None, file_dependencies: Optional[List[str]] = None, requirements: Optional[Union[str, List[str]]] = None, num_outputs: Optional[int] = None, @@ -594,9 +652,6 @@ def op( >>> recommendations.get() """ - if isinstance(engine, DynamicK8sIntegration): - engine = engine.name() - # Establish parity between `num_outputs` and `outputs`, or raise exception if there is a mismatch. if num_outputs is None and outputs is None: num_outputs = 1 @@ -664,7 +719,13 @@ def _wrapped_util( ) _update_operator_spec_with_engine(op_spec, engine) - _update_operator_spec_with_resources(op_spec, resources) + _update_operator_spec_with_resources( + op_spec, _check_if_requirements_contain_llm(zip_file), resources + ) + + if op_spec.resources is not None and op_spec.resources.use_llm: + _check_llm_requirements(op_spec) + _update_operator_spec_with_image(op_spec, image) assert isinstance(num_outputs, int) @@ -708,7 +769,7 @@ def metric( file_dependencies: Optional[List[str]] = None, requirements: Optional[Union[str, List[str]]] = None, output: Optional[str] = None, - engine: Optional[str] = None, + engine: Optional[Union[str, DynamicK8sIntegration]] = None, resources: Optional[Dict[str, Any]] = None, image: Optional[Dict[str, str]] = None, ) -> Union[DecoratedMetricFunction, OutputArtifactFunction]: @@ -788,9 +849,6 @@ def metric( >>> churn_metric.get() """ - if isinstance(engine, DynamicK8sIntegration): - engine = engine.name() - _typecheck_common_decorator_arguments(description, file_dependencies, requirements) if output is not None and not isinstance(output, str): @@ -848,7 +906,13 @@ def _wrapped_util( metric_spec = MetricSpec(function=function_spec) op_spec = OperatorSpec(metric=metric_spec) _update_operator_spec_with_engine(op_spec, engine) - _update_operator_spec_with_resources(op_spec, resources) + _update_operator_spec_with_resources( + op_spec, _check_if_requirements_contain_llm(zip_file), resources + ) + + if op_spec.resources is not None and op_spec.resources.use_llm: + _check_llm_requirements(op_spec) + _update_operator_spec_with_image(op_spec, image) output_names = [output] if output is not None else None @@ -905,7 +969,7 @@ def check( file_dependencies: Optional[List[str]] = None, requirements: Optional[Union[str, List[str]]] = None, output: Optional[str] = None, - engine: Optional[str] = None, + engine: Optional[Union[str, DynamicK8sIntegration]] = None, resources: Optional[Dict[str, Any]] = None, image: Optional[Dict[str, str]] = None, ) -> Union[DecoratedCheckFunction, OutputArtifactFunction]: @@ -990,9 +1054,6 @@ def check( >>> churn_is_low_check.get() """ - if isinstance(engine, DynamicK8sIntegration): - engine = engine.name() - _typecheck_common_decorator_arguments(description, file_dependencies, requirements) if output is not None and not isinstance(output, str): @@ -1047,7 +1108,13 @@ def _wrapped_util( check_spec = CheckSpec(level=severity, function=function_spec) op_spec = OperatorSpec(check=check_spec) _update_operator_spec_with_engine(op_spec, engine) - _update_operator_spec_with_resources(op_spec, resources) + _update_operator_spec_with_resources( + op_spec, _check_if_requirements_contain_llm(zip_file), resources + ) + + if op_spec.resources is not None and op_spec.resources.use_llm: + _check_llm_requirements(op_spec) + _update_operator_spec_with_image(op_spec, image) output_names = [output] if output is not None else None diff --git a/sdk/aqueduct/globals/config.py b/sdk/aqueduct/globals/config.py index 2a1fa545e..98477ba60 100644 --- a/sdk/aqueduct/globals/config.py +++ b/sdk/aqueduct/globals/config.py @@ -1,5 +1,6 @@ -from typing import Optional +from typing import Optional, Union +from aqueduct.integrations.dynamic_k8s_integration import DynamicK8sIntegration from pydantic import BaseModel @@ -7,7 +8,10 @@ class GlobalConfig(BaseModel): """Defines the fields that are globally configurable with `aq.global_config()`.""" lazy: bool - engine: Optional[str] + engine: Optional[Union[str, DynamicK8sIntegration]] + + class Config: + arbitrary_types_allowed = True GLOBAL_LAZY_KEY = "lazy" diff --git a/sdk/aqueduct/integrations/dynamic_k8s_integration.py b/sdk/aqueduct/integrations/dynamic_k8s_integration.py index 5536d1c90..2cdd27957 100644 --- a/sdk/aqueduct/integrations/dynamic_k8s_integration.py +++ b/sdk/aqueduct/integrations/dynamic_k8s_integration.py @@ -7,8 +7,6 @@ from aqueduct.models.integration import Integration, IntegrationInfo from aqueduct.models.response_models import DynamicEngineStatusResponse -from aqueduct import globals - def parse_dynamic_k8s_config( config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfig] @@ -45,6 +43,8 @@ def __init__(self, metadata: IntegrationInfo): @validate_is_connected() def status(self) -> str: """Get the current status of the dynamic Kubernetes cluster.""" + from aqueduct import globals + engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status( engine_integration_ids=[str(self._metadata.id)] ) @@ -73,6 +73,8 @@ def create( """ config_delta = parse_dynamic_k8s_config(config_delta) + from aqueduct import globals + engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status( engine_integration_ids=[str(self._metadata.id)] ) @@ -115,6 +117,8 @@ def update(self, config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfi """ config_delta = parse_dynamic_k8s_config(config_delta) + from aqueduct import globals + engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status( engine_integration_ids=[str(self._metadata.id)] ) @@ -155,6 +159,8 @@ def delete(self, force: bool = False) -> None: InternalServerError: An unexpected error occurred within the Aqueduct cluster. """ + from aqueduct import globals + engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status( engine_integration_ids=[str(self._metadata.id)] ) diff --git a/sdk/aqueduct/llm_wrapper.py b/sdk/aqueduct/llm_wrapper.py new file mode 100644 index 000000000..966307546 --- /dev/null +++ b/sdk/aqueduct/llm_wrapper.py @@ -0,0 +1,220 @@ +from typing import Any, Callable, Dict, List, Optional, Union + +import pandas as pd +from aqueduct.artifacts.base_artifact import BaseArtifact +from aqueduct.constants.enums import RuntimeType +from aqueduct.decorator import op +from aqueduct.error import InvalidUserArgumentException +from aqueduct.integrations.dynamic_k8s_integration import DynamicK8sIntegration +from aqueduct.utils.utils import generate_engine_config + +from aqueduct import globals + +supported_llms = [ + "llama_7b", + "vicuna_7b", + "dolly_v2_3b", + "dolly_v2_7b", +] + +resource_requests = { + "llama_7b": { + "memory": "16GB", + "gpu_resource_name": "nvidia.com/gpu", + }, + "vicuna_7b": { + "memory": "32GB", + "gpu_resource_name": "nvidia.com/gpu", + }, + "dolly_v2_3b": { + "memory": "8GB", + "gpu_resource_name": "nvidia.com/gpu", + }, + "dolly_v2_7b": { + "memory": "16GB", + "gpu_resource_name": "nvidia.com/gpu", + }, +} + + +def _generate_llm_op( + llm_name: str, column_name: Optional[str] = None, output_column_name: Optional[str] = None +) -> Callable[ + [Union[str, List[str], pd.DataFrame], Dict[str, Any]], Union[str, List[str], pd.DataFrame] +]: + def use_llm( + messages: Union[str, List[str]], parameters: Dict[str, Any] = {} + ) -> Union[str, List[str]]: + if not (isinstance(messages, str) or isinstance(messages, list)): + raise ValueError("The input must be a string or list of strings.") + + module = __import__("aqueduct_llm", fromlist=[llm_name]) + llm = getattr(module, llm_name) + + if "prompt" in parameters: + prompt = parameters["prompt"] + if not isinstance(prompt, str): + raise ValueError("The 'prompt' parameter must be a string.") + + if "{text}" not in prompt: + messages = ( + prompt + " " + messages + if isinstance(messages, str) + else [prompt + " " + m for m in messages] + ) + else: + messages = ( + prompt.replace("{text}", messages) + if isinstance(messages, str) + else [prompt.replace("{text}", m) for m in messages] + ) + + del parameters["prompt"] + + response = llm.generate(messages, **parameters) + assert isinstance(response, str) or isinstance(response, list) + return response + + def use_llm_for_table(df: pd.DataFrame, parameters: Dict[str, Any] = {}) -> pd.DataFrame: + if not isinstance(df, pd.DataFrame): + raise ValueError("The input must be a pandas DataFrame.") + + module = __import__("aqueduct_llm", fromlist=[llm_name]) + llm = getattr(module, llm_name) + + if "prompt" in parameters: + prompt = parameters["prompt"] + if not isinstance(prompt, str): + raise ValueError("The 'prompt' parameter must be a string.") + + if "{text}" not in prompt: + input_series = prompt + " " + df[column_name].astype(str) + else: + input_series = ( + df[column_name].astype(str).apply(lambda x: prompt.replace("{text}", x)) + ) + + del parameters["prompt"] + else: + input_series = df[column_name].astype(str) + + response = llm.generate(input_series.tolist(), **parameters) + assert isinstance(response, list) + + df[output_column_name] = response + return df + + if column_name is None and output_column_name is None: + return use_llm + else: + if column_name is None or output_column_name is None: + raise InvalidUserArgumentException( + "Both column_name and output_column_name must be provided." + ) + if not isinstance(column_name, str) or not isinstance(output_column_name, str): + raise InvalidUserArgumentException( + "column_name and output_column_name must be strings." + ) + + return use_llm_for_table + + +def llm_op( + name: str, + op_name: Optional[str] = None, + column_name: Optional[str] = None, + output_column_name: Optional[str] = None, + engine: Optional[Union[str, DynamicK8sIntegration]] = None, +) -> Union[ + Callable[..., Union[BaseArtifact, List[BaseArtifact]]], BaseArtifact, List[BaseArtifact] +]: + """Generates an Aqueduct operator to run a LLM. Either both column_name and output_column_name must be provided, + or neither must be provided. Please refer to the `Returns` section below for their differences. + + Args: + name: + The name of the LLM to use. Please see aqueduct.supported_llms for a list of supported LLMs. + op_name: + The name of the operator. If not provided, defaults to the name of the LLM. + column_name: + The name of the column of the Dataframe to use as input to the LLM. If this field is provided, + output_column_name must also be provided. + output_column_name: + The name of the column of the Dataframe to store the output of the LLM. If this field is provided, + column_name must also be provided. + engine: + The name of the compute integration this operator will run on. Defaults to the Aqueduct engine. + We recommend using a Kubernetes engine to run LLM operators, as we have implemented performance + optimizations for LLMs on Kubernetes. + + Returns: + If column_name and output_column_name are both provided, returns a function that takes in a + DataFrame and returns a DataFrame with the output of the LLM appended as a new column: + + def llm_for_table(df: pd.DataFrame, parameters: Dict[str, Any] = {}) -> pd.DataFrame: + + Otherwise, returns a function that takes in a string or list of strings, applies LLM, and + returns a string or list of strings: + + def use_llm(messages: Union[str, List[str]], parameters: Dict[str, Any] = {}) -> Union[str, List[str]]: + + In both cases, the function takes in an optional second argument, which is a dictionary of + parameters to pass to the LLM. Please refer to the documentation for the LLM you are using + for a list of supported parameters. For all LLMs, we support the "prompt" parameter. If the + prompt contains {text}, we will replace {text} with the input string(s) before sending to + the LLM. If the prompt does not contain {text}, we will prepend the prompt to the input + string(s) before sending to the LLM. + + Examples: + >>> snowflake = client.integration("snowflake") + >>> reviews_table = snowflake.sql("select * from hotel_reviews;") + + >>> from aqueduct import llm_op + ... vicuna_table_op = llm_op( + ... name="vicuna_7b", + ... op_name="my_vicuna_operator", + ... column_name="review", + ... output_column_name="response", + ... engine=ondemand_k8s, + >>> ) + ... params = client.create_param( + ... "vicuna_params", + ... default={ + ... "prompt": "Respond to the following hotel review as a customer service agent: {text} ", + ... "max_gpu_memory": "13GiB", + ... "temperature": 0.7, + ... "max_new_tokens": 512, + ... } + >>> ) + >>> review_with_response = vicuna_table_op(reviews_table, params) + + `review_with_response` is a Table Artifact with the output of the LLM appended as a new column. + + >>> review_with_response.get() + """ + if name not in supported_llms: + raise InvalidUserArgumentException(f"Unsupported LLM model {name}") + + kwargs: Dict[str, Any] = {} + if engine is not None: + kwargs["engine"] = engine + + engine_config = generate_engine_config( + globals.__GLOBAL_API_CLIENT__.list_integrations(), + engine, + ) + if engine_config and engine_config.type == RuntimeType.K8S: + kwargs["resources"] = resource_requests[name] + + if op_name is None: + op_name = name + + return op( + name=op_name, + requirements=["aqueduct-llm"], + **kwargs, + )( + _generate_llm_op( + llm_name=name, column_name=column_name, output_column_name=output_column_name + ) + ) diff --git a/sdk/aqueduct/models/operators.py b/sdk/aqueduct/models/operators.py index 4b5f6453b..5bdbf55e0 100644 --- a/sdk/aqueduct/models/operators.py +++ b/sdk/aqueduct/models/operators.py @@ -202,6 +202,7 @@ class ResourceConfig(BaseModel): memory_mb: Optional[int] gpu_resource_name: Optional[str] cuda_version: Optional[str] + use_llm: Optional[bool] class ImageConfig(BaseModel): diff --git a/sdk/aqueduct/tests/helpers_test.py b/sdk/aqueduct/tests/helpers_test.py index 37fa747f3..2ee277834 100644 --- a/sdk/aqueduct/tests/helpers_test.py +++ b/sdk/aqueduct/tests/helpers_test.py @@ -20,12 +20,12 @@ def _run_test(input: str, expected: int, err_msg: str = ""): _run_test("150MB", 150) _run_test("150Mb", 150) _run_test("150mb", 150) - _run_test("150GB", 150 * 1000) - _run_test("150Gb", 150 * 1000) - _run_test("150gb", 150 * 1000) - _run_test("150GB ", 150 * 1000) - _run_test("150 GB ", 150 * 1000) - _run_test(" 150 GB ", 150 * 1000) + _run_test("150GB", 150 * 1024) + _run_test("150Gb", 150 * 1024) + _run_test("150gb", 150 * 1024) + _run_test("150GB ", 150 * 1024) + _run_test("150 GB ", 150 * 1024) + _run_test(" 150 GB ", 150 * 1024) _run_test("1", -1, "not long enough") _run_test("150", -1, "must have a suffix that is one of") diff --git a/sdk/aqueduct/utils/utils.py b/sdk/aqueduct/utils/utils.py index 7bd0f71b4..5b3b706c4 100644 --- a/sdk/aqueduct/utils/utils.py +++ b/sdk/aqueduct/utils/utils.py @@ -3,6 +3,7 @@ from aqueduct.constants.enums import ArtifactType, RuntimeType, ServiceType, TriggerType from aqueduct.error import * +from aqueduct.integrations.dynamic_k8s_integration import DynamicK8sIntegration from aqueduct.models.config import ( AirflowEngineConfig, DatabricksEngineConfig, @@ -146,12 +147,16 @@ def construct_param_spec( def generate_engine_config( - integrations: Dict[str, IntegrationInfo], integration_name: Optional[str] + integrations: Dict[str, IntegrationInfo], + integration_name: Optional[Union[str, DynamicK8sIntegration]], ) -> Optional[EngineConfig]: """Generates an EngineConfig from an integration info object. Both None and "Aqueduct" (case-insensitive) map to the Aqueduct Engine. """ + if isinstance(integration_name, DynamicK8sIntegration): + integration_name = integration_name.name() + if integration_name is None or integration_name.lower() == "aqueduct": return None diff --git a/src/golang/lib/job/constants.go b/src/golang/lib/job/constants.go index 7d3ddd8a4..8e5f7365a 100644 --- a/src/golang/lib/job/constants.go +++ b/src/golang/lib/job/constants.go @@ -26,4 +26,12 @@ const ( S3ConnectorDockerImage = "aqueducthq/s3-connector" defaultFunctionExtractPath = "/app/function/" + + LlmCuda1141Python38 = "aqueducthq/llm_cuda1141_py38" + LlmCuda1141Python39 = "aqueducthq/llm_cuda1141_py39" + LlmCuda1141Python310 = "aqueducthq/llm_cuda1141_py310" + + LlmCuda1180Python38 = "aqueducthq/llm_cuda1180_py38" + LlmCuda1180Python39 = "aqueducthq/llm_cuda1180_py39" + LlmCuda1180Python310 = "aqueducthq/llm_cuda1180_py310" ) diff --git a/src/golang/lib/job/k8s.go b/src/golang/lib/job/k8s.go index 3ce272171..c445210fc 100644 --- a/src/golang/lib/job/k8s.go +++ b/src/golang/lib/job/k8s.go @@ -312,7 +312,7 @@ func mapJobTypeToDockerImage(spec Spec, launchGpu bool, cudaVersion operator.Cud case FunctionJobType: functionSpec, ok := spec.(*FunctionSpec) if !ok { - return "", errors.New("Unable to determine Python Version.") + return "", errors.New("Unable to cast spec to FunctionSpec.") } pythonVersion, err := function.GetPythonVersion(context.TODO(), functionSpec.FunctionPath, &functionSpec.StorageConfig) @@ -322,6 +322,8 @@ func mapJobTypeToDockerImage(spec Spec, launchGpu bool, cudaVersion operator.Cud if functionSpec.Image != nil { return *functionSpec.Image.Url, nil + } else if functionSpec.Resources.UseLLM != nil && *functionSpec.Resources.UseLLM { + return mapLLMToDockerImage(pythonVersion, cudaVersion) } else if launchGpu { return mapGpuFunctionToDockerImage(pythonVersion, cudaVersion) } else { @@ -411,3 +413,36 @@ func mapGpuFunctionToDockerImage(pythonVersion function.PythonVersion, cudaVersi return "", errors.New("Unsupported CUDA version provided. We currently only support CUDA versions 11.4.1 and 11.8.0") } } + +func mapLLMToDockerImage(pythonVersion function.PythonVersion, cudaVersion operator.CudaVersionNumber) (string, error) { + switch cudaVersion { + case operator.Cuda11_8_0: + switch pythonVersion { + case function.PythonVersion37: + return "", errors.Newf("LLM is not supported for Python version %s.", pythonVersion) + case function.PythonVersion38: + return LlmCuda1180Python38, nil + case function.PythonVersion39: + return LlmCuda1180Python39, nil + case function.PythonVersion310: + return LlmCuda1180Python310, nil + default: + return "", errors.New("Unable to determine Python Version.") + } + case operator.Cuda11_4_1: + switch pythonVersion { + case function.PythonVersion37: + return "", errors.Newf("LLM is not supported for Python version %s.", pythonVersion) + case function.PythonVersion38: + return LlmCuda1141Python38, nil + case function.PythonVersion39: + return LlmCuda1141Python39, nil + case function.PythonVersion310: + return LlmCuda1141Python310, nil + default: + return "", errors.New("Unable to determine Python Version.") + } + default: + return "", errors.New("Unsupported CUDA version provided. We currently only support CUDA versions 11.4.1 and 11.8.0") + } +} diff --git a/src/golang/lib/models/shared/operator/operator.go b/src/golang/lib/models/shared/operator/operator.go index 930c1b516..9f2c71fed 100644 --- a/src/golang/lib/models/shared/operator/operator.go +++ b/src/golang/lib/models/shared/operator/operator.go @@ -47,6 +47,7 @@ type ResourceConfig struct { MemoryMB *int `json:"memory_mb,omitempty"` GPUResourceName *string `json:"gpu_resource_name,omitempty"` CudaVersion *CudaVersionNumber `json:"cuda_version,omitempty"` + UseLLM *bool `json:"use_llm,omitempty"` } type ImageConfig struct { diff --git a/src/golang/lib/workflow/operator/base.go b/src/golang/lib/workflow/operator/base.go index 560f4a6db..8b7832692 100644 --- a/src/golang/lib/workflow/operator/base.go +++ b/src/golang/lib/workflow/operator/base.go @@ -443,6 +443,7 @@ func (bfo *baseFunctionOperator) jobSpec( inputContentPaths, inputMetadataPaths := unzipExecPathsToRawPaths(bfo.inputExecPaths) outputContentPaths, outputMetadataPaths := unzipExecPathsToRawPaths(bfo.outputExecPaths) + return &job.FunctionSpec{ BasePythonSpec: job.NewBasePythonSpec( job.FunctionJobType,