Skip to content

Commit

Permalink
Implement use argument for LLM for decorators (#1234)
Browse files Browse the repository at this point in the history
  • Loading branch information
cw75 authored May 1, 2023
1 parent dc25d6c commit cc66b07
Show file tree
Hide file tree
Showing 14 changed files with 396 additions and 43 deletions.
1 change: 1 addition & 0 deletions sdk/aqueduct/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from aqueduct.constants.enums import CheckSeverity, LoadUpdateMode
from aqueduct.decorator import check, metric, op, to_operator
from aqueduct.flow import Flow
from aqueduct.llm_wrapper import llm_op, supported_llms
from aqueduct.schedule import DayOfMonth, DayOfWeek, Hour, Minute, daily, hourly, monthly, weekly


Expand Down
7 changes: 5 additions & 2 deletions sdk/aqueduct/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
prepare_integration_config,
)
from aqueduct.integrations.databricks_integration import DatabricksIntegration
from aqueduct.integrations.dynamic_k8s_integration import DynamicK8sIntegration
from aqueduct.integrations.ecr_integration import ECRIntegration
from aqueduct.integrations.google_sheets_integration import GoogleSheetsIntegration
from aqueduct.integrations.k8s_integration import K8sIntegration
Expand Down Expand Up @@ -450,7 +451,7 @@ def publish_flow(
name: str,
description: str = "",
schedule: str = "",
engine: Optional[str] = None,
engine: Optional[Union[str, DynamicK8sIntegration]] = None,
artifacts: Optional[Union[BaseArtifact, List[BaseArtifact]]] = None,
metrics: Optional[List[NumericArtifact]] = None,
checks: Optional[List[BoolArtifact]] = None,
Expand Down Expand Up @@ -524,7 +525,9 @@ def publish_flow(
"A non-empty string must be supplied for the flow's name."
)

if engine is not None and not isinstance(engine, str):
if engine is not None and not (
isinstance(engine, str) or isinstance(engine, DynamicK8sIntegration)
):
raise InvalidUserArgumentException(
"`engine` parameter must be a string, got %s." % type(engine)
)
Expand Down
1 change: 1 addition & 0 deletions sdk/aqueduct/constants/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,4 @@ class CustomizableResourceType(str, Enum, metaclass=MetaEnum):
MEMORY = "memory"
GPU_RESOURCE_NAME = "gpu_resource_name"
CUDA_VERSION = "cuda_version"
USE_LLM = "use_llm"
125 changes: 96 additions & 29 deletions sdk/aqueduct/decorator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import inspect
import uuid
import warnings
import io
import os
import zipfile
from functools import wraps
from typing import Any, Callable, Dict, List, Mapping, Optional, Union, cast

Expand Down Expand Up @@ -36,13 +37,8 @@
get_operator_type,
)
from aqueduct.type_annotations import CheckFunction, MetricFunction, Number, UserFunction
from aqueduct.utils.dag_deltas import (
AddOperatorDelta,
DAGDelta,
RemoveOperatorDelta,
apply_deltas_to_dag,
)
from aqueduct.utils.function_packaging import serialize_function
from aqueduct.utils.dag_deltas import AddOperatorDelta, apply_deltas_to_dag
from aqueduct.utils.function_packaging import REQUIREMENTS_FILE, serialize_function
from aqueduct.utils.naming import default_artifact_name_from_op_name, sanitize_artifact_name
from aqueduct.utils.utils import generate_engine_config, generate_uuid

Expand Down Expand Up @@ -184,14 +180,18 @@ def _typecheck_op_decorator_arguments(
description: Optional[str],
file_dependencies: Optional[List[str]],
requirements: Optional[Union[str, List[str]]],
engine: Optional[str],
engine: Optional[Union[str, DynamicK8sIntegration]],
num_outputs: int,
outputs: Optional[List[str]],
) -> None:
_typecheck_common_decorator_arguments(description, file_dependencies, requirements)

if engine is not None and not isinstance(engine, str):
raise InvalidUserArgumentException("`engine` must be a string.")
if engine is not None and not (
isinstance(engine, str) or isinstance(engine, DynamicK8sIntegration)
):
raise InvalidUserArgumentException(
"`engine` must be a string or a DynamicK8sIntegration object."
)

