Skip to content

Commit

Permalink
make python env a parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
snopoke committed Sep 19, 2019
1 parent ba82960 commit ed71eba
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 15 deletions.
9 changes: 2 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ Symlink dags to expected location
ln -s dags ~/airflow/dags
```

Symlink your hq virtualenv into your hq repo in a directory named `python_env` (same location as on production servers)

```
ln -s /path/to/virtualenv /path/to/cchq/python_env
```

create airflow dbs

```
Expand All @@ -35,7 +29,8 @@ airflow initdb
set airflow environment variables

```
airflow variables --set CCHQ_HOME /path/to/cchq
airflow variables --set CCHQ_HOME <path to cchq>
airflow variables --set CCHQ_PY_ENV <path to cchq python env>
```

# How to run
Expand Down
6 changes: 3 additions & 3 deletions dags/dashboard_subdags.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from airflow.operators.subdag_operator import SubDagOperator


run_query_template = """cd {{ var.value.CCHQ_HOME }}; {{ var.value.CCHQ_HOME }}/python_env-3.6/bin/python {{ var.value.CCHQ_HOME }}/manage.py run_aggregation_query {{ params.query }} {{ ti.xcom_pull('get_uuid') }}"""
run_query_template = """cd {{ var.value.CCHQ_HOME }}; {{ var.value.CCHQ_PY_ENV }}/bin/python {{ var.value.CCHQ_HOME }}/manage.py run_aggregation_query {{ params.query }} {{ ti.xcom_pull('get_uuid') }}"""


def parallel_subdag(parent_dag, child_dag, default_args, schedule_interval, tasks):

subdag_query_template = """cd {{ var.value.CCHQ_HOME }}; {{ var.value.CCHQ_HOME }}/python_env-3.6/bin/python {{ var.value.CCHQ_HOME }}/manage.py run_aggregation_query {{ params.query }} {{ ti.xcom_pull(dag_id=params.parent_dag_id, task_ids='get_uuid') }}"""
subdag_query_template = """cd {{ var.value.CCHQ_HOME }}; {{ var.value.CCHQ_PY_ENV }}/bin/python {{ var.value.CCHQ_HOME }}/manage.py run_aggregation_query {{ params.query }} {{ ti.xcom_pull(dag_id=params.parent_dag_id, task_ids='get_uuid') }}"""

parallel_dag = DAG(
'{}.{}'.format(parent_dag, child_dag),
Expand Down Expand Up @@ -50,7 +50,7 @@ def generate_uuid():

create_aggregation_record = BashOperator(
task_id='create_aggregation_record',
bash_command="""cd {{ var.value.CCHQ_HOME }}; {{ var.value.CCHQ_HOME }}/python_env-3.6/bin/python {{ var.value.CCHQ_HOME }}/manage.py create_aggregation_record {{ params.query }} {{ ti.xcom_pull('get_uuid') }} {{ tomorrow_ds }} {{ params.interval }}""",
bash_command="""cd {{ var.value.CCHQ_HOME }}; {{ var.value.CCHQ_PY_ENV }}/bin/python {{ var.value.CCHQ_HOME }}/manage.py create_aggregation_record {{ params.query }} {{ ti.xcom_pull('get_uuid') }} {{ tomorrow_ds }} {{ params.interval }}""",
params={'interval': interval},
dag=monthly_dag
)
Expand Down
10 changes: 5 additions & 5 deletions dags/dim.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from airflow.operators import BashOperator


commit_table_template = """{{ var.value.CCHQ_HOME }}/python_env-3.6/bin/python {{ var.value.CCHQ_HOME }}/manage.py commit_table {{ params.table_slug }} {{ ti.xcom_pull(params.start_id) }}"""
commit_table_template = """{{ var.value.CCHQ_PY_ENV }}/bin/python {{ var.value.CCHQ_HOME }}/manage.py commit_table {{ params.table_slug }} {{ ti.xcom_pull(params.start_id) }}"""


def linear_subdag(parent_dag, child_dag, default_args, schedule_interval, final_type):
Expand All @@ -19,7 +19,7 @@ def linear_subdag(parent_dag, child_dag, default_args, schedule_interval, final_

start_batch = BashOperator(
task_id=start_id,
bash_command="{{ var.value.CCHQ_HOME }}/python_env-3.6/bin/python {{ var.value.CCHQ_HOME }}/manage.py create_batch {{ params.table_slug }} '{{ next_execution_date.strftime('%Y-%m-%d %H:%M:%S') }}'",
bash_command="{{ var.value.CCHQ_PY_ENV }}/bin/python {{ var.value.CCHQ_HOME }}/manage.py create_batch {{ params.table_slug }} '{{ next_execution_date.strftime('%Y-%m-%d %H:%M:%S') }}'",
params={'table_slug': final_slug},
dag=dag,
xcom_push=True
Expand All @@ -42,7 +42,7 @@ def linear_subdag(parent_dag, child_dag, default_args, schedule_interval, final_

complete_batch = BashOperator(
task_id='complete_batch',
bash_command="{{ var.value.CCHQ_HOME }}/python_env-3.6/bin/python {{ var.value.CCHQ_HOME }}/manage.py mark_batch_complete {{ ti.xcom_pull(params.start_id) }}",
bash_command="{{ var.value.CCHQ_PY_ENV }}/bin/python {{ var.value.CCHQ_HOME }}/manage.py mark_batch_complete {{ ti.xcom_pull(params.start_id) }}",
params={'start_id': start_id},
dag=dag
)
Expand Down Expand Up @@ -73,7 +73,7 @@ def multi_subdag(parent_dag, child_dag, default_args, schedule_interval, dim_dep

start_batch = BashOperator(
task_id=start_id,
bash_command="{{ var.value.CCHQ_HOME }}/python_env-3.6/bin/python {{ var.value.CCHQ_HOME }}/manage.py create_batch {{ params.batch_slug }} '{{ next_execution_date.strftime('%Y-%m-%d %H:%M:%S') }}'",
bash_command="{{ var.value.CCHQ_PY_ENV }}/bin/python {{ var.value.CCHQ_HOME }}/manage.py create_batch {{ params.batch_slug }} '{{ next_execution_date.strftime('%Y-%m-%d %H:%M:%S') }}'",
params={'batch_slug': batch_slug},
dag=dag,
xcom_push=True
Expand Down Expand Up @@ -132,7 +132,7 @@ def multi_subdag(parent_dag, child_dag, default_args, schedule_interval, dim_dep

complete_batch = BashOperator(
task_id='complete_batch',
bash_command="{{ var.value.CCHQ_HOME }}/python_env-3.6/bin/python {{ var.value.CCHQ_HOME }}/manage.py mark_batch_complete {{ ti.xcom_pull(params.start_id) }}",
bash_command="{{ var.value.CCHQ_PY_ENV }}/bin/python {{ var.value.CCHQ_HOME }}/manage.py mark_batch_complete {{ ti.xcom_pull(params.start_id) }}",
params={'start_id': start_id},
dag=dag
)
Expand Down

0 comments on commit ed71eba

Please sign in to comment.