Skip to content

Commit

Permalink
Fix broken engine_configs due to integration/resource naming inconsis…
Browse files Browse the repository at this point in the history
…tency (#1379)
  • Loading branch information
kenxu95 authored May 26, 2023
1 parent 6efb825 commit cf1d495
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 15 deletions.
9 changes: 6 additions & 3 deletions sdk/aqueduct/backend/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,11 @@ def get_dynamic_engine_status_by_dag(
dag_engine_config = dag.engine_config
if dag_engine_config.type == RuntimeType.K8S:
assert dag_engine_config.k8s_config is not None
engine_resource_ids.add(str(dag_engine_config.k8s_config.integration_id))
engine_resource_ids.add(str(dag_engine_config.k8s_config.resource_id))
for op in dag.operators.values():
if op.spec.engine_config and op.spec.engine_config.type == RuntimeType.K8S:
assert op.spec.engine_config.k8s_config is not None
engine_resource_ids.add(str(op.spec.engine_config.k8s_config.integration_id))
engine_resource_ids.add(str(op.spec.engine_config.k8s_config.resource_id))

return self.get_dynamic_engine_status(list(engine_resource_ids))

Expand Down Expand Up @@ -519,7 +519,10 @@ def delete_workflow(
url = self.construct_full_url(self.DELETE_WORKFLOW_ROUTE_TEMPLATE % flow_id)
body = {
"external_delete": {
str(resource): [obj.spec.json() for obj in saved_objects_to_delete[resource]]
# TODO(ENG-2994): `by_alias` is required until this naming inconsistency is resolved.
str(resource): [
obj.spec.json(by_alias=True) for obj in saved_objects_to_delete[resource]
]
for resource in saved_objects_to_delete
},
"force": force,
Expand Down
33 changes: 22 additions & 11 deletions sdk/aqueduct/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from aqueduct.resources.databricks import DatabricksResource
from aqueduct.resources.k8s import K8sResource
from aqueduct.resources.spark import SparkResource
from pydantic import BaseModel
from pydantic import BaseModel, Field


class AqueductEngineConfig(BaseModel):
Expand All @@ -18,24 +18,35 @@ class AqueductCondaEngineConfig(BaseModel):
env: str


class AirflowEngineConfig(BaseModel):
resource_id: uuid.UUID
class BaseEngineConfig(BaseModel):
"""Any time this is json-serialized, you must use `by_alias=True` to ensure that
the `integration_id` field is set correctly."""

resource_id: uuid.UUID = Field(alias="integration_id")

class Config:
# Prevents any validation errors due to the alias when setting the `resource_id` field.
allow_population_by_field_name = True


class AirflowEngineConfig(BaseEngineConfig):
pass

class K8sEngineConfig(BaseModel):
integration_id: uuid.UUID

class K8sEngineConfig(BaseEngineConfig):
pass

class LambdaEngineConfig(BaseModel):
resource_id: uuid.UUID

class LambdaEngineConfig(BaseEngineConfig):
pass

class DatabricksEngineConfig(BaseModel):
resource_id: uuid.UUID

class DatabricksEngineConfig(BaseEngineConfig):
pass

class SparkEngineConfig(BaseModel):
resource_id: uuid.UUID

class SparkEngineConfig(BaseEngineConfig):
pass


class EngineConfig(BaseModel):
Expand Down
2 changes: 1 addition & 1 deletion sdk/aqueduct/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def generate_engine_config(
type=RuntimeType.K8S,
name=resource_name,
k8s_config=K8sEngineConfig(
integration_id=resource.id,
resource_id=resource.id,
),
)
elif resource.service == ServiceType.LAMBDA:
Expand Down

0 comments on commit cf1d495

Please sign in to comment.