if num_outputs is not None:
if not isinstance(num_outputs, int) or num_outputs < 1:
Expand Down Expand Up @@ -365,7 +365,7 @@ def _convert_memory_string_to_mbs(memory_str: str) -> int:
if memory_str[-2:].upper() == "MB":
multiplier = 1
elif memory_str[-2:].upper() == "GB":
multiplier = 1000
multiplier = 1024
else:
raise InvalidUserArgumentException(
"Memory value `%s` is invalid. It must have a suffix that is one of mb/MB/gb/GB."
Expand All @@ -384,7 +384,7 @@ def _convert_memory_string_to_mbs(memory_str: str) -> int:

def _update_operator_spec_with_engine(
spec: OperatorSpec,
engine: Optional[str] = None,
engine: Optional[Union[str, DynamicK8sIntegration]] = None,
) -> None:
if engine is not None:
if globals.__GLOBAL_API_CLIENT__ is None:
Expand All @@ -400,8 +400,15 @@ def _update_operator_spec_with_engine(

def _update_operator_spec_with_resources(
spec: OperatorSpec,
llm: bool,
resources: Optional[Dict[str, Any]] = None,
) -> None:
if llm:
if resources is None:
resources = {}

resources[CustomizableResourceType.USE_LLM] = True

if resources is not None:
if not isinstance(resources, Dict) or any(not isinstance(k, str) for k in resources):
raise InvalidUserArgumentException("`resources` must be a dictionary with string keys.")
Expand All @@ -410,6 +417,7 @@ def _update_operator_spec_with_resources(
memory = resources.get(CustomizableResourceType.MEMORY)
gpu_resource_name = resources.get(CustomizableResourceType.GPU_RESOURCE_NAME)
cuda_version = resources.get(CustomizableResourceType.CUDA_VERSION)
use_llm = resources.get(CustomizableResourceType.USE_LLM)

if num_cpus is not None and (not isinstance(num_cpus, int) or num_cpus < 0):
raise InvalidUserArgumentException(
Expand Down Expand Up @@ -451,9 +459,59 @@ def _update_operator_spec_with_resources(
memory_mb=memory,
gpu_resource_name=gpu_resource_name,
cuda_version=cuda_version,
use_llm=use_llm,
)


def _check_llm_requirements(spec: OperatorSpec) -> None:
    """Validate that an LLM operator's resource request is sufficient on Kubernetes.

    The check only applies when the operator has an engine config whose type is
    `RuntimeType.K8S`; for any other engine (or no engine config) it is a no-op.

    Args:
        spec: The operator spec to validate. `spec.resources` must already be set.

    Raises:
        InvalidUserArgumentException: If no GPU resource name was requested, or if the
            requested memory (4096MB is assumed when none is given) is below 8192MB.
    """
    assert spec.resources is not None

    engine_config = spec.engine_config
    if engine_config is None or engine_config.type is not RuntimeType.K8S:
        # Nothing to enforce outside of Kubernetes engines.
        return

    resources = spec.resources
    gpu_name = resources.gpu_resource_name
    memory_request = resources.memory_mb

    # A GPU counts as requested only when a GPU resource name was supplied.
    if gpu_name is None:
        raise InvalidUserArgumentException("LLM requires GPU resource but it is not requested.")

    memory_floor_mb = 8192  # 8GB
    # 4GB is the amount assumed here when the user did not specify any memory.
    effective_memory_mb = 4096 if memory_request is None else memory_request

    if memory_floor_mb > effective_memory_mb:
        raise InvalidUserArgumentException(
            "LLMs require at least %dMB of memory but only %dMB is requested."
            % (memory_floor_mb, effective_memory_mb)
        )


def _strip_requirement_comment(line: str) -> str:
    """Return `line` with any requirements-file comment removed.

    A comment starts at a `#` that is either the first character of the line or is
    preceded by whitespace. A `#` embedded in a token (e.g. a URL fragment such as
    `#egg=name`) is therefore left intact.
    """
    for index, char in enumerate(line):
        if char == "#" and (index == 0 or line[index - 1].isspace()):
            return line[:index]
    return line


def _check_if_requirements_contain_llm(zip_file: bytes) -> bool:
    """Report whether a zipped function package lists `aqueduct-llm` as a requirement.

    Every archive member whose base name equals `REQUIREMENTS_FILE` is scanned, at any
    directory depth. A line matches when, once its comment (full-line or trailing) has
    been removed, it contains the package name. The comparison ignores case and treats
    `-`, `_` and `.` as interchangeable, so spellings such as `aqueduct_llm` or
    `Aqueduct-LLM` are detected as well.

    Args:
        zip_file: The raw bytes of the zip archive to inspect.

    Returns:
        True if any scanned requirements line mentions the package, False otherwise
        (including when the archive holds no requirements file).

    Raises:
        zipfile.BadZipFile: If `zip_file` is not a valid zip archive.
        UnicodeDecodeError: If a requirements file is not valid UTF-8.
    """
    llm_package = "aqueduct-llm"

    with zipfile.ZipFile(io.BytesIO(zip_file), "r") as archive:
        for member in archive.namelist():
            if os.path.basename(member) != REQUIREMENTS_FILE:
                continue

            with archive.open(member) as requirements_file:
                for raw_line in requirements_file:
                    # Drop comments first so a package that is only *mentioned* in a
                    # comment is not mistaken for an actual requirement.
                    requirement = _strip_requirement_comment(raw_line.decode("utf-8"))

                    # Normalize case and separators so equivalent spellings of the
                    # package name compare equal.
                    normalized = requirement.lower().replace("_", "-").replace(".", "-")
                    if llm_package in normalized:
                        return True

    return False


def _update_operator_spec_with_image(
spec: OperatorSpec,
image: Optional[Dict[str, str]] = None,
Expand Down Expand Up @@ -507,7 +565,7 @@ def _update_operator_spec_with_image(
def op(
name: Optional[Union[str, UserFunction]] = None,
description: Optional[str] = None,
engine: Optional[str] = None,
engine: Optional[Union[str, DynamicK8sIntegration]] = None,
file_dependencies: Optional[List[str]] = None,
requirements: Optional[Union[str, List[str]]] = None,
num_outputs: Optional[int] = None,
Expand Down Expand Up @@ -594,9 +652,6 @@ def op(
>>> recommendations.get()
"""
if isinstance(engine, DynamicK8sIntegration):
engine = engine.name()

# Establish parity between `num_outputs` and `outputs`, or raise exception if there is a mismatch.
if num_outputs is None and outputs is None:
num_outputs = 1
Expand Down Expand Up @@ -664,7 +719,13 @@ def _wrapped_util(
)

_update_operator_spec_with_engine(op_spec, engine)
_update_operator_spec_with_resources(op_spec, resources)
_update_operator_spec_with_resources(
op_spec, _check_if_requirements_contain_llm(zip_file), resources
)

if op_spec.resources is not None and op_spec.resources.use_llm:
_check_llm_requirements(op_spec)

_update_operator_spec_with_image(op_spec, image)

assert isinstance(num_outputs, int)
Expand Down Expand Up @@ -708,7 +769,7 @@ def metric(
file_dependencies: Optional[List[str]] = None,
requirements: Optional[Union[str, List[str]]] = None,
output: Optional[str] = None,
engine: Optional[str] = None,
engine: Optional[Union[str, DynamicK8sIntegration]] = None,
resources: Optional[Dict[str, Any]] = None,
image: Optional[Dict[str, str]] = None,
) -> Union[DecoratedMetricFunction, OutputArtifactFunction]:
Expand Down Expand Up @@ -788,9 +849,6 @@ def metric(
>>> churn_metric.get()
"""
if isinstance(engine, DynamicK8sIntegration):
engine = engine.name()

_typecheck_common_decorator_arguments(description, file_dependencies, requirements)

if output is not None and not isinstance(output, str):
Expand Down Expand Up @@ -848,7 +906,13 @@ def _wrapped_util(
metric_spec = MetricSpec(function=function_spec)
op_spec = OperatorSpec(metric=metric_spec)
_update_operator_spec_with_engine(op_spec, engine)
_update_operator_spec_with_resources(op_spec, resources)
_update_operator_spec_with_resources(
op_spec, _check_if_requirements_contain_llm(zip_file), resources
)

if op_spec.resources is not None and op_spec.resources.use_llm:
_check_llm_requirements(op_spec)

_update_operator_spec_with_image(op_spec, image)

output_names = [output] if output is not None else None
Expand Down Expand Up @@ -905,7 +969,7 @@ def check(
file_dependencies: Optional[List[str]] = None,
requirements: Optional[Union[str, List[str]]] = None,
output: Optional[str] = None,
engine: Optional[str] = None,
engine: Optional[Union[str, DynamicK8sIntegration]] = None,
resources: Optional[Dict[str, Any]] = None,
image: Optional[Dict[str, str]] = None,
) -> Union[DecoratedCheckFunction, OutputArtifactFunction]:
Expand Down Expand Up @@ -990,9 +1054,6 @@ def check(
>>> churn_is_low_check.get()
"""
if isinstance(engine, DynamicK8sIntegration):
engine = engine.name()

_typecheck_common_decorator_arguments(description, file_dependencies, requirements)

if output is not None and not isinstance(output, str):
Expand Down Expand Up @@ -1047,7 +1108,13 @@ def _wrapped_util(
check_spec = CheckSpec(level=severity, function=function_spec)
op_spec = OperatorSpec(check=check_spec)
_update_operator_spec_with_engine(op_spec, engine)
_update_operator_spec_with_resources(op_spec, resources)
_update_operator_spec_with_resources(
op_spec, _check_if_requirements_contain_llm(zip_file), resources
)

if op_spec.resources is not None and op_spec.resources.use_llm:
_check_llm_requirements(op_spec)

_update_operator_spec_with_image(op_spec, image)

output_names = [output] if output is not None else None
Expand Down
8 changes: 6 additions & 2 deletions sdk/aqueduct/globals/config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from typing import Optional
from typing import Optional, Union

from aqueduct.integrations.dynamic_k8s_integration import DynamicK8sIntegration
from pydantic import BaseModel


class GlobalConfig(BaseModel):
"""Defines the fields that are globally configurable with `aq.global_config()`."""

lazy: bool
engine: Optional[str]
engine: Optional[Union[str, DynamicK8sIntegration]]

class Config:
arbitrary_types_allowed = True


GLOBAL_LAZY_KEY = "lazy"
Expand Down
10 changes: 8 additions & 2 deletions sdk/aqueduct/integrations/dynamic_k8s_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from aqueduct.models.integration import Integration, IntegrationInfo
from aqueduct.models.response_models import DynamicEngineStatusResponse

from aqueduct import globals


def parse_dynamic_k8s_config(
config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfig]
Expand Down Expand Up @@ -45,6 +43,8 @@ def __init__(self, metadata: IntegrationInfo):
@validate_is_connected()
def status(self) -> str:
"""Get the current status of the dynamic Kubernetes cluster."""
from aqueduct import globals

engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status(
engine_integration_ids=[str(self._metadata.id)]
)
Expand Down Expand Up @@ -73,6 +73,8 @@ def create(
"""
config_delta = parse_dynamic_k8s_config(config_delta)

from aqueduct import globals

engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status(
engine_integration_ids=[str(self._metadata.id)]
)
Expand Down Expand Up @@ -115,6 +117,8 @@ def update(self, config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfi
"""
config_delta = parse_dynamic_k8s_config(config_delta)

from aqueduct import globals

engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status(
engine_integration_ids=[str(self._metadata.id)]
)
Expand Down Expand Up @@ -155,6 +159,8 @@ def delete(self, force: bool = False) -> None:
InternalServerError:
An unexpected error occurred within the Aqueduct cluster.
"""
from aqueduct import globals

engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status(
engine_integration_ids=[str(self._metadata.id)]
)
Expand Down
Loading

0 comments on commit cc66b07

Please sign in to comment.