
Merge pull request #4 from ahmadMuhammadGd/fix/parallel-exec
Fix/parallel exec
ahmadMuhammadGd authored Oct 17, 2024
2 parents 7db1017 + 4160e2d commit afe2736
Showing 41 changed files with 7,784 additions and 4,054 deletions.
Empty file modified LICENSE
100644 → 100755
7 changes: 4 additions & 3 deletions README.md
100644 → 100755
@@ -80,8 +80,9 @@ On the other hand, **dbt-core** tests are tightly integrated into the transforma

The `run.sh` script initializes Docker Compose, copies raw data to MinIO, sets up Airflow and connections, and starts the Spark Thrift server.
```bash
cd /path/to/Data-Quality-with-Nessie
sh ./shell-scripts/run.sh
#Work Directory: Data-Quality-with-Nessie
echo -e "AIRFLOW_UID=$(id -u)" > .env
. ./shell-scripts/run.sh
```


@@ -411,4 +412,4 @@ Contributions to this project are highly welcomed and greatly appreciated. If yo
4. **Submit a Pull Request:** Once your changes are ready, submit a pull request detailing what you have done. Be sure to explain the purpose of your changes and how they improve the project.
5. **Review Process:** Your pull request will be reviewed by project maintainers. Feedback may be provided, and you may need to make additional changes before your pull request is merged.

We value the contributions and strive to incorporate valuable enhancements and fixes. For any significant changes, please open an issue first to discuss your ideas and get feedback from the community before proceeding. Thank you for your interest and support in improving this!
61 changes: 53 additions & 8 deletions airflow/airflow-compose.yaml
@@ -58,13 +58,14 @@ x-airflow-common:
- ../config/minio.env
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: SequentialExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/db/airflow.db
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
AIRFLOW__CORE__TEST_CONNECTION: 'Enabled'
AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'
# yamllint disable rule:line-length
# Use simple http server on scheduler for health checks
# See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
@@ -83,19 +84,39 @@ x-airflow-common:
- ${AIRFLOW_PROJ_DIR:-.}/db:/opt/airflow/db
- ${AIRFLOW_PROJ_DIR:-.}/includes:/opt/airflow/includes
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
postgres:
condition: service_healthy

networks:
BigData:
name: BigData-network
driver: bridge

services:
airflow-node:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
retries: 5
start_period: 5s
restart: always
networks:
- BigData

airflow-webserver:
<<: *airflow-common
command: bash -c "airflow scheduler & airflow webserver"
container_name: airflow-node
command: webserver
ports:
- "8082:8080"
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 30s
@@ -104,7 +125,25 @@ services:
start_period: 30s
restart: always
depends_on:
# <<: *airflow-common-depends-on
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
networks:
- BigData

airflow-scheduler:
<<: *airflow-common
command: scheduler
container_name: airflow-scheduler
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
networks:
@@ -136,6 +175,8 @@ services:
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
- ${AIRFLOW_PROJ_DIR:-.}/db:/opt/airflow/db
- ${AIRFLOW_PROJ_DIR:-.}/includes:/opt/airflow/includes
networks:
- BigData

airflow-cli:
<<: *airflow-common
@@ -150,4 +191,8 @@ services:
- bash
- airflow
networks:
- BigData
- BigData


volumes:
postgres-db-volume:
133 changes: 34 additions & 99 deletions airflow/dags/00-ingestion_layer/amazon_csv_orders.py
@@ -4,9 +4,9 @@
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime
from includes.data.datasets import FAIL_INGESTION_DATASET, SUCCESS_INGESTION_DATASET, INFO_INGESTION_DATASET
from includes.data.datasets import INFO_FOUND_CSV, FAIL_INGESTION_DATASET, SUCCESS_INGESTION_DATASET, INFO_INGESTION_DATASET
from includes.data.utils import get_extra_triggering_run, update_outlet
from airflow.datasets.metadata import Metadata # type: ignore
from includes.pools.pools import CSV_PIPLEINE_POOL
from airflow.models import Variable
import logging, os

@@ -17,7 +17,6 @@
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'pool': 'amazon_csv_pipeline_1_slot_pool'
}

doc_md_DAG="""
@@ -28,130 +27,66 @@
"""

@dag(
dag_id="start_amazon_csv_orders",
dag_id="ingest_orders",
catchup=False,
tags=["Spark", "SSH", "Iceberg", "bronz", "Soda", "Nessie", "ingestion", "CSV", "Amazon"],
schedule=timedelta(days=1),
schedule=[INFO_FOUND_CSV],
default_args=default_args,
doc_md=doc_md_DAG
)
def ingest():
@task.branch(task_id="start")
def pick_s3_object(**kwargs):
"""
Fetches the list of CSV files from the S3 bucket (landing prefix).
If no files are found, it logs an informational message and updates the info dataset.
If a file is found, it selects the first/last CSV and pushes the file path, timestamp, and recommended Nessie branch name to XCom.
"""
s3_hook = S3Hook(
aws_conn_id='minio_connection',
)

bucket_name = os.getenv("QUEUED_BUCKET")
objects = s3_hook.list_keys(bucket_name=bucket_name, prefix='landing')
objects = [obj for obj in objects if obj.endswith('csv')]
@task(task_id = 'retrieve_extra')
def retrieve_extra(**context)-> dict:
return get_extra_triggering_run(context)[0]

if not objects:
logging.info("No objects found in the bucket")
return 'update_info_dataset'
else:
# Pick the first object (FIFO)
object_name = objects[0]

# object_name = objects[-1] # UNCOMMENT to pick the last object (LIFO)

logging.info(f"Object found: {object_name}")

nessie_branch_prefex = os.getenv("BRANCH_AMAZON_ORDERS_PIPELINE_PREFEX")

# Push values to XCom for further tasks
current_csv_name = object_name.split('/')[-1]
current_csv_path = object_name

timestamp = datetime.now()
curent_timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%S') # for spark SLQ
branch_recommended_name = f'{nessie_branch_prefex}_{(current_csv_name).replace('.csv', '')}_{str(timestamp.strftime('%Y%m%d%H%M%S'))}'

kwargs['ti'].xcom_push(key='current_csv' , value=current_csv_path)
kwargs['ti'].xcom_push(key='timestamp' , value=curent_timestamp)
kwargs['ti'].xcom_push(key='nessie_branch' , value=branch_recommended_name)

Variable.set("nessie_branch", branch_recommended_name)
Variable.set("curent_csv", current_csv_path)

return 'defining_new_branch'

# 0.1 STOP !!!
##############################################################################
##############################################################################
##############################################################################

# 0.2 preparing dag branch
@task(task_id = 'defining_new_branch')
def define_branch(**kwargs)->None:
'''
Defines a new branch for Nessie in the Spark environment by updating the BRANCH_AMAZON_ORDERS_PIPELINE variable in the Nessie configuration (nessie.env).
This is done via an SSH connection to the Spark cluster.
'''
nessie_branch_new = kwargs['ti'].xcom_pull(key='nessie_branch')

ssh_hook = SSHHook(ssh_conn_id='sparkSSH',)
with ssh_hook.get_conn() as client:
'''
This command assumes that 'BRANCH_AMAZON_ORDERS_PIPELINE_PREFEX' exists in spark's environment variables
'''
client.exec_command(
f'''
sed -i "s|^BRANCH_AMAZON_ORDERS_PIPELINE\s*=.*|BRANCH_AMAZON_ORDERS_PIPELINE={nessie_branch_new}|" /config/nessie.env
'''
)

# 1. Ingest csv
extra = retrieve_extra()
extra_python_args = "{{ ti.xcom_pull(task_ids='retrieve_extra')['spark_jobs_script_args'] }}"

ingest_batch = SSHSparkOperator(
task_id ='csv_ingestion',
ssh_conn_id ='sparkSSH',
application_path ='/spark-container/spark/jobs/ingest.py',
python_args =
f"--object-path {os.getenv('QUEUED_BUCKET')}/{{{{ ti.xcom_pull(task_ids='start', key='current_csv') }}}} -t {{{{ ti.xcom_pull(task_ids='start', key='timestamp') }}}}",
doc_md =
python_args = extra_python_args,
doc_md=
"""
Executes the ingestion of the selected CSV file into the bronze layer using Spark.
It runs a Python script (ingest.py) on the Spark cluster via SSH, with arguments like the object path and timestamp retrieved from XCom.
"""
)

# 1.2 Validate ingested data with soda

validate_batch = SSHSparkOperator(
task_id ='Audit_bronze_batch',
ssh_conn_id ='sparkSSH',
application_path ="/spark-container/soda/checks/bronz_amazon_orders.py",
python_args ="-t {{ ti.xcom_pull(task_ids='start', key='timestamp') }}",
python_args = extra_python_args,
doc_md =
"""
Audits the ingested data in the bronze layer using Soda checks.
This validation task runs another Spark job via SSH (bronz_amazon_orders.py) to ensure data quality.
"""
)


@task(task_id = 'update_fail_dataset', outlets=[FAIL_INGESTION_DATASET], trigger_rule="one_failed")
def update_fail(extra, **context):
update_outlet(
FAIL_INGESTION_DATASET,
content=extra,
context=context
)

# 1.3.1 do something on validation failure
@task(task_id='update_fail_dataset', outlets=[FAIL_INGESTION_DATASET], trigger_rule="all_failed")
def update_fail_dataset():
Metadata(FAIL_INGESTION_DATASET, {"failed at": {datetime.now()}})

# 1.3.2 update dataset
@task(task_id='update_success_dataset', outlets=[SUCCESS_INGESTION_DATASET])
def update_success_dataset():
Metadata(SUCCESS_INGESTION_DATASET, {"succeded at": {datetime.now()}})

@task(task_id='update_info_dataset', outlets=[INFO_INGESTION_DATASET])
def update_info_dataset():
Metadata(INFO_INGESTION_DATASET, {"info: No files found": {datetime.now()}})

@task(task_id = 'update_success_dataset', outlets=[SUCCESS_INGESTION_DATASET])
def update_success(extra, **context):
update_outlet(
SUCCESS_INGESTION_DATASET,
content=extra,
context=context
)


define_branch = define_branch()
pick_s3_object() >> [define_branch, update_info_dataset()]
define_branch >> ingest_batch >> validate_batch
validate_batch >> [update_success_dataset(), update_fail_dataset()]
extra >> ingest_batch >> validate_batch
validate_batch >> [update_fail(extra), update_success(extra)]

ingest()
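
Both DAGs in this commit lean on `get_extra_triggering_run` and `update_outlet` from `includes.data.utils`, which are not shown in the files above. The following is a hypothetical sketch of what such helpers could look like, assuming Airflow's documented `triggering_dataset_events` context entry and the `outlet_events` accessor added in Airflow 2.10; the real implementations in the repository may differ.

```python
# Hypothetical sketch only -- the real includes/data/utils.py is not part of this diff.
from airflow.datasets import Dataset


def get_extra_triggering_run(context: dict) -> list[dict]:
    """Return the `extra` payload of every dataset event that triggered this run."""
    # `triggering_dataset_events` maps dataset URI -> list of DatasetEvent objects.
    events_by_dataset = context["triggering_dataset_events"]
    return [
        event.extra or {}
        for events in events_by_dataset.values()
        for event in events
    ]


def update_outlet(dataset: Dataset, content: dict, context: dict) -> None:
    """Attach `content` as the extra of the dataset event this task emits."""
    # Requires the calling task to declare `dataset` in its `outlets` list;
    # Airflow 2.10+ exposes `outlet_events` in the task context for this purpose.
    context["outlet_events"][dataset].extra = dict(content or {})
```

Under this shape, the `spark_jobs_script_args` string that the DAGs template into `python_args` is simply one key of the triggering event's `extra` dict, forwarded downstream on success or failure.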
51 changes: 35 additions & 16 deletions airflow/dags/01-cleansing_layer/amazon_csv_orders.py
@@ -4,6 +4,7 @@
from datetime import datetime
from includes.data.datasets import SUCCESS_INGESTION_DATASET, SUCCESS_CLEANING_DATASET, FAIL_CLEANING_DATASET
from airflow.datasets.metadata import Metadata # type: ignore
from includes.data.utils import get_extra_triggering_run, update_outlet


default_args = {
@@ -28,30 +29,48 @@
"""
)
def cleansing_and_loading():

@task(task_id = 'retrieve_extra')
def retrieve_extra(**context):
return get_extra_triggering_run(context)[0]

extra = retrieve_extra()
extra_python_args = "{{ ti.xcom_pull(task_ids='retrieve_extra')['spark_jobs_script_args'] }}"

clean_batch = SSHSparkOperator(
task_id ='clean_and_load_to_silver',
ssh_conn_id ='sparkSSH',
application_path ='/spark-container/spark/jobs/cleansing.py'
application_path ='/spark-container/spark/jobs/cleansing.py',
python_args = extra_python_args
)

# 2.2 Validate data with soda

validate_cleaned = SSHSparkOperator(
task_id ='Audit_cleaned_batch',
ssh_conn_id ='sparkSSH',
application_path ='/spark-container/soda/checks/silver_amazon_orders.py',
python_args = extra_python_args

)

@task(task_id = 'update_fail_dataset', outlets=[FAIL_CLEANING_DATASET], trigger_rule="one_failed")
def update_fail(extra, **context):
update_outlet(
FAIL_CLEANING_DATASET,
content=extra,
context=context
)

@task(task_id = 'update_success_dataset', outlets=[SUCCESS_CLEANING_DATASET])
def update_success(extra, **context):
update_outlet(
SUCCESS_CLEANING_DATASET,
content=extra,
context=context
)

# 2.3 Do something on error
@task(task_id='update_fail_dataset', outlets=[FAIL_CLEANING_DATASET], trigger_rule="all_failed")
def update_fail_dataset():
Metadata(FAIL_CLEANING_DATASET, {"failed at": {datetime.now()}})

# 1.3.2 update dataset
@task(task_id='update_success_dataset', outlets=[SUCCESS_CLEANING_DATASET])
def update_success_dataset():
Metadata(SUCCESS_CLEANING_DATASET, {"succeded at": {datetime.now()}})

clean_batch >> validate_cleaned
validate_cleaned >> [update_fail_dataset(), update_success_dataset()]
extra >> clean_batch >> validate_cleaned
validate_cleaned >> [update_fail(extra), update_success(extra)]

cleansing_and_loading()
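
Because `ingest_orders` is now scheduled on the `INFO_FOUND_CSV` dataset instead of polling S3 itself, some upstream producer has to emit that dataset event together with the `spark_jobs_script_args` extra. That producer is not included in this diff; the sketch below only illustrates the documented `Metadata`-yield mechanism (already imported by the DAGs above), and the DAG id, dataset URI, bucket and argument values are assumptions.

```python
# Hypothetical upstream producer -- not part of this commit; ids, URIs and paths are invented.
from datetime import datetime, timedelta

from airflow.datasets import Dataset
from airflow.datasets.metadata import Metadata
from airflow.decorators import dag, task

INFO_FOUND_CSV = Dataset("info://amazon_orders/csv_found")  # assumed URI; the repo defines it in includes/data/datasets.py


@dag(
    dag_id="scan_landing_for_csv",   # assumed name
    schedule=timedelta(minutes=30),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def scan_landing_for_csv():

    @task(outlets=[INFO_FOUND_CSV])
    def publish_found_csv():
        # In the real project this path would come from an S3/MinIO listing
        # (compare the removed pick_s3_object task above); hard-coded here.
        object_path = "queued-bucket/landing/orders.csv"
        timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        # Yielding Metadata attaches an `extra` dict to the emitted dataset event,
        # which downstream DAGs read via triggering_dataset_events.
        yield Metadata(
            INFO_FOUND_CSV,
            {"spark_jobs_script_args": f"--object-path {object_path} -t {timestamp}"},
        )

    publish_found_csv()


scan_landing_for_csv()
```

If the repository instead publishes the event from another DAG or through the REST API, only the shape of the `extra` payload matters to the consumers shown in this commit.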

