diff --git a/sdk/aqueduct/backend/api_client.py b/sdk/aqueduct/backend/api_client.py index 77960c4b5..e032e60d7 100644 --- a/sdk/aqueduct/backend/api_client.py +++ b/sdk/aqueduct/backend/api_client.py @@ -309,11 +309,11 @@ def get_dynamic_engine_status_by_dag( dag_engine_config = dag.engine_config if dag_engine_config.type == RuntimeType.K8S: assert dag_engine_config.k8s_config is not None - engine_resource_ids.add(str(dag_engine_config.k8s_config.integration_id)) + engine_resource_ids.add(str(dag_engine_config.k8s_config.resource_id)) for op in dag.operators.values(): if op.spec.engine_config and op.spec.engine_config.type == RuntimeType.K8S: assert op.spec.engine_config.k8s_config is not None - engine_resource_ids.add(str(op.spec.engine_config.k8s_config.integration_id)) + engine_resource_ids.add(str(op.spec.engine_config.k8s_config.resource_id)) return self.get_dynamic_engine_status(list(engine_resource_ids)) @@ -519,7 +519,10 @@ def delete_workflow( url = self.construct_full_url(self.DELETE_WORKFLOW_ROUTE_TEMPLATE % flow_id) body = { "external_delete": { - str(resource): [obj.spec.json() for obj in saved_objects_to_delete[resource]] + # TODO(ENG-2994): `by_alias` is required until this naming inconsistency is resolved.
+ str(resource): [ + obj.spec.json(by_alias=True) for obj in saved_objects_to_delete[resource] + ] for resource in saved_objects_to_delete }, "force": force, diff --git a/sdk/aqueduct/models/config.py b/sdk/aqueduct/models/config.py index 8245a6489..cf242443a 100644 --- a/sdk/aqueduct/models/config.py +++ b/sdk/aqueduct/models/config.py @@ -7,7 +7,7 @@ from aqueduct.resources.databricks import DatabricksResource from aqueduct.resources.k8s import K8sResource from aqueduct.resources.spark import SparkResource -from pydantic import BaseModel +from pydantic import BaseModel, Field class AqueductEngineConfig(BaseModel): @@ -18,24 +18,35 @@ class AqueductCondaEngineConfig(BaseModel): env: str -class AirflowEngineConfig(BaseModel): - resource_id: uuid.UUID +class BaseEngineConfig(BaseModel): + """Any time this is json-serialized, you must use `by_alias=True` to ensure that + the `integration_id` field is set correctly.""" + resource_id: uuid.UUID = Field(alias="integration_id") + + class Config: + # Prevents any validation errors due to the alias when setting the `resource_id` field.
+ allow_population_by_field_name = True + + +class AirflowEngineConfig(BaseEngineConfig): + pass -class K8sEngineConfig(BaseModel): - integration_id: uuid.UUID +class K8sEngineConfig(BaseEngineConfig): + pass -class LambdaEngineConfig(BaseModel): - resource_id: uuid.UUID +class LambdaEngineConfig(BaseEngineConfig): + pass -class DatabricksEngineConfig(BaseModel): - resource_id: uuid.UUID +class DatabricksEngineConfig(BaseEngineConfig): + pass -class SparkEngineConfig(BaseModel): - resource_id: uuid.UUID + +class SparkEngineConfig(BaseEngineConfig): + pass class EngineConfig(BaseModel): diff --git a/sdk/aqueduct/utils/utils.py b/sdk/aqueduct/utils/utils.py index a9e1a777d..e5ac1d890 100644 --- a/sdk/aqueduct/utils/utils.py +++ b/sdk/aqueduct/utils/utils.py @@ -179,7 +179,7 @@ def generate_engine_config( type=RuntimeType.K8S, name=resource_name, k8s_config=K8sEngineConfig( - integration_id=resource.id, + resource_id=resource.id, ), ) elif resource.service == ServiceType.LAMBDA: