Skip to content

Commit

Permalink
Add logical_date to OpenAPI DAGRun schema (apache#17122)
Browse files Browse the repository at this point in the history
The idea is to make *both* logical_date and execution_date get
serialized when a DAGRun is returned, but prefer logical_date
from the user input (and fall back to execution_date when only it
is provided).
  • Loading branch information
uranusjr authored Aug 20, 2021
1 parent 8fde9a4 commit 65684dd
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 38 deletions.
15 changes: 6 additions & 9 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,34 +243,31 @@ def post_dag_run(dag_id, session):
except ValidationError as err:
raise BadRequest(detail=str(err))

execution_date = post_body["execution_date"]
logical_date = post_body["execution_date"]
run_id = post_body["run_id"]
dagrun_instance = (
session.query(DagRun)
.filter(
DagRun.dag_id == dag_id,
or_(DagRun.run_id == run_id, DagRun.execution_date == execution_date),
or_(DagRun.run_id == run_id, DagRun.execution_date == logical_date),
)
.first()
)
if not dagrun_instance:
dag_run = current_app.dag_bag.get_dag(dag_id).create_dagrun(
run_type=DagRunType.MANUAL,
run_id=run_id,
execution_date=execution_date,
execution_date=logical_date,
state=State.QUEUED,
conf=post_body.get("conf"),
external_trigger=True,
dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
)
return dagrun_schema.dump(dag_run)

if dagrun_instance.execution_date == execution_date:
if dagrun_instance.execution_date == logical_date:
raise AlreadyExists(
detail=f"DAGRun with DAG ID: '{dag_id}' and "
f"DAGRun ExecutionDate: '{post_body['execution_date']}' already exists"
detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun logical date: '{logical_date}' already exists"
)

raise AlreadyExists(
detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{post_body['run_id']}' already exists"
)
raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")
18 changes: 14 additions & 4 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1946,17 +1946,27 @@ components:
dag_id:
type: string
readOnly: true
execution_date:
description: >
The execution date. This is the time when the DAG run should be started according
to the DAG definition.
logical_date:
type: string
nullable: true
description: |
The logical date (previously called execution date). This is the time or interval covered by
this DAG run, according to the DAG definition.
The value of this field can be set only when creating the object. If you try to modify the
field of an existing object, the request fails with an BAD_REQUEST error.
This together with DAG_ID are a unique key.
format: date-time
execution_date:
type: string
nullable: true
description: |
The execution date. This is the same as logical_date, kept for backwards compatibility.
If both this field and logical_date are provided but with different values, the request
will fail with an BAD_REQUEST error.
format: date-time
deprecated: true
start_date:
type: string
format: date-time
Expand Down
39 changes: 32 additions & 7 deletions airflow/api_connexion/schemas/dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import json
from typing import List, NamedTuple

from marshmallow import fields, pre_load
from marshmallow import fields, post_dump, pre_load
from marshmallow.schema import Schema
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
from pendulum.parsing import ParserError
Expand All @@ -45,6 +45,9 @@ def _deserialize(self, value, attr, data, **kwargs):
return value


_MISSING = object()


class DAGRunSchema(SQLAlchemySchema):
"""Schema for DAGRun"""

Expand All @@ -56,7 +59,7 @@ class Meta:

run_id = auto_field(data_key='dag_run_id')
dag_id = auto_field(dump_only=True)
execution_date = auto_field(validate=validate_istimezone)
execution_date = auto_field(data_key="logical_date", validate=validate_istimezone)
start_date = auto_field(dump_only=True)
end_date = auto_field(dump_only=True)
state = DagStateField(dump_only=True)
Expand All @@ -65,18 +68,40 @@ class Meta:

@pre_load
def autogenerate(self, data, **kwargs):
"""Auto generate run_id and execution_date if they are not loaded"""
if "execution_date" not in data.keys():
data["execution_date"] = str(timezone.utcnow())
if "dag_run_id" not in data.keys():
"""Auto generate run_id and logical_date if they are not provided.
For compatibility, if `execution_date` is submitted, it is converted
to `logical_date`.
"""
logical_date = data.get("logical_date", _MISSING)
execution_date = data.pop("execution_date", _MISSING)
if logical_date is execution_date is _MISSING: # Both missing.
data["logical_date"] = str(timezone.utcnow())
elif logical_date is _MISSING: # Only logical_date missing.
data["logical_date"] = execution_date
elif execution_date is _MISSING: # Only execution_date missing.
pass
elif logical_date != execution_date: # Both provided but don't match.
raise BadRequest(
"logical_date conflicts with execution_date",
detail=f"{logical_date!r} != {execution_date!r}",
)

if "dag_run_id" not in data:
try:
data["dag_run_id"] = DagRun.generate_run_id(
DagRunType.MANUAL, timezone.parse(data["execution_date"])
DagRunType.MANUAL, timezone.parse(data["logical_date"])
)
except (ParserError, TypeError) as err:
raise BadRequest("Incorrect datetime argument", detail=str(err))
return data

@post_dump
def autofill(self, data, **kwargs):
"""Populate execution_date from logical_date for compatibility."""
data["execution_date"] = data["logical_date"]
return data


class DAGRunCollection(NamedTuple):
"""List of DAGRuns with metadata"""
Expand Down
108 changes: 90 additions & 18 deletions tests/api_connexion/endpoints/test_dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import timedelta
from unittest import mock

import pytest
from parameterized import parameterized
Expand Down Expand Up @@ -211,6 +212,7 @@ def test_should_respond_200(self, session):
'dag_run_id': 'TEST_DAG_RUN_ID',
'end_date': None,
'state': 'running',
'logical_date': self.default_time,
'execution_date': self.default_time,
'external_trigger': True,
'start_date': self.default_time,
Expand Down Expand Up @@ -265,6 +267,7 @@ def test_should_respond_200(self, session):
'end_date': None,
'state': 'running',
'execution_date': self.default_time,
'logical_date': self.default_time,
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
Expand All @@ -275,6 +278,7 @@ def test_should_respond_200(self, session):
'end_date': None,
'state': 'running',
'execution_date': self.default_time_2,
'logical_date': self.default_time_2,
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
Expand Down Expand Up @@ -313,6 +317,7 @@ def test_return_correct_results_with_order_by(self, session):
'end_date': None,
'state': 'running',
'execution_date': self.default_time_2,
'logical_date': self.default_time_2,
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
Expand All @@ -323,6 +328,7 @@ def test_return_correct_results_with_order_by(self, session):
'end_date': None,
'state': 'running',
'execution_date': self.default_time,
'logical_date': self.default_time,
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
Expand Down Expand Up @@ -566,6 +572,7 @@ def test_should_respond_200(self):
'end_date': None,
'state': 'running',
'execution_date': self.default_time,
'logical_date': self.default_time,
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
Expand All @@ -576,6 +583,7 @@ def test_should_respond_200(self):
'end_date': None,
'state': 'running',
'execution_date': self.default_time_2,
'logical_date': self.default_time_2,
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
Expand All @@ -600,6 +608,7 @@ def test_order_by_descending_works(self):
'end_date': None,
'state': 'running',
'execution_date': self.default_time_2,
'logical_date': self.default_time_2,
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
Expand All @@ -610,6 +619,7 @@ def test_order_by_descending_works(self):
'end_date': None,
'state': 'running',
'execution_date': self.default_time,
'logical_date': self.default_time,
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
Expand Down Expand Up @@ -645,6 +655,7 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions
'end_date': None,
'state': 'running',
'execution_date': self.default_time,
'logical_date': self.default_time,
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
Expand All @@ -655,6 +666,7 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions
'end_date': None,
'state': 'running',
'execution_date': self.default_time_2,
'logical_date': self.default_time_2,
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
Expand Down Expand Up @@ -905,41 +917,101 @@ def test_end_date_gte_lte(self, payload, expected_dag_run_ids):


class TestPostDagRun(TestDagRunEndpoint):
@parameterized.expand(
@pytest.mark.parametrize("logical_date_field_name", ["execution_date", "logical_date"])
@pytest.mark.parametrize(
"dag_run_id, logical_date",
[
(
"All fields present",
{
"dag_run_id": "TEST_DAG_RUN",
"execution_date": "2020-06-11T18:00:00+00:00",
},
),
("dag_run_id missing", {"execution_date": "2020-06-11T18:00:00+00:00"}),
("dag_run_id and execution_date missing", {}),
]
pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", id="both-present"),
pytest.param(None, "2020-06-11T18:00:00+00:00", id="only-date"),
pytest.param(None, None, id="both-missing"),
],
)
def test_should_respond_200(self, name, request_json):
del name
def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_date):
self._create_dag("TEST_DAG_ID")

