Skip to content

Latest commit

 

History

History
49 lines (36 loc) · 960 Bytes

3- Data Update To Run Pipeline.md

File metadata and controls

49 lines (36 loc) · 960 Bytes

Instead of use daily to run task, we can schedule to run task base on data update.

![[Pasted image 20240508071220.png]]

from airflow import DAG, Dataset
from airflow.decorators import task

from datetime import datetime

my_file = Dataset("/tmp/my_file.txt")

with DAG(
    dag_id="producer",
    schedule="@daily",
    start_date=datetime(2022, 1, 1),
    catchup=False
):

    @task(outlets=[my_file])
    def update_my_file():
        with open(my_file.uri, "a+") as f:
            f.write("producer update")
    
    update_my_file()
from airflow import DAG, Dataset
from airflow.decorators import task

from datetime import date, datetime

my_file = Dataset("/tmp/my_file.txt")

with DAG(
    dag_id="consumer",
    schedule=[my_file],
    start_date=datetime(2022, 1, 1),
    catchup=False
):

    @task
    def read_my_file():
        with open(my_file.uri, "r") as f:
            print(f.read())
    
    read_my_file()