Skip to content

Commit

Permalink
Add creating_job_id to DagRun table (apache#11396)
Browse files Browse the repository at this point in the history
This PR introduces creating_job_id column in DagRun table that links a
DagRun to job that created it. Part of apache#11302

Co-authored-by: Kaxil Naik <[email protected]>
  • Loading branch information
turbaszek and kaxil authored Oct 17, 2020
1 parent bf468c7 commit 112f7d7
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 3 deletions.
1 change: 1 addition & 0 deletions airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ def _get_dag_run(self, run_date: datetime, dag: DAG, session: Session = None):
session=session,
conf=self.conf,
run_type=DagRunType.BACKFILL_JOB,
creating_job_id=self.id,
)

# set required transient field
Expand Down
9 changes: 9 additions & 0 deletions airflow/jobs/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import DagRun
from airflow.models.base import ID_LEN, Base
from airflow.models.taskinstance import TaskInstance
from airflow.stats import Stats
Expand Down Expand Up @@ -78,6 +79,14 @@ class BaseJob(Base, LoggingMixin):
foreign_keys=id,
backref=backref('queued_by_job', uselist=False),
)

dag_runs = relationship(
DagRun,
primaryjoin=id == DagRun.creating_job_id,
foreign_keys=id,
backref=backref('creating_job'),
)

"""
TaskInstances which have been enqueued by this Job.
Expand Down
3 changes: 2 additions & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1563,7 +1563,8 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
state=State.RUNNING,
external_trigger=False,
session=session,
dag_hash=dag_hash
dag_hash=dag_hash,
creating_job_id=self.id,
)

self._update_dag_next_dagruns(dag_models, session)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add creating_job_id to DagRun table
Revision ID: 364159666cbd
Revises: 849da589634d
Create Date: 2020-10-10 09:08:07.332456
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = '364159666cbd'
down_revision = '849da589634d'
branch_labels = None
depends_on = None


def upgrade():
"""Apply Add creating_job_id to DagRun table"""
op.add_column('dag_run', sa.Column('creating_job_id', sa.Integer))


def downgrade():
"""Unapply Add job_id to DagRun table"""
op.drop_column('dag_run', 'creating_job_id')
8 changes: 6 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,8 @@ def create_dagrun(
conf=None,
run_type=None,
session=None,
dag_hash=None
dag_hash=None,
creating_job_id=None,
):
"""
Creates a dag run from this dag including the tasks associated with this dag.
Expand All @@ -1675,6 +1676,8 @@ def create_dagrun(
:type external_trigger: bool
:param conf: Dict containing configuration/parameters to pass to the DAG
:type conf: dict
:param creating_job_id: id of the job creating this DagRun
:type creating_job_id: int
:param session: database session
:type session: sqlalchemy.orm.session.Session
:param dag_hash: Hash of Serialized DAG
Expand Down Expand Up @@ -1702,7 +1705,8 @@ def create_dagrun(
conf=conf,
state=state,
run_type=run_type.value,
dag_hash=dag_hash
dag_hash=dag_hash,
creating_job_id=creating_job_id
)
session.add(run)
session.flush()
Expand Down
3 changes: 3 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class DagRun(Base, LoggingMixin):
end_date = Column(UtcDateTime)
_state = Column('state', String(50), default=State.RUNNING)
run_id = Column(String(ID_LEN))
creating_job_id = Column(Integer)
external_trigger = Column(Boolean, default=True)
run_type = Column(String(50), nullable=False)
conf = Column(PickleType)
Expand Down Expand Up @@ -98,6 +99,7 @@ def __init__(
state: Optional[str] = None,
run_type: Optional[str] = None,
dag_hash: Optional[str] = None,
creating_job_id: Optional[int] = None,
):
self.dag_id = dag_id
self.run_id = run_id
Expand All @@ -108,6 +110,7 @@ def __init__(
self.state = state
self.run_type = run_type
self.dag_hash = dag_hash
self.creating_job_id = creating_job_id
super().__init__()

def __repr__(self):
Expand Down
14 changes: 14 additions & 0 deletions tests/jobs/test_backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1536,3 +1536,17 @@ def test_reset_orphaned_tasks_specified_dagrun(self):
ti2.refresh_from_db(session=session)
self.assertEqual(State.SCHEDULED, ti1.state)
self.assertEqual(State.NONE, ti2.state)

def test_job_id_is_assigned_to_dag_run(self):
dag_id = 'test_job_id_is_assigned_to_dag_run'
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
DummyOperator(task_id="dummy_task", dag=dag)

job = BackfillJob(
dag=dag,
executor=MockExecutor(),
start_date=datetime.datetime.now() - datetime.timedelta(days=1)
)
job.run()
dr: DagRun = dag.get_last_dagrun()
assert dr.creating_job_id == job.id
27 changes: 27 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3488,6 +3488,33 @@ def test_send_sla_callbacks_to_processor_sla_with_task_slas(self):
full_filepath=dag.fileloc, dag_id=dag_id
)

def test_scheduler_sets_job_id_on_dag_run(self):
dag = DAG(
dag_id='test_scheduler_sets_job_id_on_dag_run',
start_date=DEFAULT_DATE)

DummyOperator(
task_id='dummy',
dag=dag,
)

dagbag = DagBag(
dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
include_examples=False,
read_dags_from_db=True
)
dagbag.bag_dag(dag=dag, root_dag=dag)
dagbag.sync_to_db()
dag_model = DagModel.get_dagmodel(dag.dag_id)

scheduler = SchedulerJob(executor=self.null_exec)
scheduler.processor_agent = mock.MagicMock()

with create_session() as session:
scheduler._create_dag_runs([dag_model], session)

assert dag.get_last_dagrun().creating_job_id == scheduler.id


@pytest.mark.xfail(reason="Work out where this goes")
def test_task_with_upstream_skip_process_task_instances():
Expand Down
8 changes: 8 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,14 @@ def test_create_dagrun_run_type_is_obtained_from_run_id(self):
dr = dag.create_dagrun(run_id="custom_is_set_to_manual", state=State.NONE)
assert dr.run_type == DagRunType.MANUAL.value

def test_create_dagrun_job_id_is_set(self):
job_id = 42
dag = DAG(dag_id="test_create_dagrun_job_id_is_set")
dr = dag.create_dagrun(
run_id="test_create_dagrun_job_id_is_set", state=State.NONE, creating_job_id=job_id
)
assert dr.creating_job_id == job_id

@parameterized.expand(
[
(State.NONE,),
Expand Down

0 comments on commit 112f7d7

Please sign in to comment.