This provider is an experimental alpha containing necessary components to orchestrate and schedule Ray tasks using Airflow. It is actively maintained and being developed to bring production-ready workflows to Ray using Airflow. This release contains everything needed to begin building these workflows using the Airflow Taskflow API.
Current Release: 0.2.1
Visit the Ray Project page for more info on Ray.
⚠️ The server version and client version (build) of Ray MUST be the same.
- Python Version >= 3.7
- Airflow Version >= 2.0.0
- Ray Version == 1.3.0
- Filelock >= 3.0.0
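The provider itself can be installed with pip; a minimal sketch, assuming the package name it is published under on PyPI:

```shell
pip install airflow-provider-ray
```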
- Ray XCom Backend: Custom XCom backend to assist operators in moving data between tasks using the Ray API with its internal Plasma store, thereby allowing for in-memory distributed processing and handling of large data objects (see the sketch after this list).
- Ray Hook: Extension of the `Http` hook that uses the Ray client to provide connections to the Ray Server.
- Ray Decorator: Task decorator to be used with the Taskflow API, combining wrapping of the existing Airflow `@task` decorator with `ray.remote` functionality, thereby executing each task on the Ray cluster.
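For intuition on the XCom backend: a custom Airflow XCom backend is a `BaseXCom` subclass that overrides how values are serialized and deserialized. The snippet below is a minimal sketch of that plumbing, not the actual `RayBackend` implementation; the class name and comments are illustrative only.

```python
from typing import Any

from airflow.models.xcom import BaseXCom


class SketchXComBackend(BaseXCom):
    """Illustrative sketch of the hooks a backend like RayBackend overrides."""

    @staticmethod
    def serialize_value(value: Any):
        # A Ray-backed implementation would put `value` into the Ray object
        # store here and persist only a small reference in the Airflow DB.
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result) -> Any:
        # A Ray-backed implementation would resolve the stored reference back
        # into the in-memory object via the Ray client here.
        return BaseXCom.deserialize_value(result)
```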
- Install the astro-cli. This project was made using the `astro dev init` command, but that has already been done for you.
- Your Airflow `Dockerfile` should look something like this:

  ```dockerfile
  FROM quay.io/astronomer/ap-airflow:2.0.2-1-buster-onbuild
  USER root
  RUN pip uninstall astronomer-airflow-version-check -y
  USER astro
  ENV AIRFLOW__CORE__XCOM_BACKEND=ray_provider.xcom.ray_backend.RayBackend
  ```
  Check the ap-airflow version; if unsure, change it to `ap-airflow:latest-onbuild`. Please also feel free to add any pip packages to the `requirements.txt`, but note that these packages will only exist in Airflow and will need to be installed on Ray separately.
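  For example, to run the sample DAG below, the Ray nodes would need matching packages installed; a hedged sketch (the exact package list depends on your tasks):

  ```shell
  # on each Ray node: pin ray to match the client, plus task dependencies
  pip install ray==1.3.0 pandas
  ```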
- Configure Ray locally. To run Ray locally, you'll need a minimum of 6GB of free memory. To start, in your environment with Ray installed, run:

  ```shell
  (venv)$ ray start --num-cpus=8 --object-store-memory=7000000000 --head
  ```

  If you have extra resources, you can bump the memory up. You should now be able to open the Ray dashboard at http://127.0.0.1:8265/.
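  To sanity-check that the local cluster is up before wiring Airflow to it, a minimal sketch (run in the same environment where you started Ray) is:

  ```python
  import ray

  # attach to the already-running local cluster started above
  ray.init(address="auto")

  # show the CPU and object-store resources the head node registered
  print(ray.cluster_resources())
  ```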
- Start your Airflow environment and open the UI. If you have installed the astro-cli, you can do this by running `astro dev start`.
- In the Airflow UI, add an Airflow Pool with the following:

  ```
  Pool (name): ray_worker_pool
  Slots: 25
  ```
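  If you prefer the command line, the same pool can be created with the Airflow 2.x CLI (the description string is an arbitrary label):

  ```shell
  airflow pools set ray_worker_pool 25 "Pool for Ray tasks"
  ```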
- If you are running Ray locally, get your IP address by visiting ipv4.icanhazip.com.
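  The same address can be fetched from a terminal:

  ```shell
  curl ipv4.icanhazip.com
  ```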
- In the Airflow UI, add an Airflow Connection with the following:

  ```
  Conn Id: ray_cluster_connection
  Conn Type: HTTP
  Host: Cluster IP Address, with basic Auth params if needed
  Port: 10001
  ```
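  Equivalently, with the Airflow 2.x CLI (the host below is a placeholder; substitute your cluster's IP address):

  ```shell
  airflow connections add ray_cluster_connection \
      --conn-type http \
      --conn-host 1.2.3.4 \
      --conn-port 10001
  ```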
- In your Airflow DAG Python file, you must include the following in your `default_args` dictionary:

  ```python
  from ray_provider.xcom.ray_backend import RayBackend
  ...
  default_args = {
      'on_success_callback': RayBackend.on_success_callback,
      'on_failure_callback': RayBackend.on_failure_callback,
      ...
  }

  @dag(
      default_args=default_args,
      ...
  )
  def ray_example_dag():
      # do stuff
  ```
- Using the Taskflow API, your Airflow task should now use the `@ray_task` decorator for any Ray task and add the `ray_conn_id` parameter as `task_args`, like:

  ```python
  from ray_provider.decorators import ray_task

  default_args = {
      'on_success_callback': RayBackend.on_success_callback,
      'on_failure_callback': RayBackend.on_failure_callback,
      ...
  }
  task_args = {"ray_conn_id": "ray_cluster_connection"}
  ...

  @dag(
      default_args=default_args,
      ...
  )
  def ray_example_dag():

      @ray_task(**task_args)
      def sum_cols(df: pd.DataFrame) -> pd.DataFrame:
          return pd.DataFrame(df.sum()).T
  ```
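Putting the pieces together, a complete DAG sketch might look like the following. The imports, the `build_dataframe` task, the schedule arguments, and the final instantiation line are illustrative assumptions, not part of the provider:

```python
import pandas as pd
from airflow.decorators import dag
from airflow.utils.dates import days_ago

from ray_provider.decorators import ray_task
from ray_provider.xcom.ray_backend import RayBackend

default_args = {
    "on_success_callback": RayBackend.on_success_callback,
    "on_failure_callback": RayBackend.on_failure_callback,
}
task_args = {"ray_conn_id": "ray_cluster_connection"}


@dag(default_args=default_args, schedule_interval=None,
     start_date=days_ago(1), tags=["ray"])
def ray_example_dag():
    @ray_task(**task_args)
    def build_dataframe() -> pd.DataFrame:
        # runs remotely on the Ray cluster; the result stays in Ray's
        # object store and only a reference passes through XCom
        return pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

    @ray_task(**task_args)
    def sum_cols(df: pd.DataFrame) -> pd.DataFrame:
        # receives the upstream DataFrame via the Ray XCom backend
        return pd.DataFrame(df.sum()).T

    sum_cols(build_dataframe())


example_dag = ray_example_dag()
```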
This project is built in collaboration between Astronomer and Anyscale, with active contributions from: