Skip to content

Commit

Permalink
Add optional session argument to xcom_push. (apache#11485)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmcarp authored Oct 19, 2020
1 parent 7206fd7 commit 22f6db7
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1710,11 +1710,14 @@ def set_duration(self) -> None:
self.duration = None
self.log.debug("Task Duration set to %s", self.duration)

@provide_session
def xcom_push(
self,
key: str,
value: Any,
execution_date: Optional[datetime] = None) -> None:
self,
key: str,
value: Any,
execution_date: Optional[datetime] = None,
session: Session = None,
) -> None:
"""
Make an XCom available for tasks to pull.
Expand All @@ -1727,6 +1730,8 @@ def xcom_push(
this date. This can be used, for example, to send a message to a
task on a future date without it being immediately visible.
:type execution_date: datetime
:param session: Sqlalchemy ORM Session
:type session: Session
"""
if execution_date and execution_date < self.execution_date:
raise ValueError(
Expand All @@ -1739,7 +1744,9 @@ def xcom_push(
value=value,
task_id=self.task_id,
dag_id=self.dag_id,
execution_date=execution_date or self.execution_date)
execution_date=execution_date or self.execution_date,
session=session,
)

@provide_session
def xcom_pull( # pylint: disable=inconsistent-return-statements
Expand Down

0 comments on commit 22f6db7

Please sign in to comment.