Rename DatasetTaskRef to TaskOutletDatasetReference (apache#25919)
dstandish authored Aug 29, 2022
1 parent 9c592cb · commit b19ccf8
Showing 10 changed files with 45 additions and 31 deletions.
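For context, the renamed model backs the data-aware scheduling feature introduced in Airflow 2.4: when a task declares a Dataset in its outlets, writing the DAG to the metadata database records one task-outlet-dataset reference per (dataset, dag, task) triple. A minimal sketch of a DAG that would yield such a reference (the dataset URI and IDs below are illustrative, not taken from this commit):

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

example_dataset = Dataset("s3://bucket/example.csv")  # illustrative URI

with DAG(dag_id="producer_dag", start_date=datetime(2022, 9, 1), schedule=None):
    # Declaring the dataset as an outlet is what produces a
    # task_outlet_dataset_reference row when the DAG is serialized to the DB.
    EmptyOperator(task_id="producing_task", outlets=[example_dataset])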
airflow/api_connexion/openapi/v1.yaml (4 changes: 2 additions & 2 deletions)
@@ -3526,10 +3526,10 @@ components:
         producing_tasks:
           type: array
           items:
-            $ref: '#/components/schemas/DatasetTaskRef'
+            $ref: '#/components/schemas/TaskOutletDatasetReference'
 
 
-    DatasetTaskRef:
+    TaskOutletDatasetReference:
       description: |
         A datasets reference to an upstream task.
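This component is what the dataset endpoints of the stable REST API embed under producing_tasks. A hedged sketch of reading it from GET /api/v1/datasets/{uri} (host, credentials, auth backend, and the dataset URI are placeholders for your deployment):

import requests

# Placeholders: point at your webserver, enable an auth backend that accepts
# basic auth, and URL-encode the dataset URI in the path.
resp = requests.get(
    "http://localhost:8080/api/v1/datasets/s3%3A%2F%2Fbucket%2Fexample.csv",
    auth=("admin", "admin"),
)
resp.raise_for_status()
for ref in resp.json().get("producing_tasks", []):
    # Each entry follows the TaskOutletDatasetReference schema defined above.
    print(ref["dag_id"], ref["task_id"])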
airflow/api_connexion/schemas/dataset_schema.py (15 changes: 10 additions & 5 deletions)
@@ -21,16 +21,21 @@
 from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
 
 from airflow.api_connexion.schemas.common_schema import JsonObjectField
-from airflow.models.dataset import DagScheduleDatasetReference, DatasetEvent, DatasetModel, DatasetTaskRef
+from airflow.models.dataset import (
+    DagScheduleDatasetReference,
+    DatasetEvent,
+    DatasetModel,
+    TaskOutletDatasetReference,
+)
 
 
-class DatasetTaskRefSchema(SQLAlchemySchema):
-    """DatasetTaskRef DB schema"""
+class TaskOutletDatasetReferenceSchema(SQLAlchemySchema):
+    """TaskOutletDatasetReference DB schema"""
 
     class Meta:
         """Meta"""
 
-        model = DatasetTaskRef
+        model = TaskOutletDatasetReference
 
     dag_id = auto_field()
     task_id = auto_field()
@@ -64,7 +69,7 @@ class Meta:
     extra = JsonObjectField()
     created_at = auto_field()
     updated_at = auto_field()
-    producing_tasks = fields.List(fields.Nested(DatasetTaskRefSchema))
+    producing_tasks = fields.List(fields.Nested(TaskOutletDatasetReferenceSchema))
     consuming_dags = fields.List(fields.Nested(DagScheduleDatasetReferenceSchema))


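Only the names change here; the marshmallow behaviour is identical. A quick sketch of how the renamed schema serializes a row (ref_row stands in for an ORM instance fetched elsewhere):

from airflow.api_connexion.schemas.dataset_schema import TaskOutletDatasetReferenceSchema

schema = TaskOutletDatasetReferenceSchema()
# dump() converts the SQLAlchemy row into the plain dict the API returns,
# e.g. {"dag_id": "...", "task_id": "...", ...}
payload = schema.dump(ref_row)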
airflow/migrations/versions/0114_2_4_0_add_dataset_model.py (10 changes: 5 additions & 5 deletions)
@@ -80,9 +80,9 @@ def _create_dag_schedule_dataset_reference_table():
     )
 
 
-def _create_dataset_task_ref_table():
+def _create_task_outlet_dataset_reference_table():
     op.create_table(
-        'dataset_task_ref',
+        'task_outlet_dataset_reference',
         sa.Column('dataset_id', Integer, primary_key=True, nullable=False),
         sa.Column('dag_id', String(250), primary_key=True, nullable=False),
         sa.Column('task_id', String(250), primary_key=True, nullable=False),
@@ -91,7 +91,7 @@ def _create_dataset_task_ref_table():
         sa.ForeignKeyConstraint(
             ('dataset_id',),
             ['dataset.id'],
-            name="datasettaskref_dataset_fkey",
+            name="todr_dataset_fkey",
             ondelete="CASCADE",
         ),
     )
@@ -162,7 +162,7 @@ def upgrade():
     """Apply Add Dataset model"""
     _create_dataset_table()
     _create_dag_schedule_dataset_reference_table()
-    _create_dataset_task_ref_table()
+    _create_task_outlet_dataset_reference_table()
     _create_dataset_dag_run_queue_table()
     _create_dataset_event_table()
     _create_dataset_event_dag_run_table()
@@ -171,7 +171,7 @@
 def downgrade():
     """Unapply Add Dataset model"""
     op.drop_table('dag_schedule_dataset_reference')
-    op.drop_table('dataset_task_ref')
+    op.drop_table('task_outlet_dataset_reference')
     op.drop_table('dataset_dag_run_queue')
     op.drop_table('dagrun_dataset_event')
     op.drop_table('dataset_event')
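Apart from the table and constraint names (the old datasettaskref_* names shrink to a todr_ prefix), the migration is unchanged. A hedged way to confirm the rename after running `airflow db upgrade`, assuming an initialized metadata database:

from sqlalchemy import inspect

from airflow.settings import engine  # engine for the configured metadata DB

tables = inspect(engine).get_table_names()
assert "task_outlet_dataset_reference" in tables
assert "dataset_task_ref" not in tables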
airflow/models/dag.py (8 changes: 6 additions & 2 deletions)
@@ -2659,7 +2659,11 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=NEW_SESSION):
         DagCode.bulk_sync_to_db(filelocs, session=session)
 
         from airflow.datasets import Dataset
-        from airflow.models.dataset import DagScheduleDatasetReference, DatasetModel, DatasetTaskRef
+        from airflow.models.dataset import (
+            DagScheduleDatasetReference,
+            DatasetModel,
+            TaskOutletDatasetReference,
+        )
 
         class OutletRef(NamedTuple):
             dag_id: str
@@ -2713,7 +2717,7 @@ class InletRef(NamedTuple):
         # store task-outlet-dataset references
         for outlet_ref in outlet_references:
             session.merge(
-                DatasetTaskRef(
+                TaskOutletDatasetReference(
                     dataset_id=stored_datasets[outlet_ref.uri].id,
                     dag_id=outlet_ref.dag_id,
                     task_id=outlet_ref.task_id,
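The surrounding bulk_write_to_db logic is untouched: session.merge() keys on the composite primary key (dataset_id, dag_id, task_id), so re-parsing the same DAG updates the existing reference instead of raising a duplicate-key error. A standalone sketch of that upsert pattern (the IDs are illustrative, and the referenced dataset row must already exist):

from airflow.models.dataset import TaskOutletDatasetReference
from airflow.utils.session import create_session

with create_session() as session:
    ref = TaskOutletDatasetReference(
        dataset_id=1,  # illustrative: id of an existing DatasetModel row
        dag_id="producer_dag",
        task_id="producing_task",
    )
    # merge() looks the row up by its primary key and inserts or updates it.
    session.merge(ref)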
airflow/models/dataset.py (12 changes: 6 additions & 6 deletions)
@@ -66,7 +66,7 @@ class DatasetModel(Base):
     updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
 
     consuming_dags = relationship("DagScheduleDatasetReference", back_populates="dataset")
-    producing_tasks = relationship("DatasetTaskRef", back_populates="dataset")
+    producing_tasks = relationship("TaskOutletDatasetReference", back_populates="dataset")
 
     __tablename__ = "dataset"
     __table_args__ = (
@@ -145,8 +145,8 @@ def __repr__(self):
         return f"{self.__class__.__name__}({', '.join(args)})"
 
 
-class DatasetTaskRef(Base):
-    """References from a task to a downstream dataset."""
+class TaskOutletDatasetReference(Base):
+    """References from a task to a dataset that it updates / produces."""
 
     dataset_id = Column(Integer, primary_key=True, nullable=False)
     dag_id = Column(String(ID_LEN), primary_key=True, nullable=False)
@@ -156,15 +156,15 @@ class DatasetTaskRef(Base):
 
     dataset = relationship("DatasetModel")
 
-    __tablename__ = "dataset_task_ref"
+    __tablename__ = "task_outlet_dataset_reference"
     __table_args__ = (
         ForeignKeyConstraint(
             (dataset_id,),
             ["dataset.id"],
-            name='datasettaskref_dataset_fkey',
+            name='todr_dataset_fkey',
             ondelete="CASCADE",
         ),
-        PrimaryKeyConstraint(dataset_id, dag_id, task_id, name="datasettaskref_pkey", mssql_clustered=True),
+        PrimaryKeyConstraint(dataset_id, dag_id, task_id, name="todr_pkey", mssql_clustered=True),
     )
 
     def __eq__(self, other):
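With the ORM class renamed, the producing_tasks relationship on DatasetModel now resolves to TaskOutletDatasetReference. A hedged sketch of walking it (assumes a dataset with this illustrative URI is already registered in the metadata DB):

from airflow.models.dataset import DatasetModel
from airflow.utils.session import create_session

with create_session() as session:
    dataset = session.query(DatasetModel).filter_by(uri="s3://bucket/example.csv").one()
    for ref in dataset.producing_tasks:  # TaskOutletDatasetReference rows
        print(ref.dag_id, ref.task_id)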
airflow/www/static/js/types/api-generated.ts (6 changes: 3 additions & 3 deletions)
@@ -1478,14 +1478,14 @@ export interface components {
     /** @description The dataset update time */
     updated_at?: string;
     consuming_dags?: components["schemas"]["DagScheduleDatasetReference"][];
-    producing_tasks?: components["schemas"]["DatasetTaskRef"][];
+    producing_tasks?: components["schemas"]["TaskOutletDatasetReference"][];
   };
   /**
    * @description A datasets reference to an upstream task.
    *
    * *New in version 2.4.0*
    */
-  DatasetTaskRef: {
+  TaskOutletDatasetReference: {
     /** @description The DAG ID that updates the dataset. */
     dag_id?: string | null;
     /** @description The task ID that updates the dataset. */
@@ -4112,7 +4112,7 @@ export type ActionCollection = CamelCasedPropertiesDeep<components['schemas']['ActionCollection']>;
 export type Resource = CamelCasedPropertiesDeep<components['schemas']['Resource']>;
 export type ActionResource = CamelCasedPropertiesDeep<components['schemas']['ActionResource']>;
 export type Dataset = CamelCasedPropertiesDeep<components['schemas']['Dataset']>;
-export type DatasetTaskRef = CamelCasedPropertiesDeep<components['schemas']['DatasetTaskRef']>;
+export type TaskOutletDatasetReference = CamelCasedPropertiesDeep<components['schemas']['TaskOutletDatasetReference']>;
 export type DagScheduleDatasetReference = CamelCasedPropertiesDeep<components['schemas']['DagScheduleDatasetReference']>;
 export type DatasetCollection = CamelCasedPropertiesDeep<components['schemas']['DatasetCollection']>;
 export type DatasetEvent = CamelCasedPropertiesDeep<components['schemas']['DatasetEvent']>;
tests/models/test_dag.py (10 changes: 7 additions & 3 deletions)
@@ -45,7 +45,7 @@
 from airflow.models import DAG, DagModel, DagRun, DagTag, TaskFail, TaskInstance as TI
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DagOwnerAttributes, dag as dag_decorator, get_dataset_triggered_next_run_info
-from airflow.models.dataset import DatasetDagRunQueue, DatasetModel, DatasetTaskRef
+from airflow.models.dataset import DatasetDagRunQueue, DatasetModel, TaskOutletDatasetReference
 from airflow.models.param import DagParam, Param, ParamsDict
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
@@ -844,8 +844,12 @@ def test_bulk_write_to_db_datasets(self):
         assert [x.dag_id for x in d1.consuming_dags] == [dag_id1]
         assert [(x.task_id, x.dag_id) for x in d1.producing_tasks] == [(task_id, dag_id2)]
         assert set(
-            session.query(DatasetTaskRef.task_id, DatasetTaskRef.dag_id, DatasetTaskRef.dataset_id)
-            .filter(DatasetTaskRef.dag_id.in_((dag_id1, dag_id2)))
+            session.query(
+                TaskOutletDatasetReference.task_id,
+                TaskOutletDatasetReference.dag_id,
+                TaskOutletDatasetReference.dataset_id,
+            )
+            .filter(TaskOutletDatasetReference.dag_id.in_((dag_id1, dag_id2)))
             .all()
         ) == {
             (task_id, dag_id1, d2.id),
tests/models/test_taskinstance.py (5 changes: 3 additions & 2 deletions)
@@ -58,7 +58,7 @@
     Variable,
     XCom,
 )
-from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel, DatasetTaskRef
+from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel, TaskOutletDatasetReference
 from airflow.models.expandinput import EXPAND_INPUT_EMPTY
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskfail import TaskFail
@@ -1716,7 +1716,8 @@ def test_outlet_datasets(self, create_task_instance, clear_datasets):
 
         # check that one queue record created for each dag that depends on dataset 1
         assert session.query(DatasetDagRunQueue.target_dag_id).filter(
-            DatasetTaskRef.dag_id == dag1.dag_id, DatasetTaskRef.task_id == 'producing_task_1'
+            TaskOutletDatasetReference.dag_id == dag1.dag_id,
+            TaskOutletDatasetReference.task_id == 'producing_task_1',
         ).order_by(DatasetDagRunQueue.target_dag_id).all() == [
             ('dataset_consumes_1',),
             ('dataset_consumes_1_and_2',),
tests/test_utils/db.py (4 changes: 2 additions & 2 deletions)
@@ -43,7 +43,7 @@
     DatasetDagRunQueue,
     DatasetEvent,
     DatasetModel,
-    DatasetTaskRef,
+    TaskOutletDatasetReference,
 )
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.security.permissions import RESOURCE_DAG_PREFIX
@@ -66,7 +66,7 @@ def clear_db_datasets():
     session.query(DatasetModel).delete()
     session.query(DatasetDagRunQueue).delete()
     session.query(DagScheduleDatasetReference).delete()
-    session.query(DatasetTaskRef).delete()
+    session.query(TaskOutletDatasetReference).delete()
 
 
 def clear_db_dags():
tests/utils/test_db_cleanup.py (2 changes: 1 addition & 1 deletion)
@@ -265,7 +265,7 @@ def test_no_models_missing(self):
             'connection',  # leave alone
             'slot_pool',  # leave alone
             'dag_schedule_dataset_reference',  # leave alone for now
-            'dataset_task_ref',  # leave alone for now
+            'task_outlet_dataset_reference',  # leave alone for now
             'dataset_dag_run_queue',  # self-managed
             'dataset_event_dag_run',  # foreign keys
         }
