Skip to content

Commit

Permalink
feat: Add TriggerDagRunOperator to gmaps to_src
Browse files Browse the repository at this point in the history
  • Loading branch information
yeha98552 committed May 23, 2024
1 parent a5fea85 commit 12fb45b
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions airflow/dags/d_gmaps_crawler_to_src.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from utils.gcp import query_bq_to_df

from airflow.decorators import dag, task
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.providers.docker.operators.docker import DockerOperator

RAW_BUCKET = os.environ.get("GCP_GCS_RAW_BUCKET")
Expand Down Expand Up @@ -50,7 +51,7 @@ def get_attraction_list() -> list[list[dict]]:
]

@task
def run_gmaps_crawler(batch: list[dict]):
def e_gmaps_crawler(batch: list[dict]):
for attraction in batch:
print(f"crawling attraction: {attraction}")
attraction_name = attraction["attraction_name"]
Expand Down Expand Up @@ -83,8 +84,20 @@ def run_gmaps_crawler(batch: list[dict]):
)
crawler_task.execute({})

trigger_d_gmaps_places_src_to_ods = TriggerDagRunOperator(
task_id="trigger_d_gmaps_places_src_to_ods",
trigger_dag_id="d_gmaps_places_src_to_ods",
)

trigger_d_gmaps_reviews_src_to_ods = TriggerDagRunOperator(
task_id="trigger_d_gmaps_reviews_src_to_ods",
trigger_dag_id="d_gmaps_reviews_src_to_ods",
)

batches = get_attraction_list()
run_gmaps_crawler.expand(batch=batches)
crawl_tasks = e_gmaps_crawler.expand(batch=batches)
crawl_tasks >> trigger_d_gmaps_places_src_to_ods
crawl_tasks >> trigger_d_gmaps_reviews_src_to_ods


d_gmaps_crawler_to_src()

0 comments on commit 12fb45b

Please sign in to comment.