# We'll patch airflow.utils.timezone.utcnow to always return this so we
# can check the returned dates.
fixed_now = timezone.utcnow()

request_json = {}
if logical_date is not None:
request_json[logical_date_field_name] = logical_date
if dag_run_id is not None:
request_json["dag_run_id"] = dag_run_id

with mock.patch("airflow.utils.timezone.utcnow", lambda: fixed_now):
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
json=request_json,
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 200

if logical_date is None:
expected_logical_date = fixed_now.isoformat()
else:
expected_logical_date = logical_date
if dag_run_id is None:
expected_dag_run_id = f"manual__{expected_logical_date}"
else:
expected_dag_run_id = dag_run_id
assert {
"conf": {},
"dag_id": "TEST_DAG_ID",
"dag_run_id": expected_dag_run_id,
"end_date": None,
"execution_date": expected_logical_date,
"logical_date": expected_logical_date,
"external_trigger": True,
"start_date": None,
"state": "queued",
} == response.json

def test_should_response_200_for_matching_execution_date_logical_date(self):
self._create_dag("TEST_DAG_ID")
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns", json=request_json, environ_overrides={'REMOTE_USER': "test"}
"api/v1/dags/TEST_DAG_ID/dagRuns",
json={
"execution_date": "2020-11-10T08:25:56.939143+00:00",
"logical_date": "2020-11-10T08:25:56.939143+00:00",
},
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 200
assert {
"conf": {},
"dag_id": "TEST_DAG_ID",
"dag_run_id": response.json["dag_run_id"],
"dag_run_id": "manual__2020-11-10T08:25:56.939143+00:00",
"end_date": None,
"execution_date": response.json["execution_date"],
"execution_date": "2020-11-10T08:25:56.939143+00:00",
"logical_date": "2020-11-10T08:25:56.939143+00:00",
"external_trigger": True,
"start_date": None,
"state": "queued",
} == response.json

