Enable accessing Variables from the top level of the DAG files #46869

Merged: ashb merged 1 commit into main from variable-access-dag-parsing on Feb 19, 2025

@ashb ashb commented Feb 18, 2025

Fixes #45449

Since I want to keep the ability to run a DAG processor
without the Execution API server running, and since the Dag Processor Manager
already has a database connection, I have chosen to run the FastAPI execution
server in process.

To achieve this I make use of two features:

  • The first is the ability to give an httpx.Client a Transport object
    that wraps a WSGI application, so that it does not send real requests but
    instead calls the WSGI app directly to service the request.
  • The second is a2wsgi. Since we are making the call from within a synchronous
    context, we have to give httpx a WSGI app (if we were making async requests
    we could give httpx an ASGI app directly). FastAPI at its outer layers is
    an async framework (even if it supports running sync routes), so we need to
    somehow wrap the async call to return a sync result. a2wsgi does this for us
    by running an async loop off the main thread.
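
The two pieces described above can be illustrated with a sketch that uses only the standard library. It is not how httpx or a2wsgi are implemented, and it is not the code in this PR: `asgi_app`, `AsyncToWSGI`, `call_in_process` and the `/variables/hi` route are hypothetical names chosen for illustration. It only shows the general shape, which is an async app running on an event loop in a background thread, wrapped as a sync WSGI callable, and then invoked directly with no socket involved.

```python
import asyncio
import threading
from http import HTTPStatus
from wsgiref.util import setup_testing_defaults


async def asgi_app(scope, receive, send):
    """Tiny ASGI app standing in for the FastAPI execution API (hypothetical route)."""
    body = b'{"key": "hi", "value": null}' if scope["path"] == "/variables/hi" else b"{}"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"application/json")],
    })
    await send({"type": "http.response.body", "body": body})


class AsyncToWSGI:
    """Simplified stand-in for an ASGI-to-WSGI adapter (assumption: not a2wsgi's real code)."""

    def __init__(self, app):
        self.app = app
        # The event loop lives on a background thread, off the main thread.
        self.loop = asyncio.new_event_loop()
        threading.Thread(target=self.loop.run_forever, daemon=True).start()

    def __call__(self, environ, start_response):
        scope = {
            "type": "http",
            "method": environ["REQUEST_METHOD"],
            "path": environ["PATH_INFO"],
            "headers": [],
        }
        messages = []

        async def receive():
            return {"type": "http.request", "body": b"", "more_body": False}

        async def send(message):
            messages.append(message)

        # Block the synchronous caller until the coroutine finishes on the loop thread.
        future = asyncio.run_coroutine_threadsafe(self.app(scope, receive, send), self.loop)
        future.result()

        start = messages[0]
        status = f"{start['status']} {HTTPStatus(start['status']).phrase}"
        headers = [(k.decode(), v.decode()) for k, v in start["headers"]]
        start_response(status, headers)
        return [m["body"] for m in messages[1:]]


def call_in_process(wsgi_app, path):
    """Call the WSGI app directly instead of sending a real HTTP request."""
    environ = {"REQUEST_METHOD": "GET", "PATH_INFO": path}
    setup_testing_defaults(environ)  # fills in the remaining required WSGI keys
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status

    body = b"".join(wsgi_app(environ, start_response))
    return captured["status"], body


status, body = call_in_process(AsyncToWSGI(asgi_app), "/variables/hi")
print(status, body.decode())
```

With the real libraries, the same roles are played by `a2wsgi.ASGIMiddleware` (the adapter) and `httpx.WSGITransport` (the direct caller), so the client code keeps using a normal `httpx.Client`.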

I tested this with a simple DAG file initially:

import sys
import time

from airflow.decorators import dag, task
from airflow.sdk import Variable

# Top-level Variable access: this runs while the DAG file is being parsed.
if Variable.get("hi", default=None):
    raise RuntimeError("Var hi was defined")

@dag(schedule=None)
def hi():
    @task()
    def hello():
        print("hello")
        time.sleep(3)
        print("goodbye")
        print("err mesg", file=sys.stderr)

    hello()

hi()

If the variable is defined, parsing the file raises and the result is an import
error. If the variable is not defined, the DAG is defined as normal.
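
The behaviour described above (a top-level exception surfacing as an import error) can be mimicked with a small standard-library sketch. This is not Airflow's DAG processor code: `try_import`, `get_variable` and the file contents are hypothetical stand-ins, and the variable store is a plain dict rather than the Execution API.

```python
import importlib.util
import tempfile
import textwrap
from pathlib import Path

# Hypothetical stand-in for a DAG file that reads a variable at the top level.
DAG_SOURCE = textwrap.dedent(
    '''
    if get_variable("hi"):
        raise RuntimeError("Var hi was defined")
    dag_id = "hi"
    '''
)


def try_import(path, variables):
    """Import the file and return (dag_id, import_error); exactly one is None."""
    spec = importlib.util.spec_from_file_location("example_dag", path)
    module = importlib.util.module_from_spec(spec)
    # Inject the lookup function before the module's top-level code runs.
    module.get_variable = variables.get
    try:
        spec.loader.exec_module(module)
    except Exception as exc:
        # Any exception raised while importing is recorded as an import error.
        return None, f"{type(exc).__name__}: {exc}"
    return module.dag_id, None


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "example_dag.py"
    path.write_text(DAG_SOURCE)
    defined = try_import(path, {"hi": "1"})
    undefined = try_import(path, {})

print(defined)
print(undefined)
```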



@ashb ashb force-pushed the variable-access-dag-parsing branch 2 times, most recently from 4e90d11 to b95c88e Compare February 19, 2025 11:40
@ashb ashb marked this pull request as ready for review February 19, 2025 11:40
@ashb ashb force-pushed the variable-access-dag-parsing branch from b95c88e to 641bad3 Compare February 19, 2025 13:00
@ashb ashb force-pushed the variable-access-dag-parsing branch from 641bad3 to 1dc5464 Compare February 19, 2025 15:03
@ashb ashb requested a review from XD-DENG as a code owner February 19, 2025 15:03
@ashb ashb merged commit 711d1fd into main Feb 19, 2025
149 checks passed
@ashb ashb deleted the variable-access-dag-parsing branch February 19, 2025 17:01
ntr pushed a commit to ntr/airflow that referenced this pull request Feb 20, 2025
…e#46869)

Successfully merging this pull request may close these issues.

AIP-72: Add support to get Variables in task sdk outside of context