Add DAG for exporting retool data #550
Conversation
Force-push history:
force-pushed from bbcf345 to c1aa6df ("udpate image" x4)
force-pushed from c1aa6df to ef7e871
force-pushed from 4f4b420 to 1102681
force-pushed from 64605ef to b95a518 ("update xcom value", "update set xcom input", "simplify", "rename")
force-pushed from b95a518 to c632660
force-pushed from 4090b59 to 7ee9e0d
force-pushed from 75c4c6e to 2461095
force-pushed from f0e9186 to d09b178
force-pushed from d09b178 to 1576b35 ("update", "move", "update", "update")
force-pushed from 1576b35 to 5920807
@@ -315,6 +315,7 @@
    "schema_filepath": "/home/airflow/gcs/dags/schemas/",
    "sentry_dsn": "https://[email protected]/6190849",
    "sentry_environment": "development",
    "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:cd53bcf70",
This is temporary. Once we have the build set up for stellar-etl, we will switch to the stellar workspace.
@@ -0,0 +1,163 @@
[
Took this JSON schema from BigQuery after creating the table.
dags/external_data_dag.py
Outdated
dag = DAG(
    "external_data_dag",
    default_args=get_default_dag_args(),
    start_date=datetime(2024, 12, 5, 14, 30),
I recommend updating this start date. Since this DAG is set to run on "0 22 * * *", you generally don't want to set the hour:minute part of the datetime.
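For illustration, a minimal sketch of the DAG header with a date-only start_date alongside the daily 22:00 schedule; the import path for get_default_dag_args is assumed from the repo layout, and the catchup setting is illustrative rather than the PR's actual code.

from datetime import datetime

from airflow import DAG

# Assumed helper location based on the repo layout; adjust if it differs.
from stellar_etl_airflow.default import get_default_dag_args

# With a "0 22 * * *" schedule, a date-only start_date avoids confusing
# offsets between the logical date and the first scheduled run.
dag = DAG(
    "external_data_dag",
    default_args=get_default_dag_args(),
    start_date=datetime(2024, 12, 5),
    schedule_interval="0 22 * * *",
    catchup=False,
)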
Done in 9a4e60f
dags/external_data_dag.py
Outdated
)


def get_insert_to_bq_task(
Is this function different from https://github.com/stellar/stellar-etl-airflow/blob/master/dags/stellar_etl_airflow/build_bq_insert_job_task.py or anything else in ./dags/stellar_etl_airflow/build_*?
If it's different, it might be worthwhile to separate it out into either utils.py or another *.py file.
I think it is different, since it is a combination of delete and insert, whereas the one in build_bq_insert_job_task is just an insertion.
Ah, that's fine then. A nit: move it to a separate helper file. I also recommend renaming the function to be more descriptive of the fact that it deletes and inserts, instead of just get_insert_to_bq_task. Maybe just add delete to the function name.
So I moved it to dags/stellar_etl_airflow/build_del_ins_operator.py in 9a4e60f.
Also renamed the method to create_export_del_insert_operator. Essentially, create_export_del_insert_operator is a specific type of create_del_ins_task.
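For context, a rough sketch of the delete-then-insert pattern such a helper encapsulates, using the Google provider's BigQueryInsertJobOperator and GCSToBigQueryOperator. The project/dataset/table/bucket arguments, task ids, and the batch_id column are illustrative assumptions, not the PR's actual signature.

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator


def create_del_ins_tasks_sketch(dag, project, dataset, table, batch_id, bucket, source_object):
    """Illustrative only: remove any rows already written for this batch_id,
    then load the exported file, so a retried run does not duplicate data."""
    delete_task = BigQueryInsertJobOperator(
        task_id=f"delete_{table}_batch",
        configuration={
            "query": {
                "query": (
                    f"DELETE FROM `{project}.{dataset}.{table}` "
                    f"WHERE batch_id = '{batch_id}'"
                ),
                "useLegacySql": False,
            }
        },
        dag=dag,
    )
    insert_task = GCSToBigQueryOperator(
        task_id=f"insert_{table}_batch",
        bucket=bucket,
        source_objects=[source_object],
        destination_project_dataset_table=f"{project}.{dataset}.{table}",
        source_format="NEWLINE_DELIMITED_JSON",
        write_disposition="WRITE_APPEND",
        dag=dag,
    )
    delete_task >> insert_task
    return delete_task, insert_task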
image = "{{ var.value.stellar_etl_internal_image_name }}"

output_filepath = ""
if use_gcs:
nit: technically the gcs if/case could be split out into a separate function or interface so that it's easy to add support for other cloud storage systems (see the sketch below). But I don't think we'll ever need support outside of GCS for this, so it's fine to leave it as is.
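A quick sketch of that nit, with placeholder names (use_gcs, gcs_bucket, run_id, and the local fallback path are illustrative): pulling the storage-specific branch into one helper keeps the rest of the operator builder agnostic of the backend.

def build_output_filepath(use_gcs: bool, gcs_bucket: str, run_id: str, filename: str) -> str:
    """Illustrative helper: isolate storage-specific path logic so adding
    another backend later only touches this function."""
    if use_gcs:
        return f"gs://{gcs_bucket}/{run_id}/{filename}"
    # Hypothetical local fallback; the real DAG only targets GCS.
    return f"/home/airflow/etl_data/{run_id}/{filename}"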
Agree. I think in general we could try to write the operators as classes, which would help with overriding. Right now it is more functional.
Honestly one day we'll probably refactor all of stellar-etl-airflow
🤞
LGTM, left some minor comments.
PR Checklist
PR Structure
Thoroughness
What
This PR adds a DAG to fetch custom data from the Retool API. It triggers a command in stellar-etl-internal that pulls data created/updated within a given time range.
Data for an existing batch_id is deleted and then re-created (this keeps results idempotent across retries).
Why
To support new data ingestion
Known limitations
Not a limitation, but I'm open to renaming the DAG if it is too generic. Possible options:
Testing
Tested the DAG in the test-hubble Airflow environment and the records appear loaded in the BQ table.