The configurations provided in this project are for demonstration purposes only. They should not be used in a production environment. I do not take responsibility for any failures, issues, or consequences that may arise from using these settings in a production setup.
- Project Overview
- Set Up the Project
- Troubleshooting
- Airflow Data-Aware DAGs
- Connections
- Project Directory Structure
- Contribution
- Data Quality: Implement automated data quality checks and validation.
- Version Control: Manage data versions with Nessie and ensure reproducibility.
- Data Management: Use MinIO for storage and mc for management.
- Data Transformation: Process data with Spark and manage tables with Iceberg.
- Data Validation: Monitor data quality with Soda.
- Workflow Orchestration: Orchestrate workflows with Airflow.
- Transformation Management: Transform and model data with dbt.
This project focuses on data quality management within a modern data ecosystem by integrating technologies such as Nessie Catalog, MinIO, SSH, dbt-core[PyHive], Soda-core-spark-df, Apache Airflow (with data-aware DAGs), Apache Spark, and Iceberg.
The data lakehouse architecture consists of a raw landing zone followed by three main layers:
- Raw (MinIO bucket): Stores incoming raw CSV data.
- Bronze: Holds raw data loaded into Iceberg tables.
- Silver: Contains cleaned, analysis-ready datasets for machine learning and reporting.
- Gold: Houses the dimensional representations of the cleaned data, typically for analytical and business intelligence purposes.
The ELT process is broken down into smaller, more manageable units called DAGs. Each DAG handles a specific part of the overall workflow, such as data extraction, transformation, or loading. This modular approach allows for greater flexibility, letting you develop, monitor, and troubleshoot individual stages of the pipeline separately, without affecting the whole process.
Airflow’s datasets feature helps orchestrate these DAGs based on the data flow between them. Instead of setting up hard-coded task dependencies, datasets allow Airflow to trigger tasks dynamically when certain datasets are updated. For example, when a new file is ingested or a table is updated, Airflow detects this change and can automatically trigger the appropriate downstream DAGs. This makes the pipeline more scalable and adaptive, especially as the data ecosystem grows and new dependencies or datasets are added.
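The idea of data-aware triggering can be sketched without Airflow at all. The toy scheduler below is only an illustration: the class, the DAG names, and the dataset names are hypothetical and are not taken from this project's DAG files. In Airflow 2.4 and later, the equivalent wiring is a producing task that lists a `Dataset` in its `outlets` and a consuming DAG that lists the same `Dataset` in its `schedule`.

```python
from collections import defaultdict


class DatasetScheduler:
    """Toy stand-in for data-aware scheduling: DAGs subscribe to dataset names."""

    def __init__(self):
        self._consumers = defaultdict(list)  # dataset name -> subscribed DAG names
        self.triggered = []                  # DAG runs started so far, in order

    def subscribe(self, dag_name, dataset):
        self._consumers[dataset].append(dag_name)

    def mark_updated(self, dataset):
        # A producer finished writing `dataset`: start every DAG that waits on it.
        for dag_name in self._consumers[dataset]:
            self.triggered.append(dag_name)


scheduler = DatasetScheduler()
# Hypothetical DAG and dataset names, chosen only for the example.
scheduler.subscribe("cleansing", "bronze/amazon_orders")
scheduler.subscribe("transformation", "silver/amazon_orders")

scheduler.mark_updated("bronze/amazon_orders")
print(scheduler.triggered)  # ['cleansing']
```

Only the DAG subscribed to the updated dataset runs; the transformation DAG stays idle until something marks the silver dataset as updated.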
For each new file that gets ingested, Airflow automatically creates a new branch in the Nessie catalog. This allows you to isolate the processing of that specific file without affecting the main dataset. Think of it as a sandbox where the file is validated, cleaned, and transformed without any risk to the core data.
If the ELT process runs successfully—meaning the file is fully processed and passes all validations—the changes made on the new branch are merged back into the main branch. This ensures only clean, verified data makes it into the central dataset. However, if the process encounters any issues (e.g., data quality checks fail or transformations break), the branch isn’t merged, and the CSV is rejected. This prevents bad data from corrupting the main dataset, maintaining data integrity while keeping each step traceable.
Nessie doesn't replicate data in the data lake when creating branches; it only manipulates metadata.
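The branch-per-file flow described above can be modeled with a few lines of plain Python. This is a toy, in-memory sketch and not the Nessie API: `ToyCatalog`, `process_file`, and the `ingest_` branch prefix are hypothetical names, and the merge is simplified to a fast-forward that assumes `main` has not moved since the branch was created.

```python
class ToyCatalog:
    """In-memory model of a branching catalog: a branch is a name -> snapshot pointer."""

    def __init__(self):
        # Each snapshot is an immutable tuple of committed changes.
        self.branches = {"main": ()}

    def create_branch(self, name, source="main"):
        # Only a pointer is copied; no table data is duplicated.
        self.branches[name] = self.branches[source]

    def commit(self, branch, change):
        self.branches[branch] = self.branches[branch] + (change,)

    def merge(self, branch, target="main"):
        # Simplified fast-forward merge (assumes `target` is unchanged since branching).
        self.branches[target] = self.branches[branch]


def process_file(catalog, file_name, is_valid):
    """Process one file on its own branch; merge only if validation passed."""
    branch = f"ingest_{file_name}"  # hypothetical naming scheme
    catalog.create_branch(branch)
    catalog.commit(branch, f"loaded {file_name}")
    if is_valid:
        catalog.merge(branch)
        return "merged"
    return "rejected"  # branch is left unmerged, main is untouched


catalog = ToyCatalog()
print(process_file(catalog, "sampled_data_1.csv", is_valid=True))   # merged
print(process_file(catalog, "sampled_data_2.csv", is_valid=False))  # rejected
print(catalog.branches["main"])  # ('loaded sampled_data_1.csv',)
```

The rejected file's changes exist only on its own branch, so `main` contains just the validated load.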
In this project, Soda Core and dbt Core serve complementary roles in validating the quality of the data before it reaches the gold layer (the warehouse). Here's how they compare:
Soda Core is used to ensure that the ingested and cleaned data meets business and technical requirements. It focuses on monitoring data health by running quality checks such as data completeness, freshness, and consistency. These tests ensure that the data is ready for downstream processes and qualified to be loaded into the gold layer of the warehouse.
On the other hand, dbt-core tests are tightly integrated into the transformation layer. They validate the integrity and structure of the data models created during the transformation process. Common dbt tests include ensuring unique values, checking for null constraints, and validating referential integrity in the context of building dimensional models and fact tables.
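To make the two kinds of checks concrete, here is a plain-Python sketch of what they verify. It does not use the Soda or dbt APIs: the helper functions and the sample rows are hypothetical, and only the column names (`Order_ID`, `Amount`, `Currency`) come from this project's data.

```python
# Hypothetical sample rows for illustration only.
rows = [
    {"Order_ID": "A1", "Amount": 10.0, "Currency": "INR"},
    {"Order_ID": "A2", "Amount": None, "Currency": "INR"},
    {"Order_ID": "A2", "Amount": 5.0, "Currency": "USD"},
]


def missing_count(rows, column):
    """Completeness: how many rows have no value in `column`."""
    return sum(1 for row in rows if row[column] is None)


def duplicate_values(rows, column):
    """Uniqueness: values of `column` that occur more than once."""
    seen, dupes = set(), set()
    for row in rows:
        value = row[column]
        if value in seen:
            dupes.add(value)
        seen.add(value)
    return dupes


def orphan_keys(rows, column, allowed):
    """Referential integrity: values of `column` with no match in `allowed`."""
    return {row[column] for row in rows} - set(allowed)


print(missing_count(rows, "Amount"))           # 1
print(duplicate_values(rows, "Order_ID"))      # {'A2'}
print(orphan_keys(rows, "Currency", {"INR"}))  # {'USD'}
```

The first check is the style of health check described for Soda Core; the second and third mirror the uniqueness and relationship tests described for dbt.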
- Linux: Debian-based distribution.
- Docker: Ensure Docker is installed and running.
- Python: Version 3.8 or higher.
The run.sh script initializes Docker Compose, copies raw data to MinIO, sets up Airflow and connections, and starts the Spark Thrift server.
# Working directory: Data-Quality-with-Nessie
echo -e "AIRFLOW_UID=$(id -u)" > .env
. ./shell-scripts/run.sh
Reason 1 - MinIO IP
- Inspect the MinIO IP in the BigData network and verify it in ./conf/minio.env.
Reason 2 - Network Issues
- Ensure MinIO, Airflow, and Spark are on the same network.
Reason 3 - Spark SSH Issues
- Verify Airflow SSH connection configurations and test them.
- Ensure the extra field is correct:
{
"AWS_ACCESS_KEY_ID": "admin",
"AWS_SECRET_ACCESS_KEY": "password",
"AWS_REGION": "us-east-1",
"AWS_DEFAULT_REGION": "us-east-1"
}
Reason 4 - AWS Issues
- Check the Airflow AWS connection configuration and test it.
- Ensure the extra field is correct:
{
"aws_access_key_id": "admin",
"aws_secret_access_key": "password",
"endpoint_url": "http://minio:9000"
}
// NOTE: CONNECTION TEST MAY SHOW ERROR; IGNORE IT AND RE-TRIGGER THE INGESTION DAG
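A quick way to sanity-check the AWS connection's extra field before pasting it into Airflow is to parse it and compare its keys with the ones listed above. The helper below is a hypothetical convenience, not part of this project; it only assumes the three keys shown in the JSON for this connection.

```python
import json

# Keys expected in the AWS connection's extra field, as listed above.
EXPECTED_KEYS = {"aws_access_key_id", "aws_secret_access_key", "endpoint_url"}


def check_extra(extra_json):
    """Return a list of problems found in an AWS connection `extra` string."""
    try:
        extra = json.loads(extra_json)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(extra, dict):
        return ["extra must be a JSON object"]

    problems = [f"missing key: {key}" for key in sorted(EXPECTED_KEYS - set(extra))]
    endpoint = extra.get("endpoint_url")
    if endpoint is not None and not str(endpoint).startswith(("http://", "https://")):
        problems.append("endpoint_url should start with http:// or https://")
    return problems


good = (
    '{"aws_access_key_id": "admin", '
    '"aws_secret_access_key": "password", '
    '"endpoint_url": "http://minio:9000"}'
)
print(check_extra(good))  # []
print(check_extra('{"aws_access_key_id": "admin"}'))
```

An empty list means the structure matches; it does not prove that the credentials or the endpoint are reachable.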
Reason 5 - Spark Initialization Job Error
- Check logs at bash-logs/0-init_job.log for errors. If present, re-run the initialization job:
docker exec spark spark-submit /spark-container/spark/jobs/init_project.py > bash-logs/0-init_job.log
If dbt transformations fail, manually launch the Thrift server:
docker exec spark sh spark-container/ThriftServer-Iceberg-Nessie.sh
If dbt cannot access the database through the Thrift connection, check that the thrift_sasl Python library is installed.
# check whether it is installed
pip show thrift_sasl
# install it if it is missing
pip install thrift_sasl
- This DAG ingests Amazon order data from a CSV stored in an S3-compatible MinIO bucket into a Spark-based data processing pipeline.
- The pipeline validates the data with Soda checks and updates ingestion metadata on success or failure.
- It leverages version control via Nessie to create a new branch for each ingestion batch.
Due to the reliance on environment variables for data sharing between DAGs, it is not feasible to run the workflow concurrently. This limitation arises from potential conflicts when sharing Nessie branch names and other environment-specific variables across multiple parallel runs.
As a temporary solution, both this ingestion DAG and the related transformation DAGs are configured to run in a pool with only one slot. This setup ensures that only one instance of the DAG can run at any given time, preventing race conditions or data inconsistencies caused by parallel execution.
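The effect of a one-slot pool can be illustrated with a thread analogy. This sketch does not use Airflow: the semaphore stands in for the pool, each thread stands in for a DAG run, and all names are hypothetical. In Airflow itself, a task is assigned to a pool through the operator's `pool` argument.

```python
import threading
import time

pool = threading.BoundedSemaphore(1)  # one slot, like a single-slot pool
lock = threading.Lock()               # protects the counters below
active = 0                            # runs currently holding the slot
max_active = 0                        # highest concurrency observed


def run_dag(name):
    global active, max_active
    with pool:  # blocks until the single slot is free
        with lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.01)  # stand-in for the real work of the run
        with lock:
            active -= 1


threads = [threading.Thread(target=run_dag, args=(f"run_{i}",)) for i in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(max_active)  # 1
```

Four runs are started together, yet no two ever hold the slot at once, which is the property that keeps shared branch names from colliding.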
- This DAG cleans and audits ingested Amazon order data and loads it into the silver layer of a data lake using Spark.
- It triggers based on the successful ingestion of data and performs data validation using Soda checks, followed by updating relevant datasets based on success or failure.
The following code from ./spark-container/spark/jobs/cleansing.py demonstrates the data cleansing in this project:
from pyspark.sql.functions import col, to_date

clean_df = (
    # Parse order dates written as MM-dd-yy
    df.withColumn('Order_Date', to_date(df.Order_Date, format='MM-dd-yy'))
    # Drop rows that miss any mandatory field
    .dropna(subset=['Size', 'Qty', 'Amount', 'Order_ID', 'Order_Date', 'Currency'])
    # Keep only rows with a positive quantity and amount
    .filter((col('Qty') > 0) & (col('Amount') > 0))
    # Flag missing descriptive values instead of dropping the row
    .fillna({
        "Order_Status": "INVALID_VALUE",
        "Fulfilment": "INVALID_VALUE",
        "ORDERS_Channel": "INVALID_VALUE",
        "ship_service_level": "INVALID_VALUE",
        "Category": "INVALID_VALUE",
        "Courier_Status": "INVALID_VALUE",
        "Ship_City": "INVALID_VALUE",
        "Ship_State": "INVALID_VALUE",
        "Ship_Postal_Code": "INVALID_VALUE",
        "Ship_Country": "INVALID_VALUE",
        "Fulfilled_By": "INVALID_VALUE",
        "New": "INVALID_VALUE",
        "PendingS": "INVALID_VALUE",
    })
    # Remove repeated order lines
    .drop_duplicates(subset=['Order_ID', 'Category', 'Order_Date', 'Amount'])
)
# Before loading, check the spelling of columns that take a fixed set of values
# (size, status, category, and so on) to catch any typos in them.
target_col_names = [
    "Order_Status",
    "Fulfilment",
    "ORDERS_Channel",
    "ship_service_level",
    "Category",
    "Size",
    "Courier_Status",
    "Currency",
    "Ship_City",
    "Ship_State",
    "Ship_Country",
    "Fulfilled_By",
    "New",
]
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import StringType
import pandas as pd  # type: ignore
from autocorrect import Speller  # type: ignore

spell = Speller()

@pandas_udf(StringType())
def correct_string_spelling_udf(string: pd.Series) -> pd.Series:
    # Run the spell checker on every non-empty value
    return string.apply(lambda x: str(spell(x)) if x else x)

@pandas_udf(StringType())
def standarize_string_udf(sentence: pd.Series) -> pd.Series:
    # Trim first so that capitalize() acts on the first visible character
    return sentence.apply(lambda x: x.strip().capitalize() if x else x)

# Apply the UDFs to the DataFrame columns
for column in target_col_names:
    clean_df = clean_df \
        .withColumn(column, standarize_string_udf(col(column))) \
        .withColumn(column, correct_string_spelling_udf(col(column)))
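The standardization step is easy to try on individual values without Spark. The sketch below uses a hypothetical plain-Python `standardize` helper that trims whitespace before capitalizing; the sample values are made up for illustration. It also shows why the order of the two calls matters when a value has leading whitespace.

```python
def standardize(value):
    """Trim whitespace, then capitalize; None and empty strings pass through."""
    return value.strip().capitalize() if value else value


# Hypothetical raw values with inconsistent casing and stray whitespace.
samples = ["  shipped", "CANCELLED ", "pending", None, ""]
print([standardize(s) for s in samples])
# ['Shipped', 'Cancelled', 'Pending', None, '']

# With the calls in the opposite order, capitalize() sees the leading space
# as the first character, so the first letter stays lowercase.
print("  shipped".capitalize().strip())  # shipped
```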
- This DAG is responsible for transforming Amazon order data from the silver layer to the gold layer using dbt (Data Build Tool).
- It processes source, dimension, and fact models in parallel, with tests after each transformation.
- The workflow updates success or failure datasets depending on the outcome of the dbt transformations.
├── dims
│   ├── currency_dim.sql
│   ├── date_dim.sql
│   ├── location_dim.sql
│   ├── product_dim.sql
│   └── shipping_dim.sql
├── facts
│   └── fact_amazon_orders.sql
└── sources
    ├── amazon_orders_silver.sql
    └── src_amazon_orders_silver.yml
- This DAG publishes transformed datasets by merging them into the main table using Spark, then moves the processed CSV files from a "queued" to a "processed" S3 bucket.
- Finally, the DAG updates success or failure datasets based on the outcome.
- This DAG moves rejected CSV files to a rejected_csv folder in an S3 bucket whenever a failure occurs during data ingestion, cleaning, transformation, or publishing.
- It updates relevant success or failure datasets based on the result.
The diagram above illustrates how the various components of the data pipeline interact in this project, showing a Git-like branching structure for data versions using Nessie:
- White Lines: AWS SDK connection to access the raw and warehouse buckets.
- Blue Lines: Airflow SSH Plugin connection to Spark, managing jobs and interactions through SSH for orchestration.
- Green Lines: Nessie branches created during the ELT process, managing metadata in the warehouse and retrieving the data lakehouse Iceberg tables.
- Red Lines: Thrift Server connections to dbt-core using PyHive, enabling Spark SQL transformations and queries.
- Purple Lines: Airflow handles the scheduling and coordination of all tasks, ensuring seamless integration between these tools.
.
├── airflow
│   ├── airflow-compose.yaml
│   ├── config
│   ├── dags
│   │   ├── 00-ingestion_layer
│   │   │   └── amazon_csv_orders.py
│   │   ├── 01-cleansing_layer
│   │   │   └── amazon_csv_orders.py
│   │   ├── 02-transformation_layer
│   │   │   └── amazon_csv_orders.py
│   │   ├── 03-publish
│   │   │   └── amazon_csv_orders.py
│   │   ├── 10-error_handelings
│   │   │   └── amazon_csv_orders.py
│   │   ├── 11-triggers
│   │   │   └── amazon_csv_orders_ingestion_trigger.py
│   │   └── dynamic.py
│   ├── db
│   │   └── airflow.db
│   ├── dockerfile
│   ├── includes
│   │   ├── data
│   │   │   └── datasets.py
│   │   ├── dbt_projects
│   │   │   └── amazon_orders
│   │   │       ├── analyses
│   │   │       ├── dbt_project.yml
│   │   │       ├── macros
│   │   │       │   └── dim_loaders
│   │   │       │       ├── date_scd_0.sql
│   │   │       │       └── scd_0.sql
│   │   │       ├── models
│   │   │       │   ├── dims
│   │   │       │   │   ├── currency_dim.sql
│   │   │       │   │   ├── date_dim.sql
│   │   │       │   │   ├── location_dim.sql
│   │   │       │   │   ├── product_dim.sql
│   │   │       │   │   └── shipping_dim.sql
│   │   │       │   ├── facts
│   │   │       │   │   └── fact_amazon_orders.sql
│   │   │       │   └── sources
│   │   │       │       ├── amazon_orders_silver.sql
│   │   │       │       └── src_amazon_orders_silver.yml
│   │   │       ├── README.md
│   │   │       ├── seeds
│   │   │       ├── snapshots
│   │   │       └── tests
│   │   │           ├── dim_tests.yml
│   │   │           └── fact_tests.yml
│   │   └── pools
│   │       └── pools.py
│   ├── plugins
│   │   └── operators
│   │       ├── sparkSSH.md
│   │       └── sparkSSH.py
│   ├── profiles.yml
│   └── requirements.txt
├── config
│   ├── dremio.env
│   ├── dwh.env
│   ├── env_loader.py
│   ├── minio.env
│   ├── nessie.env
│   └── paths.env
├── data
│   └── original_dataset
│       ├── Amazon Sale Report.csv
│       ├── batchs
│       │   ├── sampled_data_1.csv
│       │   ├── sampled_data_2.csv
│       │   ├── sampled_data_3.csv
│       │   └── sampled_data_4.csv
│       ├── README.md
│       └── sampler.py
├── docker-compose.yaml
├── dockerfiles
│   └── spark
│       ├── dockerfile
│       ├── entrypoint.sh
│       └── requirements.txt
├── LICENSE
├── md_assets
│   ├── airflow
│   │   ├── dags
│   │   │   ├── cleaning.png
│   │   │   ├── error-handling.png
│   │   │   ├── publish.png
│   │   │   ├── re-ingest.png
│   │   │   ├── start_ingestion.png
│   │   │   └── transform.png
│   │   └── workflow
│   │       └── dep-graph.png
│   ├── connections.png
│   └── project_overview.png
├── README.md
├── requirements.txt
├── shell-scripts
│   ├── run.sh
│   └── setup
│       ├── airflow-setup.sh
│       └── minio-setup.sh
└── spark-container
    ├── jinja-templates
    │   └── amazon_orders
    │       └── lakehouse-init.sql
    ├── modules
    │   └── SparkIcebergNessieMinIO
    │       ├── CustomSparkConfig.py
    │       └── spark_setup.py
    ├── soda
    │   ├── checks
    │   │   ├── bronz_amazon_orders.py
    │   │   └── silver_amazon_orders.py
    │   ├── modules
    │   │   └── soda
    │   │       └── helper.py
    │   └── tables
    │       ├── bronze_amazon_orders.yaml
    │       └── silver_amazon_orders.yaml
    ├── spark
    │   └── jobs
    │       ├── cleansing.py
    │       ├── ingest.py
    │       ├── init_project.py
    │       ├── load.py
    │       └── merge_into_main.py
    └── ThriftServer-Iceberg-Nessie.sh
51 directories, 76 files
Contributions to this project are welcome and greatly appreciated. If you would like to contribute, please follow these guidelines:
- Fork the Repository: Start by forking the repository to create your own copy where you can make changes without affecting the original project.
- Create a Branch: Before making changes, create a new branch with a descriptive name related to the feature or fix you're working on.
- Make Your Changes: Implement your changes or additions, following the existing code style and standards. If you're adding a new feature, please include corresponding tests and documentation.
- Submit a Pull Request: Once your changes are ready, submit a pull request detailing what you have done. Be sure to explain the purpose of your changes and how they improve the project.
- Review Process: Your pull request will be reviewed by project maintainers. Feedback may be provided, and you may need to make additional changes before your pull request is merged.
We value contributions and strive to incorporate valuable enhancements and fixes. For any significant changes, please open an issue first to discuss your ideas and get feedback from the community before proceeding. Thank you for your interest and support in improving this project!