Skip to content

Commit

Permalink
Adding DAG import error stack trace as span event in OTEL traces for …
Browse files Browse the repository at this point in the history
…Airflow (apache#41865)

* enhancement for issue apache#41863

* fixed minor typo

* Update airflow/dag_processing/manager.py

Thanks for the snippet! Will commit this suggestion.

Co-authored-by: Ephraim Anierobi <[email protected]>

* Update manager.py

reformatting

---------

Co-authored-by: Ephraim Anierobi <[email protected]>
  • Loading branch information
howardyoo and ephraimbuddy authored Aug 30, 2024
1 parent b8a25b9 commit 8b0a781
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,8 @@ def wait_until_finished(self):
while not processor.done:
time.sleep(0.1)

def _collect_results_from_processor(self, processor) -> None:
@provide_session
def _collect_results_from_processor(self, processor, session: Session = NEW_SESSION) -> None:
self.log.debug("Processor for %s finished", processor.file_path)
Stats.decr("dag_processing.processes", tags={"file_path": processor.file_path, "action": "finish"})
last_finish_time = timezone.utcnow()
Expand Down Expand Up @@ -1187,6 +1188,19 @@ def _collect_results_from_processor(self, processor) -> None:
span.set_attribute("import_errors", count_import_errors)
if count_import_errors > 0:
span.set_attribute("error", True)
import_errors = session.scalars(
select(ParseImportError).where(ParseImportError.filename == processor.file_path)
).all()
for import_error in import_errors:
span.add_event(
name="exception",
attributes={
"filename": import_error.filename,
"exception.type": "ParseImportError",
"exception.name": "Import error when processing DAG file",
"exception.stacktrace": import_error.stacktrace,
},
)

span.end(end_time=datetime_to_nano(last_finish_time))

Expand Down

0 comments on commit 8b0a781

Please sign in to comment.