Fix list dags command for get_dagmodel is None (apache#36739)
* Fix dags list command when get_dagmodel is None

In the dags list command, we first retrieve the dag_ids from the DagBag, and for each DAG we then fetch the corresponding details from the metadata database. If the two sources are out of sync, DagModel.get_dagmodel returns None, leading to a serialization failure. This PR falls back to the fields available from the DagBag to initialise the DAG details; fields that only exist in the database are set to None.
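
In effect the command now degrades gracefully when a DAG is only known to the DagBag. A condensed sketch of the guarded lookup this commit introduces (it mirrors the diff below; the standalone imports and the explicit session parameter are only there to make the snippet self-contained, and the valid_cols filtering from the real code is omitted):

from airflow.api_connexion.schemas.dag_schema import dag_schema
from airflow.cli.commands import dag_command
from airflow.models import DagModel
from airflow.models.dag import DAG


def get_dag_detail(dag: DAG, session) -> dict:
    """Serialize the DB row when it exists; otherwise fall back to DagBag-derived fields."""
    dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
    if dag_model:
        # Normal path: the DAG has a row in the metadata database.
        return dag_schema.dump(dag_model)
    # Out-of-sync path: no DB row, so build the same dict shape from the DagBag DAG
    # (fields that only exist in the database come back as None).
    return dag_command._get_dagbag_dag_details(dag)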
pankajastro authored Jan 12, 2024
1 parent aa25aff commit fd68f05
Showing 2 changed files with 59 additions and 2 deletions.
39 changes: 38 additions & 1 deletion airflow/cli/commands/dag_command.py
@@ -285,6 +285,40 @@ def _save_dot_to_file(dot: Dot, filename: str) -> None:
     print(f"File {filename} saved")
 
 
+def _get_dagbag_dag_details(dag: DAG) -> dict:
+    """Return a dagbag dag details dict."""
+    return {
+        "dag_id": dag.dag_id,
+        "root_dag_id": dag.parent_dag.dag_id if dag.parent_dag else None,
+        "is_paused": dag.get_is_paused(),
+        "is_active": dag.get_is_active(),
+        "is_subdag": dag.is_subdag,
+        "last_parsed_time": None,
+        "last_pickled": None,
+        "last_expired": None,
+        "scheduler_lock": None,
+        "pickle_id": dag.pickle_id,
+        "default_view": dag.default_view,
+        "fileloc": dag.fileloc,
+        "file_token": None,
+        "owners": dag.owner,
+        "description": dag.description,
+        "schedule_interval": dag.schedule_interval,
+        "timetable_description": dag.timetable.description,
+        "tags": dag.tags,
+        "max_active_tasks": dag.max_active_tasks,
+        "max_active_runs": dag.max_active_runs,
+        "has_task_concurrency_limits": any(
+            t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None for t in dag.tasks
+        ),
+        "has_import_errors": False,
+        "next_dagrun": None,
+        "next_dagrun_data_interval_start": None,
+        "next_dagrun_data_interval_end": None,
+        "next_dagrun_create_after": None,
+    }
+
+
 @cli_utils.action_cli
 @providers_configuration_loaded
 @provide_session
@@ -377,7 +411,10 @@ def dag_list_dags(args, session=NEW_SESSION) -> None:
 
     def get_dag_detail(dag: DAG) -> dict:
         dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
-        dag_detail = dag_schema.dump(dag_model)
+        if dag_model:
+            dag_detail = dag_schema.dump(dag_model)
+        else:
+            dag_detail = _get_dagbag_dag_details(dag)
         return {col: dag_detail[col] for col in valid_cols}
 
     AirflowConsole().print_as(
22 changes: 21 additions & 1 deletion tests/cli/commands/test_dag_command.py
@@ -30,7 +30,7 @@
 import time_machine
 
 from airflow import settings
-from airflow.api_connexion.schemas.dag_schema import DAGSchema
+from airflow.api_connexion.schemas.dag_schema import DAGSchema, dag_schema
 from airflow.cli import cli_parser
 from airflow.cli.commands import dag_command
 from airflow.decorators import task
@@ -565,6 +565,26 @@ def test_cli_list_dags_prints_import_errors(self):
         out = temp_stderr.getvalue()
         assert "Failed to load all files." in out
 
+    @conf_vars({("core", "load_examples"): "true"})
+    @mock.patch("airflow.models.DagModel.get_dagmodel")
+    def test_list_dags_none_get_dagmodel(self, mock_get_dagmodel):
+        mock_get_dagmodel.return_value = None
+        args = self.parser.parse_args(["dags", "list", "--output", "json"])
+        with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+            dag_command.dag_list_dags(args)
+        out = temp_stdout.getvalue()
+        dag_list = json.loads(out)
+        for key in ["dag_id", "fileloc", "owners", "is_paused"]:
+            assert key in dag_list[0]
+        assert any("airflow/example_dags/example_complex.py" in d["fileloc"] for d in dag_list)
+
+    @conf_vars({("core", "load_examples"): "true"})
+    def test_dagbag_dag_col(self):
+        valid_cols = [c for c in dag_schema.fields]
+        dagbag = DagBag(include_examples=True)
+        dag_details = dag_command._get_dagbag_dag_details(dagbag.get_dag("tutorial_dag"))
+        assert list(dag_details.keys()) == valid_cols
+
     @conf_vars({("core", "load_examples"): "false"})
     def test_cli_list_import_errors(self):
         dag_path = os.path.join(TEST_DAGS_FOLDER, "test_invalid_cron.py")
