From adccc2d0dae294e7baddbe249a85e3bb08f61d7d Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 24 Nov 2022 01:03:48 -0800 Subject: [PATCH] Notes stored in separate table (#27849) * wip * try revert rename * simplify * working, minimally * more reverting of notes -> note rename * more reverting of notes -> note rename * more reverting of notes -> note rename * remove scratch code * remove test speedup * restore admin view * add migration * add migration * todo * fix migration * Add DagRunNote * add migration file * disable notes in search * fix dagrun tests * fix some tests and tighten up relationships, I think * remove notes from create_dagrun method * more cleanup * fix collation * fix db cleanup test * more test fixup * more test fixup * rename to tinote * rename fixup * Don't import FAB user models just to define FK rel. We don't (currently) define any relationships; it's just for making the FK match the migration, so for now we can have the FK col defined as a string. When we eventually add a relationship to get the creator of the note, we should move the FAB User model into airflow.models and change the security manager code to import from there instead. * Avoid touching test file unnecessarily * fix import * Apply suggestions from code review * Test that a user_id is set when creating note via api * Fix static checks Co-authored-by: Ash Berlin-Taylor Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Co-authored-by: Jed Cunningham --- airflow/api/common/trigger_dag.py | 5 - .../endpoints/dag_run_endpoint.py | 12 +- .../endpoints/task_instance_endpoint.py | 10 +- ...5_0_add_dagrunnote_and_taskinstancenote.py | 94 + ...er_comment_to_task_instance_and_dag_run.py | 63 - airflow/models/dag.py | 3 - airflow/models/dagrun.py | 55 +- airflow/models/taskinstance.py | 62 +- airflow/operators/trigger_dagrun.py | 3 - airflow/www/utils.py | 4 +- airflow/www/views.py | 21 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 2571 +++++++++-------- docs/apache-airflow/migrations-ref.rst | 2 +- tests/api/client/test_local_client.py | 4 - .../endpoints/test_dag_run_endpoint.py | 3 +- .../endpoints/test_task_instance_endpoint.py | 51 +- .../schemas/test_dag_run_schema.py | 9 +- tests/models/test_taskinstance.py | 1 - tests/operators/test_trigger_dagrun.py | 12 - tests/utils/test_db_cleanup.py | 3 + tests/www/views/test_views_dagrun.py | 2 +- tests/www/views/test_views_tasks.py | 7 - 23 files changed, 1602 insertions(+), 1397 deletions(-) create mode 100644 airflow/migrations/versions/0121_2_5_0_add_dagrunnote_and_taskinstancenote.py delete mode 100644 airflow/migrations/versions/0121_2_5_0_add_user_comment_to_task_instance_and_dag_run.py diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py index 686b38ca754c4..01da7745c7bf3 100644 --- a/airflow/api/common/trigger_dag.py +++ b/airflow/api/common/trigger_dag.py @@ -35,7 +35,6 @@ def _trigger_dag( conf: dict | str | None = None, execution_date: datetime | None = None, replace_microseconds: bool = True, - notes: str | None = None, ) -> list[DagRun | None]: """Triggers DAG run. 
@@ -93,7 +92,6 @@ def _trigger_dag( external_trigger=True, dag_hash=dag_bag.dags_hash.get(dag_id), data_interval=data_interval, - notes=notes, ) dag_runs.append(dag_run) @@ -106,7 +104,6 @@ def trigger_dag( conf: dict | str | None = None, execution_date: datetime | None = None, replace_microseconds: bool = True, - notes: str | None = None, ) -> DagRun | None: """Triggers execution of DAG specified by dag_id. @@ -115,7 +112,6 @@ def trigger_dag( :param conf: configuration :param execution_date: date of execution :param replace_microseconds: whether microseconds should be zeroed - :param notes: set a custom note for the newly created DagRun :return: first dag run triggered - even if more than one Dag Runs were triggered or None """ dag_model = DagModel.get_current(dag_id) @@ -130,7 +126,6 @@ def trigger_dag( conf=conf, execution_date=execution_date, replace_microseconds=replace_microseconds, - notes=notes, ) return triggers[0] if triggers else None diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 6678b76903bc8..e14bde177d1fe 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -317,6 +317,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: conf=post_body.get("conf"), external_trigger=True, dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id), + session=session, ) return dagrun_schema.dump(dag_run) except ValueError as ve: @@ -412,7 +413,7 @@ def clear_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSIO include_parentdag=True, only_failed=False, ) - dag_run.refresh_from_db() + dag_run = session.query(DagRun).filter(DagRun.id == dag_run.id).one() return dagrun_schema.dump(dag_run) @@ -437,6 +438,13 @@ def set_dag_run_notes(*, dag_id: str, dag_run_id: str, session: Session = NEW_SE except ValidationError as err: raise BadRequest(detail=str(err)) - dag_run.notes = new_value_for_notes or None + from flask_login import current_user + + current_user_id = getattr(current_user, "id", None) + if dag_run.dag_run_note is None: + dag_run.notes = (new_value_for_notes, current_user_id) + else: + dag_run.dag_run_note.content = new_value_for_notes + dag_run.dag_run_note.user_id = current_user_id session.commit() return dagrun_schema.dump(dag_run) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 74f8d2a015d14..3f692d8a0d754 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -670,7 +670,13 @@ def set_task_instance_notes( raise NotFound(error_message) ti, sla_miss = result - ti.notes = new_value_for_notes or None - session.commit() + from flask_login import current_user + current_user_id = getattr(current_user, "id", None) + if ti.task_instance_note is None: + ti.notes = (new_value_for_notes, current_user_id) + else: + ti.task_instance_note.content = new_value_for_notes + ti.task_instance_note.user_id = current_user_id + session.commit() return task_instance_schema.dump((ti, sla_miss)) diff --git a/airflow/migrations/versions/0121_2_5_0_add_dagrunnote_and_taskinstancenote.py b/airflow/migrations/versions/0121_2_5_0_add_dagrunnote_and_taskinstancenote.py new file mode 100644 index 0000000000000..d13fba38aa70c --- /dev/null +++ b/airflow/migrations/versions/0121_2_5_0_add_dagrunnote_and_taskinstancenote.py @@ -0,0 +1,94 @@ +# +# Licensed 
to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add DagRunNote and TaskInstanceNote + +Revision ID: 1986afd32c1b +Revises: ee8d93fcc81e +Create Date: 2022-11-22 21:49:05.843439 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +from airflow.migrations.db_types import StringID +from airflow.utils.sqlalchemy import UtcDateTime + +# revision identifiers, used by Alembic. +revision = "1986afd32c1b" +down_revision = "ee8d93fcc81e" +branch_labels = None +depends_on = None +airflow_version = "2.5.0" + + +def upgrade(): + """Apply Add DagRunNote and TaskInstanceNote""" + op.create_table( + "dag_run_note", + sa.Column("user_id", sa.Integer(), nullable=True), + sa.Column("dag_run_id", sa.Integer(), nullable=False), + sa.Column( + "content", sa.String(length=1000).with_variant(sa.Text(length=1000), "mysql"), nullable=True + ), + sa.Column("created_at", UtcDateTime(timezone=True), nullable=False), + sa.Column("updated_at", UtcDateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint( + ("dag_run_id",), ["dag_run.id"], name="dag_run_note_dr_fkey", ondelete="CASCADE" + ), + sa.ForeignKeyConstraint(("user_id",), ["ab_user.id"], name="dag_run_note_user_fkey"), + sa.PrimaryKeyConstraint("dag_run_id", name=op.f("dag_run_note_pkey")), + ) + + op.create_table( + "task_instance_note", + sa.Column("user_id", sa.Integer(), nullable=True), + sa.Column("task_id", StringID(), nullable=False), + sa.Column("dag_id", StringID(), nullable=False), + sa.Column("run_id", StringID(), nullable=False), + sa.Column("map_index", sa.Integer(), nullable=False), + sa.Column( + "content", sa.String(length=1000).with_variant(sa.Text(length=1000), "mysql"), nullable=True + ), + sa.Column("created_at", UtcDateTime(timezone=True), nullable=False), + sa.Column("updated_at", UtcDateTime(timezone=True), nullable=False), + sa.PrimaryKeyConstraint( + "task_id", "dag_id", "run_id", "map_index", name=op.f("task_instance_note_pkey") + ), + sa.ForeignKeyConstraint( + ("dag_id", "task_id", "run_id", "map_index"), + [ + "task_instance.dag_id", + "task_instance.task_id", + "task_instance.run_id", + "task_instance.map_index", + ], + name="task_instance_note_ti_fkey", + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint(("user_id",), ["ab_user.id"], name="task_instance_note_user_fkey"), + ) + + +def downgrade(): + """Unapply Add DagRunNote and TaskInstanceNote""" + op.drop_table("task_instance_note") + op.drop_table("dag_run_note") diff --git a/airflow/migrations/versions/0121_2_5_0_add_user_comment_to_task_instance_and_dag_run.py b/airflow/migrations/versions/0121_2_5_0_add_user_comment_to_task_instance_and_dag_run.py deleted file mode 100644 index f09eee136f203..0000000000000 --- 
a/airflow/migrations/versions/0121_2_5_0_add_user_comment_to_task_instance_and_dag_run.py +++ /dev/null @@ -1,63 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -"""Add user comment to task_instance and dag_run. - -Revision ID: 65a852f26899 -Revises: ecb43d2a1842 -Create Date: 2022-09-17 20:01:42.652862 - -""" - -from __future__ import annotations - -import sqlalchemy as sa -from alembic import op - -# revision identifiers, used by Alembic. -revision = "65a852f26899" -down_revision = "ee8d93fcc81e" -branch_labels = None -depends_on = None -airflow_version = "2.5.0" - - -def upgrade(): - """Apply add user comment to task_instance and dag_run""" - conn = op.get_bind() - - with op.batch_alter_table("dag_run") as batch_op: - if conn.dialect.name == "mysql": - batch_op.add_column(sa.Column("notes", sa.Text(length=1000), nullable=True)) - else: - batch_op.add_column(sa.Column("notes", sa.String(length=1000), nullable=True)) - - with op.batch_alter_table("task_instance") as batch_op: - if conn.dialect.name == "mysql": - batch_op.add_column(sa.Column("notes", sa.Text(length=1000), nullable=True)) - else: - batch_op.add_column(sa.Column("notes", sa.String(length=1000), nullable=True)) - - -def downgrade(): - """Unapply add user comment to task_instance and dag_run""" - with op.batch_alter_table("task_instance", schema=None) as batch_op: - batch_op.drop_column("notes") - - with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.drop_column("notes") diff --git a/airflow/models/dag.py b/airflow/models/dag.py index b61f679269395..bd81980fc2b59 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2552,7 +2552,6 @@ def create_dagrun( dag_hash: str | None = None, creating_job_id: int | None = None, data_interval: tuple[datetime, datetime] | None = None, - notes: str | None = None, ): """ Creates a dag run from this dag including the tasks associated with this dag. @@ -2569,7 +2568,6 @@ def create_dagrun( :param session: database session :param dag_hash: Hash of Serialized DAG :param data_interval: Data interval of the DagRun - :param notes: A custom note for the DAGRun. 
""" logical_date = timezone.coerce_datetime(execution_date) @@ -2628,7 +2626,6 @@ def create_dagrun( dag_hash=dag_hash, creating_job_id=creating_job_id, data_interval=data_interval, - notes=notes, ) session.add(run) session.flush() diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 2f2d9509a48d7..e148d08f498b7 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -28,6 +28,7 @@ Boolean, Column, ForeignKey, + ForeignKeyConstraint, Index, Integer, PickleType, @@ -40,6 +41,7 @@ text, ) from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.orm import joinedload, relationship, synonym from sqlalchemy.orm.session import Session @@ -85,6 +87,16 @@ class TISchedulingDecision(NamedTuple): finished_tis: list[TI] +def _creator_note(val): + """Custom creator for the ``note`` association proxy.""" + if isinstance(val, str): + return DagRunNote(content=val) + elif isinstance(val, dict): + return DagRunNote(**val) + else: + return DagRunNote(*val) + + class DagRun(Base, LoggingMixin): """ DagRun describes an instance of a Dag. It can be created @@ -111,7 +123,6 @@ class DagRun(Base, LoggingMixin): # When a scheduler last attempted to schedule TIs for this DagRun last_scheduling_decision = Column(UtcDateTime) dag_hash = Column(String(32)) - notes = Column(String(1000).with_variant(Text(1000), "mysql")) # Foreign key to LogTemplate. DagRun rows created prior to this column's # existence have this set to NULL. Later rows automatically populate this on # insert to point to the latest LogTemplate entry. @@ -163,6 +174,8 @@ class DagRun(Base, LoggingMixin): uselist=False, viewonly=True, ) + dag_run_note = relationship("DagRunNote", back_populates="dag_run", uselist=False) + notes = association_proxy("dag_run_note", "content", creator=_creator_note) DEFAULT_DAGRUNS_TO_EXAMINE = airflow_conf.getint( "scheduler", @@ -184,7 +197,6 @@ def __init__( dag_hash: str | None = None, creating_job_id: int | None = None, data_interval: tuple[datetime, datetime] | None = None, - notes: str | None = None, ): if data_interval is None: # Legacy: Only happen for runs created prior to Airflow 2.2. 
@@ -207,7 +219,6 @@ def __init__( self.run_type = run_type self.dag_hash = dag_hash self.creating_job_id = creating_job_id - self.notes = notes super().__init__() def __repr__(self): @@ -1295,3 +1306,38 @@ def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str: stacklevel=2, ) return self.get_log_template(session=session).filename + + +class DagRunNote(Base): + """For storage of arbitrary notes concerning the dagrun instance.""" + + __tablename__ = "dag_run_note" + + user_id = Column(Integer, nullable=True) + dag_run_id = Column(Integer, primary_key=True, nullable=False) + content = Column(String(1000).with_variant(Text(1000), "mysql")) + created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) + updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) + + dag_run = relationship("DagRun", back_populates="dag_run_note") + + __table_args__ = ( + ForeignKeyConstraint( + (dag_run_id,), + ["dag_run.id"], + name="dag_run_note_dr_fkey", + ondelete="CASCADE", + ), + ForeignKeyConstraint( + (user_id,), + ["ab_user.id"], + name="dag_run_note_user_fkey", + ), + ) + + def __init__(self, content, user_id=None): + self.content = content + self.user_id = user_id + + def __repr__(self): + return f"<{self.__class__.__name__}: dag_run_id={self.dag_run_id}>" diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 29d0ad4497405..58f4d5c379c10 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -313,6 +313,16 @@ def key(self) -> TaskInstanceKey: return self +def _creator_note(val): + """Custom creator for the ``note`` association proxy.""" + if isinstance(val, str): + return TaskInstanceNote(content=val) + elif isinstance(val, dict): + return TaskInstanceNote(**val) + else: + return TaskInstanceNote(*val) + + class TaskInstance(Base, LoggingMixin): """ Task instances store the state of a task instance. This table is the @@ -355,7 +365,6 @@ class TaskInstance(Base, LoggingMixin): queued_by_job_id = Column(Integer) pid = Column(Integer) executor_config = Column(ExecutorConfigType(pickler=dill)) - notes = Column(String(1000).with_variant(Text(1000), "mysql")) updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow) external_executor_id = Column(StringID()) @@ -415,9 +424,9 @@ class TaskInstance(Base, LoggingMixin): triggerer_job = association_proxy("trigger", "triggerer_job") dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True) rendered_task_instance_fields = relationship("RenderedTaskInstanceFields", lazy="noload", uselist=False) - execution_date = association_proxy("dag_run", "execution_date") - + task_instance_note = relationship("TaskInstanceNote", back_populates="task_instance", uselist=False) + notes = association_proxy("task_instance_note", "content", creator=_creator_note) task: Operator # Not always set... 
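As the commit message explains, DagRunNote above (and TaskInstanceNote below) point at FAB's ab_user table purely through string targets in ForeignKeyConstraint, so the FAB User model never has to be imported into airflow.models. A small sketch of that technique with a hypothetical Comment model (assumed names, not Airflow's code):

from sqlalchemy import Column, ForeignKeyConstraint, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Comment(Base):
    __tablename__ = "comment"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=True)
    content = Column(String(1000))

    __table_args__ = (
        # "ab_user.id" is resolved by name, not by importing the mapped class.
        # The referenced table only has to be resolvable when DDL for the
        # constraint is actually emitted (here, the Alembic migration creates
        # it), not when this module is imported.
        ForeignKeyConstraint(["user_id"], ["ab_user.id"], name="comment_user_fkey"),
    )

The trade-off is that there is no ORM relationship to the user: code that needs the note's author must query by user_id until the model move described in the commit message happens.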
def __init__( @@ -794,7 +803,6 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool self.trigger_id = ti.trigger_id self.next_method = ti.next_method self.next_kwargs = ti.next_kwargs - self.notes = ti.notes else: self.state = None @@ -2676,6 +2684,52 @@ def from_dict(cls, obj_dict: dict) -> SimpleTaskInstance: return cls(**obj_dict, start_date=start_date, end_date=end_date, key=ti_key) +class TaskInstanceNote(Base): + """For storage of arbitrary notes concerning the task instance.""" + + __tablename__ = "task_instance_note" + + user_id = Column(Integer, nullable=True) + task_id = Column(StringID(), primary_key=True, nullable=False) + dag_id = Column(StringID(), primary_key=True, nullable=False) + run_id = Column(StringID(), primary_key=True, nullable=False) + map_index = Column(Integer, primary_key=True, nullable=False) + content = Column(String(1000).with_variant(Text(1000), "mysql")) + created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) + updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) + + task_instance = relationship("TaskInstance", back_populates="task_instance_note") + + __table_args__ = ( + ForeignKeyConstraint( + (dag_id, task_id, run_id, map_index), + [ + "task_instance.dag_id", + "task_instance.task_id", + "task_instance.run_id", + "task_instance.map_index", + ], + name="task_instance_note_ti_fkey", + ondelete="CASCADE", + ), + ForeignKeyConstraint( + (user_id,), + ["ab_user.id"], + name="task_instance_note_user_fkey", + ), + ) + + def __init__(self, content, user_id=None): + self.content = content + self.user_id = user_id + + def __repr__(self): + prefix = f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.run_id}" + if self.map_index != -1: + prefix += f" map_index={self.map_index}" + return prefix + ">" + + STATICA_HACK = True globals()["kcah_acitats"[::-1].upper()] = False if STATICA_HACK: # pragma: no cover diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index e0189038f3c12..b687c05dc4b97 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -99,7 +99,6 @@ def __init__( poke_interval: int = 60, allowed_states: list | None = None, failed_states: list | None = None, - dag_run_notes: str | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -111,7 +110,6 @@ def __init__( self.poke_interval = poke_interval self.allowed_states = allowed_states or [State.SUCCESS] self.failed_states = failed_states or [State.FAILED] - self.dag_run_notes = dag_run_notes if execution_date is not None and not isinstance(execution_date, (str, datetime.datetime)): raise TypeError( @@ -144,7 +142,6 @@ def execute(self, context: Context): conf=self.conf, execution_date=parsed_execution_date, replace_microseconds=False, - notes=self.dag_run_notes, ) except DagRunAlreadyExists as e: diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 8cbaa504f541b..c1908ed87c99e 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -717,10 +717,10 @@ def clean_column_names(): clean_column_names() # Support for AssociationProxy in search and list columns - for desc in self.obj.__mapper__.all_orm_descriptors: + for obj_attr, desc in self.obj.__mapper__.all_orm_descriptors.items(): if not isinstance(desc, AssociationProxy): continue - proxy_instance = getattr(self.obj, desc.value_attr) + proxy_instance = getattr(self.obj, obj_attr) if hasattr(proxy_instance.remote_attr.prop, "columns"): 
self.list_columns[desc.value_attr] = proxy_instance.remote_attr.prop.columns[0] self.list_properties[desc.value_attr] = proxy_instance.remote_attr.prop diff --git a/airflow/www/views.py b/airflow/www/views.py index 61ccc47db0e45..34bbe6d150ff3 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -101,7 +101,7 @@ from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel from airflow.models.operator import Operator from airflow.models.serialized_dag import SerializedDagModel -from airflow.models.taskinstance import TaskInstance +from airflow.models.taskinstance import TaskInstance, TaskInstanceNote from airflow.providers_manager import ProvidersManager from airflow.security import permissions from airflow.ti_deps.dep_context import DepContext @@ -263,19 +263,18 @@ def dag_to_grid(dag, dag_runs, session): TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, - TaskInstance.notes, - sqla.func.count(sqla.func.coalesce(TaskInstance.state, sqla.literal("no_status"))).label( - "state_count" - ), - sqla.func.min(TaskInstance.start_date).label("start_date"), - sqla.func.max(TaskInstance.end_date).label("end_date"), - sqla.func.max(TaskInstance._try_number).label("_try_number"), + func.min(TaskInstanceNote.content).label("notes"), + func.count(func.coalesce(TaskInstance.state, sqla.literal("no_status"))).label("state_count"), + func.min(TaskInstance.start_date).label("start_date"), + func.max(TaskInstance.end_date).label("end_date"), + func.max(TaskInstance._try_number).label("_try_number"), ) + .join(TaskInstance.task_instance_note, isouter=True) .filter( TaskInstance.dag_id == dag.dag_id, TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]), ) - .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, TaskInstance.notes) + .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state) .order_by(TaskInstance.task_id, TaskInstance.run_id) ) @@ -4899,7 +4898,7 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView): "run_type", "start_date", "end_date", - "notes", + # "notes", # todo: maybe figure out how to re-enable this "external_trigger", ] label_columns = { @@ -5291,7 +5290,7 @@ class TaskInstanceModelView(AirflowPrivilegeVerifierModelView): "operator", "start_date", "end_date", - "notes", + # "notes", # todo: maybe make notes work with TI search? 
"hostname", "priority_weight", "queue", diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 8e2b53ba86452..d2240af1eb048 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -bf1db1b1041afe3ef6277d96701f6d82bce44497b2b4c49ee79f7bb198f51042 \ No newline at end of file +f529521071a6c9ae8bbd58d63cf1195fc1ec964308e7684569d0a36d26534def \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index ec7546b1aa078..a348a579c4466 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,1511 +4,1614 @@ - - + + %3 - + ab_permission - -ab_permission - -id - [INTEGER] - NOT NULL - -name - [VARCHAR(100)] - NOT NULL + +ab_permission + +id + [INTEGER] + NOT NULL + +name + [VARCHAR(100)] + NOT NULL ab_permission_view - -ab_permission_view - -id - [INTEGER] - NOT NULL - -permission_id - [INTEGER] - -view_menu_id - [INTEGER] + +ab_permission_view + +id + [INTEGER] + NOT NULL + +permission_id + [INTEGER] + +view_menu_id + [INTEGER] ab_permission--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_permission_view_role - -ab_permission_view_role - -id - [INTEGER] - NOT NULL - -permission_view_id - [INTEGER] - -role_id - [INTEGER] + +ab_permission_view_role + +id + [INTEGER] + NOT NULL + +permission_view_id + [INTEGER] + +role_id + [INTEGER] ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} ab_view_menu - -ab_view_menu - -id - [INTEGER] - NOT NULL - -name - [VARCHAR(250)] - NOT NULL + +ab_view_menu + +id + [INTEGER] + NOT NULL + +name + [VARCHAR(250)] + NOT NULL ab_view_menu--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_role - -ab_role - -id - [INTEGER] - NOT NULL - -name - [VARCHAR(64)] - NOT NULL + +ab_role + +id + [INTEGER] + NOT NULL + +name + [VARCHAR(64)] + NOT NULL ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} ab_user_role - -ab_user_role - -id - [INTEGER] - NOT NULL - -role_id - [INTEGER] - -user_id - [INTEGER] + +ab_user_role + +id + [INTEGER] + NOT NULL + +role_id + [INTEGER] + +user_id + [INTEGER] ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_register_user - -ab_register_user - -id - [INTEGER] - NOT NULL - -email - [VARCHAR(256)] - NOT NULL - -first_name - [VARCHAR(64)] - NOT NULL - -last_name - [VARCHAR(64)] - NOT NULL - -password - [VARCHAR(256)] - -registration_date - [DATETIME] - -registration_hash - [VARCHAR(256)] - -username - [VARCHAR(256)] - NOT NULL + +ab_register_user + +id + [INTEGER] + NOT NULL + +email + [VARCHAR(256)] + NOT NULL + +first_name + [VARCHAR(64)] + NOT NULL + +last_name + [VARCHAR(64)] + NOT NULL + +password + [VARCHAR(256)] + +registration_date + [DATETIME] + +registration_hash + [VARCHAR(256)] + +username + [VARCHAR(256)] + NOT NULL ab_user - -ab_user - -id - [INTEGER] - NOT NULL - -active - [BOOLEAN] - -changed_by_fk - [INTEGER] - -changed_on - [DATETIME] - -created_by_fk - [INTEGER] - -created_on - [DATETIME] - -email - [VARCHAR(256)] - NOT NULL - -fail_login_count - [INTEGER] - -first_name - [VARCHAR(64)] - NOT NULL - -last_login - [DATETIME] - -last_name - [VARCHAR(64)] - NOT NULL - -login_count - [INTEGER] - -password - [VARCHAR(256)] - -username - [VARCHAR(256)] - NOT NULL + +ab_user + +id + [INTEGER] + NOT NULL + +active + [BOOLEAN] + +changed_by_fk + [INTEGER] + +changed_on + [DATETIME] + +created_by_fk + [INTEGER] + +created_on + [DATETIME] + +email + [VARCHAR(256)] + NOT NULL + 
+fail_login_count + [INTEGER] + +first_name + [VARCHAR(64)] + NOT NULL + +last_login + [DATETIME] + +last_name + [VARCHAR(64)] + NOT NULL + +login_count + [INTEGER] + +password + [VARCHAR(256)] + +username + [VARCHAR(256)] + NOT NULL ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} - + +dag_run_note + +dag_run_note + +dag_run_id + [INTEGER] + NOT NULL + +content + [VARCHAR(1000)] + +created_at + [TIMESTAMP] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL + +user_id + [INTEGER] + + + +ab_user--dag_run_note + +0..N +{0,1} + + + +task_instance_note + +task_instance_note + +dag_id + [VARCHAR(250)] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +content + [VARCHAR(1000)] + +created_at + [TIMESTAMP] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL + +user_id + [INTEGER] + + + +ab_user--task_instance_note + +0..N +{0,1} + + + alembic_version - -alembic_version - -version_num - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + [VARCHAR(32)] + NOT NULL - + callback_request - -callback_request - -id - [INTEGER] - NOT NULL - -callback_data - [JSON] - NOT NULL - -callback_type - [VARCHAR(20)] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -priority_weight - [INTEGER] - NOT NULL - -processor_subdir - [VARCHAR(2000)] + +callback_request + +id + [INTEGER] + NOT NULL + +callback_data + [JSON] + NOT NULL + +callback_type + [VARCHAR(20)] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +priority_weight + [INTEGER] + NOT NULL + +processor_subdir + [VARCHAR(2000)] - + connection - -connection - -id - [INTEGER] - NOT NULL - -conn_id - [VARCHAR(250)] - NOT NULL - -conn_type - [VARCHAR(500)] - NOT NULL - -description - [VARCHAR(5000)] - -extra - [TEXT] - -host - [VARCHAR(500)] - -is_encrypted - [BOOLEAN] - -is_extra_encrypted - [BOOLEAN] - -login - [VARCHAR(500)] - -password - [VARCHAR(5000)] - -port - [INTEGER] - -schema - [VARCHAR(500)] + +connection + +id + [INTEGER] + NOT NULL + +conn_id + [VARCHAR(250)] + NOT NULL + +conn_type + [VARCHAR(500)] + NOT NULL + +description + [VARCHAR(5000)] + +extra + [TEXT] + +host + [VARCHAR(500)] + +is_encrypted + [BOOLEAN] + +is_extra_encrypted + [BOOLEAN] + +login + [VARCHAR(500)] + +password + [VARCHAR(5000)] + +port + [INTEGER] + +schema + [VARCHAR(500)] - + dag - -dag - -dag_id - [VARCHAR(250)] - NOT NULL - -default_view - [VARCHAR(25)] - -description - [TEXT] - -fileloc - [VARCHAR(2000)] - -has_import_errors - [BOOLEAN] - -has_task_concurrency_limits - [BOOLEAN] - NOT NULL - -is_active - [BOOLEAN] - -is_paused - [BOOLEAN] - -is_subdag - [BOOLEAN] - -last_expired - [TIMESTAMP] - -last_parsed_time - [TIMESTAMP] - -last_pickled - [TIMESTAMP] - -max_active_runs - [INTEGER] - -max_active_tasks - [INTEGER] - NOT NULL - -next_dagrun - [TIMESTAMP] - -next_dagrun_create_after - [TIMESTAMP] - -next_dagrun_data_interval_end - [TIMESTAMP] - -next_dagrun_data_interval_start - [TIMESTAMP] - -owners - [VARCHAR(2000)] - -pickle_id - [INTEGER] - -processor_subdir - [VARCHAR(2000)] - -root_dag_id - [VARCHAR(250)] - -schedule_interval - [TEXT] - -scheduler_lock - [BOOLEAN] - -timetable_description - [VARCHAR(1000)] + +dag + +dag_id + [VARCHAR(250)] + NOT NULL + +default_view + [VARCHAR(25)] + +description + [TEXT] + +fileloc + [VARCHAR(2000)] + +has_import_errors + [BOOLEAN] + +has_task_concurrency_limits + [BOOLEAN] + NOT NULL + +is_active + [BOOLEAN] + +is_paused + [BOOLEAN] + +is_subdag + 
[BOOLEAN] + +last_expired + [TIMESTAMP] + +last_parsed_time + [TIMESTAMP] + +last_pickled + [TIMESTAMP] + +max_active_runs + [INTEGER] + +max_active_tasks + [INTEGER] + NOT NULL + +next_dagrun + [TIMESTAMP] + +next_dagrun_create_after + [TIMESTAMP] + +next_dagrun_data_interval_end + [TIMESTAMP] + +next_dagrun_data_interval_start + [TIMESTAMP] + +owners + [VARCHAR(2000)] + +pickle_id + [INTEGER] + +processor_subdir + [VARCHAR(2000)] + +root_dag_id + [VARCHAR(250)] + +schedule_interval + [TEXT] + +scheduler_lock + [BOOLEAN] + +timetable_description + [VARCHAR(1000)] - + dag_owner_attributes - -dag_owner_attributes - -dag_id - [VARCHAR(250)] - NOT NULL - -owner - [VARCHAR(500)] - NOT NULL - -link - [VARCHAR(500)] - NOT NULL + +dag_owner_attributes + +dag_id + [VARCHAR(250)] + NOT NULL + +owner + [VARCHAR(500)] + NOT NULL + +link + [VARCHAR(500)] + NOT NULL - + dag--dag_owner_attributes - -0..N -{0,1} + +0..N +{0,1} - + dag_schedule_dataset_reference - -dag_schedule_dataset_reference - -dag_id - [VARCHAR(250)] - NOT NULL - -dataset_id - [INTEGER] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -updated_at - [TIMESTAMP] - NOT NULL + +dag_schedule_dataset_reference + +dag_id + [VARCHAR(250)] + NOT NULL + +dataset_id + [INTEGER] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL - + dag--dag_schedule_dataset_reference - -0..N -{0,1} + +0..N +{0,1} - + dag_tag - -dag_tag - -dag_id - [VARCHAR(250)] - NOT NULL - -name - [VARCHAR(100)] - NOT NULL + +dag_tag + +dag_id + [VARCHAR(250)] + NOT NULL + +name + [VARCHAR(100)] + NOT NULL - + dag--dag_tag - -0..N -{0,1} + +0..N +{0,1} - + dag_warning - -dag_warning - -dag_id - [VARCHAR(250)] - NOT NULL - -warning_type - [VARCHAR(50)] - NOT NULL - -message - [TEXT] - NOT NULL - -timestamp - [TIMESTAMP] - NOT NULL + +dag_warning + +dag_id + [VARCHAR(250)] + NOT NULL + +warning_type + [VARCHAR(50)] + NOT NULL + +message + [TEXT] + NOT NULL + +timestamp + [TIMESTAMP] + NOT NULL - + dag--dag_warning - -0..N -{0,1} + +0..N +{0,1} - + dataset_dag_run_queue - -dataset_dag_run_queue - -dataset_id - [INTEGER] - NOT NULL - -target_dag_id - [VARCHAR(250)] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL + +dataset_dag_run_queue + +dataset_id + [INTEGER] + NOT NULL + +target_dag_id + [VARCHAR(250)] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL - + dag--dataset_dag_run_queue - -0..N -{0,1} + +0..N +{0,1} - + task_outlet_dataset_reference - -task_outlet_dataset_reference - -dag_id - [VARCHAR(250)] - NOT NULL - -dataset_id - [INTEGER] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -updated_at - [TIMESTAMP] - NOT NULL + +task_outlet_dataset_reference + +dag_id + [VARCHAR(250)] + NOT NULL + +dataset_id + [INTEGER] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL - + dag--task_outlet_dataset_reference - -0..N -{0,1} + +0..N +{0,1} - + dag_code - -dag_code - -fileloc_hash - [BIGINT] - NOT NULL - -fileloc - [VARCHAR(2000)] - NOT NULL - -last_updated - [TIMESTAMP] - NOT NULL - -source_code - [TEXT] - NOT NULL + +dag_code + +fileloc_hash + [BIGINT] + NOT NULL + +fileloc + [VARCHAR(2000)] + NOT NULL + +last_updated + [TIMESTAMP] + NOT NULL + +source_code + [TEXT] + NOT NULL - + dag_pickle - -dag_pickle - -id - [INTEGER] - NOT NULL - -created_dttm - [TIMESTAMP] - -pickle - [BLOB] - -pickle_hash - [BIGINT] + +dag_pickle + +id + [INTEGER] + NOT NULL + +created_dttm + [TIMESTAMP] + +pickle + [BLOB] + +pickle_hash + 
[BIGINT] - + dag_run - -dag_run - -id - [INTEGER] - NOT NULL - -conf - [BLOB] - -creating_job_id - [INTEGER] - -dag_hash - [VARCHAR(32)] - -dag_id - [VARCHAR(250)] - NOT NULL - -data_interval_end - [TIMESTAMP] - -data_interval_start - [TIMESTAMP] - -end_date - [TIMESTAMP] - -execution_date - [TIMESTAMP] - NOT NULL - -external_trigger - [BOOLEAN] - -last_scheduling_decision - [TIMESTAMP] - -log_template_id - [INTEGER] - -notes - [VARCHAR(1000)] - -queued_at - [TIMESTAMP] - -run_id - [VARCHAR(250)] - NOT NULL - -run_type - [VARCHAR(50)] - NOT NULL - -start_date - [TIMESTAMP] - -state - [VARCHAR(50)] - -updated_at - [TIMESTAMP] + +dag_run + +id + [INTEGER] + NOT NULL + +conf + [BLOB] + +creating_job_id + [INTEGER] + +dag_hash + [VARCHAR(32)] + +dag_id + [VARCHAR(250)] + NOT NULL + +data_interval_end + [TIMESTAMP] + +data_interval_start + [TIMESTAMP] + +end_date + [TIMESTAMP] + +execution_date + [TIMESTAMP] + NOT NULL + +external_trigger + [BOOLEAN] + +last_scheduling_decision + [TIMESTAMP] + +log_template_id + [INTEGER] + +queued_at + [TIMESTAMP] + +run_id + [VARCHAR(250)] + NOT NULL + +run_type + [VARCHAR(50)] + NOT NULL + +start_date + [TIMESTAMP] + +state + [VARCHAR(50)] + +updated_at + [TIMESTAMP] + + + +dag_run--dag_run_note + +0..N +{0,1} - + dagrun_dataset_event - -dagrun_dataset_event - -dag_run_id - [INTEGER] - NOT NULL - -event_id - [INTEGER] - NOT NULL + +dagrun_dataset_event + +dag_run_id + [INTEGER] + NOT NULL + +event_id + [INTEGER] + NOT NULL - + dag_run--dagrun_dataset_event - -0..N -{0,1} + +0..N +{0,1} - + task_instance - -task_instance - -dag_id - [VARCHAR(250)] - NOT NULL - -map_index - [INTEGER] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -duration - [FLOAT] - -end_date - [TIMESTAMP] - -executor_config - [BLOB] - -external_executor_id - [VARCHAR(250)] - -hostname - [VARCHAR(1000)] - -job_id - [INTEGER] - -max_tries - [INTEGER] - -next_kwargs - [JSON] - -next_method - [VARCHAR(1000)] - -notes - [VARCHAR(1000)] - -operator - [VARCHAR(1000)] - -pid - [INTEGER] - -pool - [VARCHAR(256)] - NOT NULL - -pool_slots - [INTEGER] - NOT NULL - -priority_weight - [INTEGER] - -queue - [VARCHAR(256)] - -queued_by_job_id - [INTEGER] - -queued_dttm - [TIMESTAMP] - -start_date - [TIMESTAMP] - -state - [VARCHAR(20)] - -trigger_id - [INTEGER] - -trigger_timeout - [DATETIME] - -try_number - [INTEGER] - -unixname - [VARCHAR(1000)] - -updated_at - [TIMESTAMP] + +task_instance + +dag_id + [VARCHAR(250)] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +duration + [FLOAT] + +end_date + [TIMESTAMP] + +executor_config + [BLOB] + +external_executor_id + [VARCHAR(250)] + +hostname + [VARCHAR(1000)] + +job_id + [INTEGER] + +max_tries + [INTEGER] + +next_kwargs + [JSON] + +next_method + [VARCHAR(1000)] + +operator + [VARCHAR(1000)] + +pid + [INTEGER] + +pool + [VARCHAR(256)] + NOT NULL + +pool_slots + [INTEGER] + NOT NULL + +priority_weight + [INTEGER] + +queue + [VARCHAR(256)] + +queued_by_job_id + [INTEGER] + +queued_dttm + [TIMESTAMP] + +start_date + [TIMESTAMP] + +state + [VARCHAR(20)] + +trigger_id + [INTEGER] + +trigger_timeout + [DATETIME] + +try_number + [INTEGER] + +unixname + [VARCHAR(1000)] + +updated_at + [TIMESTAMP] - + dag_run--task_instance - -0..N -{0,1} + +0..N +{0,1} - + dag_run--task_instance - -0..N -{0,1} + +0..N +{0,1} - + task_reschedule - -task_reschedule - -id - [INTEGER] - NOT NULL - -dag_id - [VARCHAR(250)] - NOT NULL - -duration - [INTEGER] - NOT NULL - 
-end_date - [TIMESTAMP] - NOT NULL - -map_index - [INTEGER] - NOT NULL - -reschedule_date - [TIMESTAMP] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -start_date - [TIMESTAMP] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -try_number - [INTEGER] - NOT NULL + +task_reschedule + +id + [INTEGER] + NOT NULL + +dag_id + [VARCHAR(250)] + NOT NULL + +duration + [INTEGER] + NOT NULL + +end_date + [TIMESTAMP] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +reschedule_date + [TIMESTAMP] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +start_date + [TIMESTAMP] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +try_number + [INTEGER] + NOT NULL - + dag_run--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + dag_run--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + + +task_instance--task_instance_note + +0..N +{0,1} + + +task_instance--task_instance_note + +0..N +{0,1} + + + +task_instance--task_instance_note + +0..N +{0,1} + + + +task_instance--task_instance_note + +0..N +{0,1} + + + task_instance--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + rendered_task_instance_fields - -rendered_task_instance_fields - -dag_id - [VARCHAR(250)] - NOT NULL - -map_index - [INTEGER] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -k8s_pod_yaml - [JSON] - -rendered_fields - [JSON] - NOT NULL + +rendered_task_instance_fields + +dag_id + [VARCHAR(250)] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +k8s_pod_yaml + [JSON] + +rendered_fields + [JSON] + NOT NULL - + task_instance--rendered_task_instance_fields - -0..N -{0,1} + +0..N +{0,1} - + task_instance--rendered_task_instance_fields - -0..N -{0,1} + +0..N +{0,1} - + task_instance--rendered_task_instance_fields - -0..N -{0,1} + +0..N +{0,1} - + task_instance--rendered_task_instance_fields - -0..N -{0,1} + +0..N +{0,1} - + task_fail - -task_fail - -id - [INTEGER] - NOT NULL - -dag_id - [VARCHAR(250)] - NOT NULL - -duration - [INTEGER] - -end_date - [TIMESTAMP] - -map_index - [INTEGER] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -start_date - [TIMESTAMP] - -task_id - [VARCHAR(250)] - NOT NULL + +task_fail + +id + [INTEGER] + NOT NULL + +dag_id + [VARCHAR(250)] + NOT NULL + +duration + [INTEGER] + +end_date + [TIMESTAMP] + +map_index + [INTEGER] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +start_date + [TIMESTAMP] + +task_id + [VARCHAR(250)] + NOT NULL - + task_instance--task_fail - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_fail - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_fail - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_fail - -0..N -{0,1} + +0..N +{0,1} - + task_map - -task_map - -dag_id - [VARCHAR(250)] - NOT NULL - -map_index - [INTEGER] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -keys - [JSON] - -length - [INTEGER] - NOT NULL + +task_map + +dag_id + [VARCHAR(250)] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +keys + [JSON] + +length + [INTEGER] + NOT NULL - + task_instance--task_map - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_map - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_map - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_map - -0..N -{0,1} 
+ +0..N +{0,1} - + xcom - -xcom - -dag_run_id - [INTEGER] - NOT NULL - -key - [VARCHAR(512)] - NOT NULL - -map_index - [INTEGER] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -dag_id - [VARCHAR(250)] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -timestamp - [TIMESTAMP] - NOT NULL - -value - [BLOB] + +xcom + +dag_run_id + [INTEGER] + NOT NULL + +key + [VARCHAR(512)] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +dag_id + [VARCHAR(250)] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +timestamp + [TIMESTAMP] + NOT NULL + +value + [BLOB] - + task_instance--xcom - -0..N -{0,1} + +0..N +{0,1} - + task_instance--xcom - -0..N -{0,1} + +0..N +{0,1} - + task_instance--xcom - -0..N -{0,1} + +0..N +{0,1} - + task_instance--xcom - -0..N -{0,1} + +0..N +{0,1} - + log_template - -log_template - -id - [INTEGER] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -elasticsearch_id - [TEXT] - NOT NULL - -filename - [TEXT] - NOT NULL + +log_template + +id + [INTEGER] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +elasticsearch_id + [TEXT] + NOT NULL + +filename + [TEXT] + NOT NULL - + log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} - + dataset - -dataset - -id - [INTEGER] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -extra - [JSON] - NOT NULL - -updated_at - [TIMESTAMP] - NOT NULL - -uri - [VARCHAR(3000)] - NOT NULL + +dataset + +id + [INTEGER] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +extra + [JSON] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL + +uri + [VARCHAR(3000)] + NOT NULL - + dataset--dag_schedule_dataset_reference - -0..N -{0,1} + +0..N +{0,1} - + dataset--dataset_dag_run_queue - -0..N -{0,1} + +0..N +{0,1} - + dataset--task_outlet_dataset_reference - -0..N -{0,1} + +0..N +{0,1} - + dataset_event - -dataset_event - -id - [INTEGER] - NOT NULL - -dataset_id - [INTEGER] - NOT NULL - -extra - [JSON] - NOT NULL - -source_dag_id - [VARCHAR(250)] - -source_map_index - [INTEGER] - -source_run_id - [VARCHAR(250)] - -source_task_id - [VARCHAR(250)] - -timestamp - [TIMESTAMP] - NOT NULL + +dataset_event + +id + [INTEGER] + NOT NULL + +dataset_id + [INTEGER] + NOT NULL + +extra + [JSON] + NOT NULL + +source_dag_id + [VARCHAR(250)] + +source_map_index + [INTEGER] + +source_run_id + [VARCHAR(250)] + +source_task_id + [VARCHAR(250)] + +timestamp + [TIMESTAMP] + NOT NULL - + dataset_event--dagrun_dataset_event - -0..N -{0,1} + +0..N +{0,1} - + import_error - -import_error - -id - [INTEGER] - NOT NULL - -filename - [VARCHAR(1024)] - -stacktrace - [TEXT] - -timestamp - [TIMESTAMP] + +import_error + +id + [INTEGER] + NOT NULL + +filename + [VARCHAR(1024)] + +stacktrace + [TEXT] + +timestamp + [TIMESTAMP] - + job - -job - -id - [INTEGER] - NOT NULL - -dag_id - [VARCHAR(250)] - -end_date - [TIMESTAMP] - -executor_class - [VARCHAR(500)] - -hostname - [VARCHAR(500)] - -job_type - [VARCHAR(30)] - -latest_heartbeat - [TIMESTAMP] - -start_date - [TIMESTAMP] - -state - [VARCHAR(20)] - -unixname - [VARCHAR(1000)] + +job + +id + [INTEGER] + NOT NULL + +dag_id + [VARCHAR(250)] + +end_date + [TIMESTAMP] + +executor_class + [VARCHAR(500)] + +hostname + [VARCHAR(500)] + +job_type + [VARCHAR(30)] + +latest_heartbeat + [TIMESTAMP] + +start_date + [TIMESTAMP] + +state + [VARCHAR(20)] + +unixname + [VARCHAR(1000)] - + log - -log - -id - [INTEGER] - NOT NULL - -dag_id - [VARCHAR(250)] - -dttm - [TIMESTAMP] - -event - [VARCHAR(30)] - -execution_date - [TIMESTAMP] - -extra - [TEXT] - -map_index - [INTEGER] - -owner - [VARCHAR(500)] - -task_id - 
[VARCHAR(250)] + +log + +id + [INTEGER] + NOT NULL + +dag_id + [VARCHAR(250)] + +dttm + [TIMESTAMP] + +event + [VARCHAR(30)] + +execution_date + [TIMESTAMP] + +extra + [TEXT] + +map_index + [INTEGER] + +owner + [VARCHAR(500)] + +task_id + [VARCHAR(250)] - + trigger - -trigger - -id - [INTEGER] - NOT NULL - -classpath - [VARCHAR(1000)] - NOT NULL - -created_date - [TIMESTAMP] - NOT NULL - -kwargs - [JSON] - NOT NULL - -triggerer_id - [INTEGER] + +trigger + +id + [INTEGER] + NOT NULL + +classpath + [VARCHAR(1000)] + NOT NULL + +created_date + [TIMESTAMP] + NOT NULL + +kwargs + [JSON] + NOT NULL + +triggerer_id + [INTEGER] - + trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} - + serialized_dag - -serialized_dag - -dag_id - [VARCHAR(250)] - NOT NULL - -dag_hash - [VARCHAR(32)] - NOT NULL - -data - [JSON] - -data_compressed - [BLOB] - -fileloc - [VARCHAR(2000)] - NOT NULL - -fileloc_hash - [BIGINT] - NOT NULL - -last_updated - [TIMESTAMP] - NOT NULL - -processor_subdir - [VARCHAR(2000)] + +serialized_dag + +dag_id + [VARCHAR(250)] + NOT NULL + +dag_hash + [VARCHAR(32)] + NOT NULL + +data + [JSON] + +data_compressed + [BLOB] + +fileloc + [VARCHAR(2000)] + NOT NULL + +fileloc_hash + [BIGINT] + NOT NULL + +last_updated + [TIMESTAMP] + NOT NULL + +processor_subdir + [VARCHAR(2000)] - + session - -session - -id - [INTEGER] - NOT NULL - -data - [BLOB] - -expiry - [DATETIME] - -session_id - [VARCHAR(255)] + +session + +id + [INTEGER] + NOT NULL + +data + [BLOB] + +expiry + [DATETIME] + +session_id + [VARCHAR(255)] - + sla_miss - -sla_miss - -dag_id - [VARCHAR(250)] - NOT NULL - -execution_date - [TIMESTAMP] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -description - [TEXT] - -email_sent - [BOOLEAN] - -notification_sent - [BOOLEAN] - -timestamp - [TIMESTAMP] + +sla_miss + +dag_id + [VARCHAR(250)] + NOT NULL + +execution_date + [TIMESTAMP] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +description + [TEXT] + +email_sent + [BOOLEAN] + +notification_sent + [BOOLEAN] + +timestamp + [TIMESTAMP] - + slot_pool - -slot_pool - -id - [INTEGER] - NOT NULL - -description - [TEXT] - -pool - [VARCHAR(256)] - -slots - [INTEGER] + +slot_pool + +id + [INTEGER] + NOT NULL + +description + [TEXT] + +pool + [VARCHAR(256)] + +slots + [INTEGER] - + variable - -variable - -id - [INTEGER] - NOT NULL - -description - [TEXT] - -is_encrypted - [BOOLEAN] - -key - [VARCHAR(250)] - -val - [TEXT] + +variable + +id + [INTEGER] + NOT NULL + +description + [TEXT] + +is_encrypted + [BOOLEAN] + +key + [VARCHAR(250)] + +val + [TEXT] diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 77a95c0edb75e..5d243bbf30ffc 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,7 @@ Here's the list of all the Database Migrations that are executed via when you ru +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=================================+===================+===================+==============================================================+ -| ``65a852f26899`` (head) | ``ee8d93fcc81e`` | ``2.5.0`` | Add user comment to task_instance and dag_run. 
| +| ``1986afd32c1b`` (head) | ``ee8d93fcc81e`` | ``2.5.0`` | Add DagRunNote and TaskInstanceNote | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``ee8d93fcc81e`` | ``e07f49787c9d`` | ``2.5.0`` | Add updated_at column to DagRun and TaskInstance | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py index 2f1ead7561929..70188ba5e1cfe 100644 --- a/tests/api/client/test_local_client.py +++ b/tests/api/client/test_local_client.py @@ -84,7 +84,6 @@ def test_trigger_dag(self, mock): external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, - notes=None, ) mock.reset_mock() @@ -98,7 +97,6 @@ def test_trigger_dag(self, mock): external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, - notes=None, ) mock.reset_mock() @@ -113,7 +111,6 @@ def test_trigger_dag(self, mock): external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, - notes=None, ) mock.reset_mock() @@ -128,7 +125,6 @@ def test_trigger_dag(self, mock): external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, - notes=None, ) mock.reset_mock() diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 5e652723e6c92..fe0d144f47cd1 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -1421,7 +1421,7 @@ def test_should_respond_200(self, dag_maker, session): "execution_date": dr.execution_date.isoformat(), "external_trigger": False, "logical_date": dr.logical_date.isoformat(), - "start_date": dr.logical_date.isoformat(), + "start_date": None, "state": "queued", "data_interval_start": dr.data_interval_start.isoformat(), "data_interval_end": dr.data_interval_end.isoformat(), @@ -1631,6 +1631,7 @@ def test_should_respond_200(self, dag_maker, session): "run_type": dr.run_type, "notes": new_notes_value, } + assert dr.dag_run_note.user_id is not None def test_should_raises_401_unauthenticated(self, session): response = self.client.patch( diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 8cbd1e6ecbc70..4eef4c841215c 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -380,18 +380,13 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): def test_should_respond_200_mapped_task_instance_with_rtif(self, session): """Verify we don't duplicate rows through join to RTIF""" tis = self.create_task_instances(session) - session.query() - ti = tis[0] - ti.map_index = 1 - rendered_fields = RTIF(ti, render_templates=False) - session.add(rendered_fields) - session.commit() - new_ti = TaskInstance(task=ti.task, run_id=ti.run_id, map_index=2) - for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "notes"]: - setattr(new_ti, attr, getattr(ti, attr)) - session.add(new_ti) - rendered_fields = RTIF(new_ti, render_templates=False) - session.add(rendered_fields) + old_ti = tis[0] + for idx in (1, 2): + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) + 
ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) + for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "notes"]: + setattr(ti, attr, getattr(old_ti, attr)) + session.add(ti) session.commit() # in each loop, we should get the right mapped TI back @@ -1688,15 +1683,12 @@ def test_should_update_task_instance_state(self, session): assert response2.json["state"] == NEW_STATE def test_should_update_mapped_task_instance_state(self, session): - NEW_STATE = "failed" map_index = 1 - tis = self.create_task_instances(session) - ti = tis[0] - ti.map_index = map_index - rendered_fields = RTIF(ti, render_templates=False) - session.add(rendered_fields) + ti = TaskInstance(task=tis[0].task, run_id=tis[0].run_id, map_index=map_index) + ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) + session.add(ti) session.commit() self.client.patch( @@ -1823,7 +1815,7 @@ def teardown_method(self): @provide_session def test_should_respond_200(self, session): - self.create_task_instances(session) + tis = self.create_task_instances(session) new_notes_value = "My super cool TaskInstance notes." response = self.client.patch( "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/" @@ -1860,22 +1852,19 @@ def test_should_respond_200(self, session): "trigger": None, "triggerer_job": None, } + ti = tis[0] + assert ti.task_instance_note.user_id is not None def test_should_respond_200_mapped_task_instance_with_rtif(self, session): """Verify we don't duplicate rows through join to RTIF""" tis = self.create_task_instances(session) - session.query() - ti = tis[0] - ti.map_index = 1 - rendered_fields = RTIF(ti, render_templates=False) - session.add(rendered_fields) - session.commit() - new_ti = TaskInstance(task=ti.task, run_id=ti.run_id, map_index=2) - for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "notes"]: - setattr(new_ti, attr, getattr(ti, attr)) - session.add(new_ti) - rendered_fields = RTIF(new_ti, render_templates=False) - session.add(rendered_fields) + old_ti = tis[0] + for idx in (1, 2): + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) + ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) + for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "notes"]: + setattr(ti, attr, getattr(old_ti, attr)) + session.add(ti) session.commit() # in each loop, we should get the right mapped TI back diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py b/tests/api_connexion/schemas/test_dag_run_schema.py index 0419a0396cefd..a79983601c406 100644 --- a/tests/api_connexion/schemas/test_dag_run_schema.py +++ b/tests/api_connexion/schemas/test_dag_run_schema.py @@ -57,7 +57,6 @@ def test_serialize(self, session): execution_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), conf='{"start": "stop"}', - notes="my notes", ) session.add(dagrun_model) session.commit() @@ -78,7 +77,7 @@ def test_serialize(self, session): "data_interval_start": None, "last_scheduling_decision": None, "run_type": "manual", - "notes": "my notes", + "notes": None, } @pytest.mark.parametrize( @@ -142,7 +141,6 @@ def test_serialize(self, session): run_type=DagRunType.MANUAL.value, start_date=timezone.parse(self.default_time), conf='{"start": "stop"}', - notes="Notes for first", ) dagrun_model_2 = DagRun( dag_id="my-dag-run", @@ -151,7 +149,6 @@ def test_serialize(self, session): execution_date=timezone.parse(self.second_time), 
start_date=timezone.parse(self.default_time), run_type=DagRunType.MANUAL.value, - notes="Notes for second", ) dagruns = [dagrun_model_1, dagrun_model_2] session.add_all(dagruns) @@ -174,7 +171,7 @@ def test_serialize(self, session): "data_interval_start": None, "last_scheduling_decision": None, "run_type": "manual", - "notes": "Notes for first", + "notes": None, }, { "dag_id": "my-dag-run", @@ -190,7 +187,7 @@ def test_serialize(self, session): "data_interval_start": None, "last_scheduling_decision": None, "run_type": "manual", - "notes": "Notes for second", + "notes": None, }, ], "total_entries": 2, diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index b2f4f9d4deb64..8f4fb47881e2e 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2778,7 +2778,6 @@ def test_refresh_from_db(self, create_task_instance): "next_kwargs": None, "next_method": None, "updated_at": None, - "notes": None, } # Make sure we aren't missing any new value in our expected_values list. expected_keys = {f"task_instance.{key.lstrip('_')}" for key in expected_values} diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index a52f185d99354..fdaa7263b2e94 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -148,18 +148,6 @@ def test_trigger_dagrun_with_execution_date(self): assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_execution_date) self.assert_extra_link(dagrun, task, session) - def test_trigger_dagrun_with_custom_note(self): - notes_value = "Custom note for newly created DagRun." - task = TriggerDagRunOperator( - task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, dag=self.dag, dag_run_notes=notes_value - ) - task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - with create_session() as session: - dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one() - assert dagrun.external_trigger - assert dagrun.notes == notes_value - def test_trigger_dagrun_twice(self): """Test TriggerDagRunOperator with custom execution_date.""" utc_now = timezone.utcnow() diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py index 07a1c5c5f1969..422bec37f3f67 100644 --- a/tests/utils/test_db_cleanup.py +++ b/tests/utils/test_db_cleanup.py @@ -255,6 +255,7 @@ def test_no_models_missing(self): with suppress(AttributeError): all_models.update({class_.__tablename__: class_}) exclusion_list = { + "ab_user", "variable", # leave alone "dataset", # not good way to know if "stale" "trigger", # self-maintaining @@ -272,6 +273,8 @@ def test_no_models_missing(self): "task_outlet_dataset_reference", # leave alone for now "dataset_dag_run_queue", # self-managed "dataset_event_dag_run", # foreign keys + "task_instance_note", # foreign keys + "dag_run_note", # foreign keys } from airflow.utils.db_cleanup import config_dict diff --git a/tests/www/views/test_views_dagrun.py b/tests/www/views/test_views_dagrun.py index fcdd80717f7f7..504ee7b09b956 100644 --- a/tests/www/views/test_views_dagrun.py +++ b/tests/www/views/test_views_dagrun.py @@ -218,7 +218,7 @@ def test_muldelete_dag_runs_action(session, admin_client, running_dag_run): follow_redirects=True, ) assert resp.status_code == 200 - assert session.query(TaskInstance).count() == 0 # Deletes associated TIs. 
+ assert session.query(TaskInstance).count() == 0 # associated TIs are deleted assert session.query(DagRun).filter(DagRun.id == dag_run_id).count() == 0 diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index c4cee01a26406..4db2705070a3c 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -1017,7 +1017,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool", @@ -1048,7 +1047,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool", @@ -1079,7 +1077,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "EmptyOperator", "pid": None, "pool": "default_pool", @@ -1110,7 +1107,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool", @@ -1141,7 +1137,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool", @@ -1172,7 +1167,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool", @@ -1203,7 +1197,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool",