lucaszanotelli/stellar-etl-airflow

 
 


stellar-etl-airflow

This repository contains the Airflow DAGs for the Stellar ETL project. These DAGs provide a workflow for exporting data from the Stellar network and uploading the data into BigQuery.


Installation and Setup


Google Cloud Platform

Below are instructions to initialize the Google Cloud SDK and create the GCP project, dataset, and GCS bucket if needed.


Set up the Cloud SDK

Create Google Project

  • Login to the Google Cloud Console
  • Create a new Google Project or use an existing project

    NOTE: The project name you choose corresponds to the Airflow variable "bq_project".

Create BigQuery Dataset

  • Log in to Google BigQuery
  • Create a new dataset with the desired name or use an existing dataset

    NOTE: The dataset name you choose corresponds to the Airflow variable "bq_dataset".

Create Google Cloud Storage bucket

  • Open the Cloud Storage browser

  • Create a new Google Storage bucket that will store exported files

    NOTE: Creating a new Cloud Composer environment will automatically create a new GCS bucket.

    NOTE: The bucket name you choose corresponds to the Airflow variable "gcs_exported_data_bucket_name".

WARNING: Make sure that you adhere to the location requirements for Cloud Storage buckets and BigQuery datasets. Otherwise, it will not be possible to upload data to BigQuery.



Cloud Composer

Cloud Composer is the preferred method of deployment. Cloud Composer is a managed service used for Airflow deployment that provides much of the infrastructure required to host an Airflow instance. The steps for setting up a Cloud Composer environment are detailed below.


Create Google Cloud Composer environment

Create a new Cloud Composer environment using the UI or by following the setup instructions in Create Cloud Composer environments.

For Airflow 2.x: Be wary of choosing "autopilot" for environment resource management. The ephemeral storage provided to autopilot-managed containers is capped at 10GB, which may not be enough for heavy tasks (such as state_table_dag's export_task) or for any task that runs captive core. You can add a second node pool to your Composer 1 environment and configure it to be managed by autopilot if desired. Composer 2 environments use autopilot exclusively for resource management.

Note: If no service account is provided, GCP will use the default GKE service account, which is an easy option for a quick setup. Remember to adjust the disk size, machine type, and node count to fit your needs. The Python version must be 3, and the image must be composer-1.16.11-airflow-1.10.14 or later. GCP deprecates support for older versions of Composer and Airflow, so it is recommended that you select the latest stable version to avoid an environment upgrade. See the command reference page for a detailed list of parameters.

TROUBLESHOOTING: If the environment creation fails with the error "Composer Backend timed out", try disabling and re-enabling the Cloud Composer API. If the creation fails again, try creating a service account with Owner permissions and using it to create the Composer environment.

Cloud Composer may take a while to set up the environment. Once the process is finished, you can view the environment by going to the Composer section of the Cloud Console.

NOTE: Creating an environment will also create a new Google Cloud Storage bucket. You can check this bucket's name by clicking on the DAGs folder link in the Composer section of the Cloud Console.


Upload DAGs and Schemas to Cloud Composer

After the environment is created, select the environment and navigate to the environment configuration tab. Look for the value under DAGs folder. It will be of the form gs://airflow_bucket/dags. The airflow_bucket value will be used in this step and the next. Run the command below in order to upload the DAGs and schemas to your Airflow bucket.

> bash upload_static_to_gcs.sh <airflow_bucket>
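
If you script this step, the bucket name can be derived from the DAGs folder value rather than copied by hand. The helper below is a minimal sketch; the function name `bucket_from_dags_folder` is hypothetical and not part of this repository.

```python
from urllib.parse import urlparse


def bucket_from_dags_folder(dags_folder: str) -> str:
    """Return the bucket name from a value such as gs://airflow_bucket/dags."""
    parsed = urlparse(dags_folder)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"expected a gs:// URI, got {dags_folder!r}")
    # For gs:// URIs the bucket is the part between the scheme and the first slash.
    return parsed.netloc


print(bucket_from_dags_folder("gs://airflow_bucket/dags"))
```

The returned value is what you would pass as `<airflow_bucket>` to upload_static_to_gcs.sh.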

Afterwards, you can navigate to the Airflow UI for your Cloud Composer environment. To do so, navigate to the Composer section of the Cloud Console, and click the link under Airflow webserver. Then, pause the DAGs by clicking the on/off toggle to the left of their names. DAGs should remain paused until you have finished setting up the environment. Some DAGs may not show up due to errors that will be fixed as the following steps are completed.


Add Service Account Key

The Airflow DAGs require service account keys to perform their operations. Generate a service account key for a service account that has access to BigQuery and Google Cloud Storage. Then, add this key file to the data folder in your airflow_bucket.

NOTE: The name of the key file corresponds to the Airflow variable "api_key_path". The data folder in Cloud Storage corresponds to the path "/home/airflow/gcs/data/" on the workers, so make sure that the variable ends with the correct filename.


Add Kubernetes Node Pool

If the Kubernetes pods contain long-running or resource intensive operations, it is best to create a separate node pool for task execution. Executing the tasks on the same node pool as the airflow-scheduler will contribute to resource starvation and transient failures in the DAG.

Find the Kubernetes cluster name that is used by your Cloud Composer environment. To do so, select the environment, navigate to the ENVIRONMENT CONFIGURATION tab, and look for the value of GKE cluster. The cluster name is the final part of this path.

Then, run the command:

gcloud container node-pools create <pool_name> --cluster <cluster_name> \
--zone <composer_zone> --project <project_id>

Alternatively, node pools can be created through the UI. Select the environment, navigate to the ENVIRONMENT CONFIGURATION tab, follow the link under GKE cluster, and use the ADD NODE POOL button.

Security settings can only be applied upon pool creation, so ensure that your service account and scopes are correct. If they need to be updated, you will need to delete the node pool and recreate it.

NOTE: The name of the pool will be used in the Airflow variable "affinity".

A sample affinity configuration is shown below and is also defined in airflow_variables.txt. You must supply the node pool names in values.

"affinity": {
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [{
                    "matchExpressions": [{
                        "key": "cloud.google.com/gke-nodepool",
                        "operator": "In",
                        "values": ["<node-pool-1>",
                                   "<node-pool-2>"]
                        }]
                    }]
                }
            }
        },
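
Hand-editing nested JSON is error-prone, so it can help to generate the affinity value instead. The following is a minimal sketch that mirrors the sample structure above; `build_affinity` is a hypothetical helper and "etl-pool" is a placeholder pool name.

```python
import json


def build_affinity(node_pools):
    """Build the value of the "affinity" Airflow variable for the given node pool names."""
    if not node_pools:
        raise ValueError("at least one node pool name is required")
    return {
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [{
                    "matchExpressions": [{
                        "key": "cloud.google.com/gke-nodepool",
                        "operator": "In",
                        "values": list(node_pools),
                    }]
                }]
            }
        }
    }


# "etl-pool" is a placeholder; use the name of the node pool you created.
print(json.dumps({"affinity": build_affinity(["etl-pool"])}, indent=4))
```

Because `json.dumps` produces the output, the result is always valid JSON that can be pasted into the variables file.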

Create Namespace for ETL Tasks (Optional)

Open the Google Cloud Shell. Run these commands:

gcloud container clusters get-credentials <cluster_name> --region=<composer_region>

kubectl create ns <namespace_name>

kubectl create clusterrolebinding default-admin --clusterrole cluster-admin \
--serviceaccount=<service_account> --namespace <namespace_name>

The first command acquires credentials, allowing you to execute the next commands. The second command creates the new namespace, and the third allows the service account that executes tasks to act in the new namespace.

To find the value of <airflow_worker_namespace>, select your Cloud Composer environment, navigate to the ENVIRONMENT CONFIGURATION tab, and look for the value of GKE cluster. Click on the link that says view cluster workloads.

A new page will open with a list of Kubernetes workloads. Click on airflow-worker in order to go to the details page for that Deployment. Look for the value of Namespace.

NOTE: The name of the newly created namespace corresponds to the Airflow variable "namespace".


Authenticating Tasks in an Autopilot-Managed Environment

A few extra steps are needed to configure Workload Identity so that export tasks have permission to upload files to GCS. You will create a Kubernetes service account and bind it to the Google service account that your task authenticates as. The steps are taken from this doc.

  • Create a namespace in the k8s cluster where the Composer env is running:

    kubectl create namespace <namespace_name>
  • Create a k8s service account:

    kubectl create serviceaccount <service_account_name> \
        --namespace <namespace_name>
  • Create a Google service account, if one doesn't already exist:

    gcloud iam service-accounts create <service_account_name> \
        --project=<project_id>
  • Grant the Google service account that you're using storage.objectAdmin permissions, if it doesn't already have them:

    gcloud projects add-iam-policy-binding <project_id> \
      --member "<Google service account>" \
      --role "roles/storage.objectAdmin"
  • Associate the Google and k8s service accounts:

    gcloud iam service-accounts add-iam-policy-binding <Google service account email> \
      --role roles/iam.workloadIdentityUser \
      --member "<k8s service account>"
  • Annotate the k8s service account with the Google service account:

    kubectl annotate serviceaccount <k8s service account> \
        --namespace <namespace_name> \
        iam.gke.io/gcp-service-account=<Google service account>
  • Set the corresponding airflow variables (k8s_namespace and k8s_service_account) for tasks running on KubernetesPodOperator.
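
The placeholder values in the commands above follow fixed formats. The sketch below assembles them from their parts, assuming the member and annotation formats used in the GKE Workload Identity documentation as I understand them; confirm against the linked doc before relying on it. The function name and all example values are hypothetical.

```python
def workload_identity_strings(project_id, namespace, k8s_sa, google_sa):
    """Return the strings to substitute into the gcloud and kubectl commands."""
    google_sa_email = f"{google_sa}@{project_id}.iam.gserviceaccount.com"
    return {
        # --member value when granting roles/storage.objectAdmin on the project
        "google_sa_member": f"serviceAccount:{google_sa_email}",
        # --member value when granting roles/iam.workloadIdentityUser
        "k8s_sa_member": f"serviceAccount:{project_id}.svc.id.goog[{namespace}/{k8s_sa}]",
        # annotation applied to the Kubernetes service account
        "annotation": f"iam.gke.io/gcp-service-account={google_sa_email}",
    }


# All four arguments are placeholders.
for name, value in workload_identity_strings("my-project", "etl", "etl-ksa", "etl-gsa").items():
    print(f"{name}: {value}")
```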


Modify Kubernetes Config for Airflow Workers

Find the Kubernetes cluster workloads that are used by your Cloud Composer environment. To do so, select the environment, navigate to the ENVIRONMENT CONFIGURATION tab, and look for the GKE cluster section. Click on the link that says view cluster workloads.

A new page will open with a list of Kubernetes workloads. Click on airflow-worker in order to go to the details page for that Deployment. Click the edit button. This will take you to a tab with a Kubernetes configuration. In subsequent steps, you will edit this file. For an example of a finalized config file, see this example file.

WARNING: You shouldn't copy the example file directly because it has environment variables and config values that are set up for a different project.

NOTE: This deployment file contains two separate containers: airflow-worker and gcs-syncd. Only the airflow-worker container should be edited.

Mount Docker on Airflow Workers

In this step, mount the Docker.sock and Docker. In addition, edit the security config so that the container runs as privileged, allowing it to access Docker. See [this commit](https://github.com/marc-chan/cloud_composer_examples/commit/f3e6a202ef0bfd2214385def7e36be33db191df6#diff-fc2e428a07c8d60059e54e5154f0c540) for an example of how to make these changes.

Add Volume for Local Files to Airflow Workers

In this step, add another volumeMount to airflow-workers. This local path will be used for temporary storage of exported files. In addition, make sure that you add the corresponding volume with the type DirectoryOrCreate.

Here is an example of what your volumeMounts and volumes should look like at the end of this step:

...

volumeMounts:
- mountPath: /etc/airflow/airflow_cfg
  name: airflow-config
- mountPath: /home/airflow/gcs
  name: gcsdir
- mountPath: /var/run/docker.sock
  name: docker-host
- mountPath: /bin/docker
  name: docker-app
- mountPath: /home/airflow/etlData
  name: etl-data
...

volumes:
- configMap:
    defaultMode: 420
    name: airflow-configmap
  name: airflow-config
- emptyDir: {}
  name: gcsdir
- hostPath:
    path: /var/run/docker.sock
    type: ""
  name: docker-host
- hostPath:
    path: /usr/bin/docker
    type: ""
  name: docker-app
- hostPath:
    path: /home/airflow/etlData
    type: DirectoryOrCreate
  name: etl-data

NOTE: The mount path chosen corresponds to the Airflow variable local_output_path.

Add Poststart Script to Airflow Workers

Find the namespace name in the airflow-worker config file. It should be near the top of the file, and may look like `composer-1-12-0-airflow-1-10-10-2fca78f7`. This value will be used in later commands.

Next, open the cloud shell. Keep your airflow-worker configuration file open, or save it. In the cloud shell, create a text file called poststart.sh by running the command: nano poststart.sh. Then, copy the text from the poststart.sh file in this repository into the newly opened file.

  • If you changed the path for the local folder in the previous step, make sure that you edit line 13:

    for file in /home/airflow/etlData/*
    
  • It should reflect the path changes you made. Once the file is finalized, run these commands:

    gcloud container clusters get-credentials <cluster_name> --region=<composer_region>
    
    kubectl create configmap start-config --from-file poststart.sh -n <namespace_name>
  • Return to the airflow-worker config file. Add a new volumeMount to /etc/scripts.

    ...
    
    volumeMounts:
    ...
    - mountPath: /etc/scripts
      name: config-volume
    ...
    
    
  • Then, add a new Volume that links to the configMap you created.

    ...
    volumes:
    ...
    - configMap:
        defaultMode: 511
        name: start-config
      name: config-volume
    ...
    
  • This will make the script available to the Airflow workers. In order for them to call it automatically, add a postStart hook to airflow-worker above the existing preStop hook.

    ...
    lifecycle:
      postStart:
        exec:
          command:
            - /bin/bash
            - /etc/scripts/poststart.sh
      preStop:
        exec:
          command:
            - bash
            - -c
            - pkill -f "MainProcess"
    ...
    
The following paragraphs explain what the script does.

The export tasks in the etl use Docker images with their own filesystems. Mounting a folder to the Docker image allows us to connect the airflow-worker filesystem to the Docker image filesystem. However, there are multiple airflow-worker instances, and tasks are distributed between them. This means that an export task may occur on one worker, and the subsequent task that needs that file could occur on a different worker instance. There needs to be some way to pool all the data from all the worker instances.

Fortunately, Cloud Composer provides a folder at /home/airflow/gcs/data. This folder is described in detail here. Essentially, the folder is synchronized between all the workers, and it also is linked to the data folder in the environment's Cloud Storage bucket. This means that data stored here will be available to all workers, solving the problem. Unfortunately, since this folder is already connected to a Cloud Storage bucket, it cannot also connect to a Docker image.

Instead, we connect a local folder defined in the previous step. The poststart.sh script runs constantly in the background. It moves files from the local folder to the gcs/data folder. The script is more complicated than a simple move command because it needs to ensure that no programs are writing to the files before they are moved.
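
To illustrate the idea, here is a minimal Python sketch of one pass of such a mover. It is not the repository's poststart.sh: as a stand-in for checking that no program is writing to a file, it assumes a file is finished once its modification time is older than a quiet period. The function name and the `quiet_seconds` parameter are hypothetical.

```python
import shutil
import time
from pathlib import Path


def move_settled_files(local_dir, shared_dir, quiet_seconds=5.0, now=None):
    """Move files that have not been modified for quiet_seconds; return the moved names."""
    now = time.time() if now is None else now
    shared = Path(shared_dir)
    shared.mkdir(parents=True, exist_ok=True)
    moved = []
    for path in sorted(Path(local_dir).iterdir()):
        if not path.is_file():
            continue
        if now - path.stat().st_mtime < quiet_seconds:
            # Assumption: a recently modified file may still be written to,
            # so leave it for a later pass.
            continue
        shutil.move(str(path), str(shared / path.name))
        moved.append(path.name)
    return moved
```

A background process would call this in a loop, for example `move_settled_files("/home/airflow/etlData", "/home/airflow/gcs/data")` followed by a short sleep. The modification-time heuristic is weaker than checking for open file handles, so treat it only as an illustration of the control flow.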


Add Airflow Variables and Connections

In order to add the Airflow variables and connections, navigate to the Airflow web server. To do so, navigate to the Composer section of the Cloud Console, and click the link under Airflow Webserver.

Click the Admin tab, then Connections. Click create, then:

  • Set the Conn Id field to google_cloud_platform_connection.
  • Set the Conn Type to Google Cloud Platform.
  • Set the Project Id to your project id
  • Set the Keyfile Path to <api_key_path>.
  • The <api_key_path> should be the same as the Airflow variable api_key_path.

Next, add the Airflow variables. Click the Admin tab, then Variables. Click the Choose file button, select your variables file, and click import variables.

The airflow_variables.txt file provides a set of default values for variables.
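
Rather than editing the defaults by hand, you can apply your changes programmatically and import the result. The sketch below assumes the variables file is a flat JSON object, which is what Airflow's variable import expects; the helper name and the sample keys and values are illustrative only.

```python
import json


def customize_variables(defaults, overrides):
    """Return a copy of defaults with overrides applied; reject unknown variable names."""
    unknown = sorted(set(overrides) - set(defaults))
    if unknown:
        raise KeyError(f"not present in the defaults: {unknown}")
    return {**defaults, **overrides}


# Hypothetical subset of defaults; the real values come from airflow_variables.txt,
# for example via json.load(open("airflow_variables.txt")).
defaults = {"bq_project": "", "bq_dataset": "", "gcs_exported_data_bucket_name": "", "owner": ""}
variables = customize_variables(defaults, {"bq_project": "my-project", "bq_dataset": "my_dataset"})
print(json.dumps(variables, indent=4))
```

Rejecting unknown names catches misspelled variable names before they reach Airflow, where a typo would otherwise silently create an unused variable.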



Airflow Variables Explanation

Normal Variables

| Variable name | Description | Should be changed? |
| --- | --- | --- |
| affinity | JSON object that represents the pod's affinity | Yes, if you followed the optional step and made a new node pool. |
| api_key_path | path to the Google Cloud Platform API key | No, unless your filename is different. |
| bq_dataset | name of the BigQuery dataset | Yes. Change to your dataset name. |
| bq_project | name of the BigQuery project | Yes. Change to your project name. |
| gcs_exported_data_bucket_name | name of the Google Cloud Storage bucket that will store exported data | Yes. Change to the name of the bucket you made. |
| image_name | name of the ETL's Docker image | No, unless you need a specific image version. |
| image_output_path | local output path within the ETL image | No. |
| image_pull_policy | Specifies the image pull behavior. Valid values are: Always, IfNotPresent, or Never | No, unless you handle image updates manually. |
| local_output_path | local output path within the airflow-worker that is used for temporary storage | No, unless you changed the path when modifying the Kubernetes config. |
| namespace | namespace name for ETL tasks that generate Kubernetes pods | Yes, if you followed the optional step and made a new namespace. |
| output_file_names | JSON object. Each key should be a data structure, and the value should be the name of the output file for that data structure | Yes, if desired. Make sure each type has a different filename. |
| output_path | shared output path for exported data | No, unless you have a different shared storage solution. |
| owner | the name of the owner of the Airflow DAGs | Yes. |
| schema_filepath | file path to schema folder | No, unless schemas are in a different location. |
| table_ids | JSON object. Each key should be a data structure, and the value should be the name of the BigQuery table | Yes, if desired. Make sure each type has a different table name. |
| cluster_fields | JSON object. Each key should be a BigQuery table, and the value is a list of columns that the table is clustered by | Yes, if you want clustering for any tables. |
| partition_fields | JSON object. Each key should be a BigQuery table, and the value is a JSON object of type and field to partition by | Yes, if you want partitioning for any tables. |
| gcs_exported_object_prefix | String used to prefix the run_id in the export task output path | Yes, if you want to prefix the run_id. |
| sentry_dsn | Sentry Data Source Name that tells the Sentry SDK where to send events | Yes. |
| sentry_environment | Environment in which Sentry alerts will fire | Yes. |
| use_testnet | Flag to use testnet data instead of mainnet | Yes, if you want to use testnet data. |
| task_timeout | JSON object. Each key should be the airflow util task name, and the value is the timeout in seconds | Yes, if you want to give tasks a timeout. |
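
The requirement that each type has a different filename (output_file_names) or table name (table_ids) is easy to check before importing the variables. The sketch below is a hypothetical helper, and the sample mapping is made up for illustration.

```python
from collections import defaultdict


def duplicated_values(mapping):
    """Return {value: [keys]} for every value that more than one key maps to."""
    keys_by_value = defaultdict(list)
    for key, value in mapping.items():
        keys_by_value[value].append(key)
    return {value: keys for value, keys in keys_by_value.items() if len(keys) > 1}


# Hypothetical values; "operations" wrongly reuses the ledgers filename.
output_file_names = {
    "ledgers": "ledgers.txt",
    "transactions": "transactions.txt",
    "operations": "ledgers.txt",
}
print(duplicated_values(output_file_names))
```

An empty result means every data structure has its own output file or table.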

Kubernetes-Specific Variables

| Variable name | Description | Should be changed? |
| --- | --- | --- |
| resources | Resources to request and allocate to Kubernetes Pods. | No, unless pods need more resources. |
| kube_config_location | Location of the kubernetes config file. See here for a guide on finding the Kube config file. If you are running the pods in the same cluster as Airflow, you can leave this value blank. | No, unless the pods are in a different cluster than Airflow. |
| kubernetes_sidecar_image | Image used for xcom sidecar | No, unless you want to pull a different alpine-based image. |
| k8s_namespace | Namespace to run the task in | No, unless the pods are moved into a new namespace. |
| k8s_service_account | K8s service account the task runs as | No, unless k8s authentication is modified, and is likely linked to the associated GCP service account. |
| volume_config | JSON objects representing the configuration for your Kubernetes volume. | Yes. Change configs to match your volume (see below for example configs). |
| volume_name | Name of the persistent ReadWriteMany volume associated with the claim. | Yes. Change to your volume name. |

Here are some example volume_config values. Note that a ReadWriteMany volume is required when tasks run in parallel.

  • For an NFS volume, set volume_config={"nfs": {"path": "/", "server": "my-server.provider.cloud"}}.
  • In order to set up a persistent volume claim, set volume_config={"persistentVolumeClaim": {"claimName": <claim>}}.
  • In order to set up a host path volume, set volume_config={"hostPath": {"path": <path>, "type": "DirectoryOrCreate"}}.
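
Unbalanced braces are a common mistake when writing these values by hand. The following sketch builds the three example configurations as Python dictionaries and serializes them with `json.dumps`, which guarantees well-formed JSON. The helper names, "my-claim", and the host path are placeholders.

```python
import json


def nfs_volume(server, path="/"):
    """volume_config for an NFS volume."""
    return {"nfs": {"path": path, "server": server}}


def pvc_volume(claim_name):
    """volume_config for a persistent volume claim."""
    return {"persistentVolumeClaim": {"claimName": claim_name}}


def host_path_volume(path):
    """volume_config for a host path volume that is created if missing."""
    return {"hostPath": {"path": path, "type": "DirectoryOrCreate"}}


for config in (
    nfs_volume("my-server.provider.cloud"),
    pvc_volume("my-claim"),
    host_path_volume("/home/airflow/etlData"),
):
    print("volume_config=" + json.dumps(config))
```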

Execution Procedures

Starting Up

NOTE: The Google Cloud Composer instance of Airflow has limited CLI support. See the list of supported Airflow CLI commands.

The Airflow web UI has controls for pausing and triggering DAGs. [Image: Airflow UI]

  • Ensure that the Airflow scheduler is running: airflow scheduler
  • Ensure that the Airflow web server is running: airflow webserver -p <port>
  • Enable the DAGs
    • Use the command airflow dags unpause <DAG name> (airflow unpause <DAG name> on Airflow 1.10) or use the Airflow UI

Handling Failures

Clearing Failures

You can clear failed tasks in the task-instance context menu in the Airflow UI. Clearing failed tasks gives them a chance to run again without requiring you to run the entire DAG again.


Understanding the Setup

This section contains information about the Airflow setup. It includes our DAG diagrams and explanations of tasks. For general Airflow knowledge, check out the Airflow concepts overview or the Airflow tutorial.

DAG Diagrams

History Archive with Captive Core DAG

This DAG:

  • exports transactions, operations, trades, and effects from Stellar using CaptiveCore
  • inserts into BigQuery

    NOTE: SDF writes to both a private dataset and public dataset. Non-SDF instances will probably only need to write to a single private dataset.

[Diagram: History Archive with Captive Core DAG]

History Archive without Captive Core DAG

This DAG:

  • exports assets and ledgers from Stellar's history archives
  • inserts into BigQuery

    NOTE: SDF writes to both a private dataset and public dataset. Non-SDF instances will probably only need to write to a single private dataset.

[Diagram: History Archive DAG]

State Table Export DAG

This DAG:

  • exports accounts, account_signers, offers, claimable_balances, liquidity pools, and trustlines
  • inserts into BigQuery

[Diagram: Bucket List DAG]

Bucket List DAG (Unsupported)

NOTE: Bucket List DAG is unsupported.

This DAG:

  • exports from Stellar's bucket list, which contains data on accounts, offers, trustlines, account signers, liquidity pools, and claimable balances
  • inserts into BigQuery

[Diagram: Bucket List DAG]


Task Explanations

build_time_task

This file contains methods for creating time tasks. Time tasks call the get_ledger_range_from_times function in the stellar-etl Docker image. The tasks receive the execution time of the current DAG run and the expected execution time of the next run. They convert this time range into a ledger range that can be passed to the export tasks.

build_export_task

This file contains methods for creating export tasks. Export tasks call export functions in the stellar-etl Docker image with a ledger range determined by the upstream time task. The data is exported in a newline-delimited JSON text file with a file name in the format [start ledger]-[end ledger]-[data type].txt.
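
The file name format can be produced and parsed with a few lines of Python. This is an illustrative sketch of the naming convention described above, not code from the repository; both function names are hypothetical.

```python
import re

# [start ledger]-[end ledger]-[data type].txt
_EXPORT_NAME = re.compile(r"^(\d+)-(\d+)-(.+)\.txt$")


def export_file_name(start_ledger, end_ledger, data_type):
    """Build the export file name for a ledger range and data type."""
    return f"{start_ledger}-{end_ledger}-{data_type}.txt"


def parse_export_file_name(name):
    """Split an export file name into (start_ledger, end_ledger, data_type)."""
    match = _EXPORT_NAME.match(name)
    if match is None:
        raise ValueError(f"not an export file name: {name!r}")
    return int(match.group(1)), int(match.group(2)), match.group(3)


name = export_file_name(100, 200, "transactions")
print(name, parse_export_file_name(name))
```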

build_gcs_to_bq_task

This file contains methods for creating tasks that append information from a Google Cloud Storage file to a BigQuery table. These tasks will create a new table if one does not exist. These tasks are used for history archive data structures, as Stellar wants to keep a complete record of the ledger's entire history.

build_apply_gcs_changes_to_bq_task

This file contains methods for creating apply tasks. Apply tasks are used to merge a file from Google Cloud Storage into a BigQuery table. Unlike the append tasks, apply tasks apply changes: they update, delete, and insert rows. These tasks are used for accounts, offers, and trustlines, as the BigQuery table represents the point-in-time state of these data structures. For example, a merge task could alter the account balance field in the table if a user performed a transaction, delete a row in the table if a user deleted their account, or add a new row if a new account was created.

Apply tasks can also be used to insert unique values only. This behavior is used for orderbook and history archive data structures. Instead of performing a merge operation, which would update or delete existing rows, the task will simply insert new rows if they don't already exist. This helps prevent duplicated data in a scenario where rows shouldn't change or be deleted. Essentially, this task replicates the behavior of a primary key in a database when used for orderbooks.
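
The difference between the two modes can be shown with plain dictionaries standing in for tables. This is a conceptual sketch only; the real tasks run against BigQuery, and the `id` key and `deleted` flag are assumptions made for the example.

```python
def apply_changes(table, changes, key="id"):
    """Merge: update or insert each changed row, and delete rows flagged as deleted."""
    result = dict(table)
    for row in changes:
        if row.get("deleted"):
            result.pop(row[key], None)
        else:
            result[row[key]] = row
    return result


def insert_unique(table, rows, key="id"):
    """Insert only rows whose key is not already present; never update or delete."""
    result = dict(table)
    for row in rows:
        result.setdefault(row[key], row)
    return result


accounts = {"A": {"id": "A", "balance": 10}, "B": {"id": "B", "balance": 5}}
changes = [{"id": "A", "balance": 7}, {"id": "B", "deleted": True}, {"id": "C", "balance": 1}]

print(apply_changes(accounts, changes))       # A updated, B removed, C added
print(insert_unique(accounts, changes[:1]))   # A keeps its original balance
```

In the merge case the table tracks current state, while in the insert-unique case existing rows are treated as immutable, which is the primary-key-like behavior described above.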

build_batch_stats

This file pulls and inserts batch stats into BigQuery. Data is inserted into history_archives_dag_runs.

bq_insert_job_task

This file contains methods for creating BigQuery insert job tasks. The task will read the query from the specified sql file and will return a BigQuery job operator configured to the GCP project and datasets defined.

cross_dependency_task

This file creates an ExternalTaskSensor that triggers when the specified DAG tasks succeed.

delete_data_task

This file deletes data from a specified BigQuery project.dataset.table according to the batch interval.

NOTE: If the batch interval is changed, the deleted data might not align with the prior batch intervals.


Further Development

This section details further areas of development. It covers a basic guide on how to add new features and test changes to existing features. It also contains a list of project TODOs (check the GitHub issues page for more!)

Extensions

This section covers some possible extensions or further work that can be done.


Pre-commit Git hook scripts

Git can run special scripts at various places in the Git workflow (which the system calls “hooks”). These scripts can do whatever you want and, in theory, can help a team with their development flow.

pre-commit makes hook scripts extremely accessible to teams.

  • Install pre-commit

    # using pip
    $ pip install pre-commit==3.2.1
  • Set up the Git hook scripts

    $ pre-commit install
    pre-commit installed at .git/hooks/pre-commit

That's it. Now pre-commit will run automatically on git commit!


Adding New DAGs

Adding new DAGs is a fairly straightforward process. Create a new python file in the dags folder. Create your dag object using the code below:

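from airflow import DAG
from stellar_etl_airflow.default import get_default_dag_args

dag = DAG(
	'dag_id',  # unique identifier shown in the Airflow UI
	default_args=get_default_dag_args(),
	description='DAG description.',
	schedule_interval=None,  # None means the DAG only runs when triggered
)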

The get_default_dag_args() function is defined in the dags/stellar_etl_airflow/default.py file.

Feel free to add more arguments or customize the existing ones. The documentation for a DAG is available here.


Adding tasks to existing DAGs

If you have created a new DAG, or wish to extend an existing DAG, you can add tasks to it by calling the various create_X_task functions that are in the repository. See here for details on how to create dependencies between tasks.


Adding New Tasks

Adding new tasks is a more involved process. You likely need to add a new python file in the dags/stellar_etl_airflow folder. This file should include a function that creates and returns the new task, as well as any auxiliary functions related to the task.

Airflow has a variety of operators. The ones that are most likely to be used are:

You may also find this list of Google-related operators useful for interacting with Google Cloud Storage or BigQuery.

An example of a simple task is the time task. This task converts a time into a ledger range using a stellar-etl command. Since it needs to use the stellar-etl, we need a KubernetesPodOperator. We provide the operator with the command, the task_id, the parent DAG, and some parameters specific to KubernetesPodOperator.

More complex tasks might require a good amount of extra code to set up variables, authenticate, or check for errors. However, keep in mind that tasks should be idempotent. This means that tasks should produce the same output even if they are run multiple times. The same input should always produce the same output.
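
As an illustration of idempotency, the sketch below writes a batch of rows so that a retry replaces the previous output instead of appending to it. The function name and the "example" data type in the file name are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def write_batch(output_dir, start_ledger, end_ledger, rows):
    """Write rows as newline-delimited JSON, replacing any previous output for the batch."""
    path = Path(output_dir) / f"{start_ledger}-{end_ledger}-example.txt"
    # Mode "w" truncates the file, so a retry overwrites instead of appending duplicates.
    with path.open("w") as handle:
        for row in rows:
            handle.write(json.dumps(row, sort_keys=True) + "\n")
    return path


with tempfile.TemporaryDirectory() as output_dir:
    rows = [{"id": 1}, {"id": 2}]
    first = write_batch(output_dir, 100, 200, rows).read_text()
    second = write_batch(output_dir, 100, 200, rows).read_text()
    print(first == second)
```

Deriving the file name from the task's input (the ledger range) rather than from the wall clock is what lets a rerun land on the same file.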

You may find that you need to pass small amounts of information, like filenames or numbers, from one task to another. You can do so with Airflow's XCom system.


Testing Changes

Once you make a change, you can test it using the Airflow command line interface. Here's a quick outline of how to test changes:

  • Run kubectl get pods --all-namespaces. Look for a pod that starts with airflow-worker.
  • Run kubectl -n <pod_namespace> exec -it airflow-worker-<rest_of_pod_name> -c airflow-worker -- /bin/bash to get inside the worker
  • Run airflow tasks test history_archive_export <task_id> <test_date> (airflow test on Airflow 1.10). Note that if the task you changed has dependencies, you need to run airflow tasks test on those upstream tasks for the exact same date.
  • Run airflow tasks test on the tasks that depend on the task you just changed. Ensure that they still perform as expected.
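
Because upstream tasks must be tested first and with the same date, it can help to generate the commands in dependency order. The sketch below uses the standard library's `graphlib` (Python 3.9+) and Airflow 2's `airflow tasks test` command; the task ids and the date are placeholders, not the real task ids of any DAG in this repository.

```python
from graphlib import TopologicalSorter


def ordered_test_commands(dag_id, upstream, test_date):
    """Return `airflow tasks test` commands ordered so that upstream tasks come first.

    upstream maps each task id to the set of task ids it depends on.
    """
    order = TopologicalSorter(upstream).static_order()
    return [f"airflow tasks test {dag_id} {task_id} {test_date}" for task_id in order]


# Hypothetical task ids forming a simple chain: time -> export -> load.
upstream = {"export_task": {"time_task"}, "load_task": {"export_task"}}
for command in ordered_test_commands("history_archive_export", upstream, "2021-01-01"):
    print(command)
```

Each printed command can then be run inside the worker pod from the earlier steps.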

This guide can also be useful for testing deployment in a new environment. Follow this testing process for all the tasks in your DAGs to ensure that they work end-to-end.

An alternative to the testing flow above is to trigger the task in the Airflow UI. From there you are able to view the task status, log, and task details.
