-
Notifications
You must be signed in to change notification settings - Fork 14.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable accessing Variables from the top level of the DAG files #46869
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
4e90d11
to
b95c88e
Compare
b95c88e
to
641bad3
Compare
Since I want to maintain the property of being able to run a DAG processor without the Execution API server running, and since the Dag Processor Manager already has a database connection I have chosen to run the FastAPI execution server in process. To achive this I make use of two features: - The first is the abilty to provide an httpx.Client with a Transport object that has an WSGI appliction to not send real requests, but to instead call the WSGI app directly to service the request - The second is a2wsgi. Since we are making a call from with in a synchronus context we have to give httpx a WSGI (if we were making Async requests we could give httpx an ASGI app directly), and FastAPI at it's outer layers is an async framework (even if it supports running sync routes) we need to somehow wrap the async call to return a sync result. a2wsgi does this for us by using a async loop off the main thread. I tested this with a simple DAG file initially: ```python import time import sys from airflow.decorators import dag, task from airflow.sdk import Variable from airflow.utils.session import create_session if Variable.get("hi", default=None): raise RuntimeError("Var hi was defined") @dag(schedule=None) def hi(): @task() def hello(): print("hello") time.sleep(3) print("goodbye") print("err mesg", file=sys.stderr) hello() hi() ``` If the variable is defined it results in an import error. If the variable is not defined you get the DAG defined.
641bad3
to
1dc5464
Compare
ashb
commented
Feb 19, 2025
jedcunningham
approved these changes
Feb 19, 2025
ntr
pushed a commit
to ntr/airflow
that referenced
this pull request
Feb 20, 2025
…e#46869) Since I want to maintain the property of being able to run a DAG processor without the Execution API server running, and since the Dag Processor Manager already has a database connection I have chosen to run the FastAPI execution server in process. To achive this I make use of two features: - The first is the abilty to provide an httpx.Client with a Transport object that has an WSGI appliction to not send real requests, but to instead call the WSGI app directly to service the request - The second is a2wsgi. Since we are making a call from with in a synchronus context we have to give httpx a WSGI (if we were making Async requests we could give httpx an ASGI app directly), and FastAPI at it's outer layers is an async framework (even if it supports running sync routes) we need to somehow wrap the async call to return a sync result. a2wsgi does this for us by using a async loop off the main thread. I tested this with a simple DAG file initially: ```python import time import sys from airflow.decorators import dag, task from airflow.sdk import Variable from airflow.utils.session import create_session if Variable.get("hi", default=None): raise RuntimeError("Var hi was defined") @dag(schedule=None) def hi(): @task() def hello(): print("hello") time.sleep(3) print("goodbye") print("err mesg", file=sys.stderr) hello() hi() ``` If the variable is defined it results in an import error. If the variable is not defined you get the DAG defined.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fixes #45449
Since I want to maintain the property of being able to run a DAG processor
without the Execution API server running, and since the Dag Processor Manager
already has a database connection I have chosen to run the FastAPI execution
server in process.
To achive this I make use of two features:
that has an WSGI appliction to not send real requests, but to instead call
the WSGI app directly to service the request
context we have to give httpx a WSGI (if we were making Async requests we
could give httpx an ASGI app directly), and FastAPI at it's outer layers is
an async framework (even if it supports running sync routes) we need to
somehow wrap the async call to return a sync result. a2wsgi does this for us
by using a async loop off the main thread.
I tested this with a simple DAG file initially:
If the variable is defined it results in an import error. If the variable is
not defined you get the DAG defined.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.