def test_should_response_400_for_conflicting_execution_date_logical_date(self):
self._create_dag("TEST_DAG_ID")
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
json={
"execution_date": "2020-11-10T08:25:56.939143+00:00",
"logical_date": "2020-11-11T08:25:56.939143+00:00",
},
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 400
assert response.json["title"] == "logical_date conflicts with execution_date"
assert response.json["detail"] == (
"'2020-11-11T08:25:56.939143+00:00' != '2020-11-10T08:25:56.939143+00:00'"
)

@parameterized.expand(
[
({'execution_date': "2020-11-10T08:25:56.939143"}, 'Naive datetime is disallowed'),
({'execution_date': "2020-11-10T08:25:56P"}, "{'execution_date': ['Not a valid datetime.']}"),
({'execution_date': "2020-11-10T08:25:56P"}, "{'logical_date': ['Not a valid datetime.']}"),
({'logical_date': "2020-11-10T08:25:56.939143"}, 'Naive datetime is disallowed'),
({'logical_date': "2020-11-10T08:25:56P"}, "{'logical_date': ['Not a valid datetime.']}"),
]
)
def test_should_response_400_for_naive_datetime_and_bad_datetime(self, data, expected):
Expand Down Expand Up @@ -1054,7 +1126,7 @@ def test_response_409_when_execution_date_is_same(self):
assert response.status_code == 409, response.data
assert response.json == {
"detail": "DAGRun with DAG ID: 'TEST_DAG_ID' and "
"DAGRun ExecutionDate: '2020-06-11 18:00:00+00:00' already exists",
"DAGRun logical date: '2020-06-11 18:00:00+00:00' already exists",
"status": 409,
"title": "Conflict",
"type": EXCEPTIONS_LINK_MAP[409],
Expand Down
3 changes: 3 additions & 0 deletions tests/api_connexion/schemas/test_dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def test_serialize(self, session):
"end_date": None,
"state": "running",
"execution_date": self.default_time,
"logical_date": self.default_time,
"external_trigger": True,
"start_date": self.default_time,
"conf": {"start": "stop"},
Expand Down Expand Up @@ -150,6 +151,7 @@ def test_serialize(self, session):
"dag_run_id": "my-dag-run",
"end_date": None,
"execution_date": self.default_time,
"logical_date": self.default_time,
"external_trigger": True,
"state": "running",
"start_date": self.default_time,
Expand All @@ -161,6 +163,7 @@ def test_serialize(self, session):
"end_date": None,
"state": "running",
"execution_date": self.default_time,
"logical_date": self.default_time,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
Expand Down

0 comments on commit 65684dd

Please sign in to comment.