8/10/2025

Orchestrating dbt with Apache Airflow using YAML Pipelines on Kubernetes (Minikube)

 


This is the second episode of the Apache Airflow series. You can find the GitHub project at https://github.com/dhanuka84/airflow_dbt_yaml_demo.

1. Introduction


Modern data engineering requires orchestration, modularity, and reproducibility.
In this article, we’ll explore:


- Apache Airflow → Workflow orchestration
- dbt (Data Build Tool) → SQL-based transformations
- YAML-based DAGs → Config-driven pipelines
- Kubernetes with Minikube → Containerized development

We’ll also walk through a developer guide using airflow_dbt_yaml_demo.

2. What is Apache Airflow?


Apache Airflow is an open-source orchestration platform for programmatically authoring, scheduling, and monitoring workflows.
- DAGs (Directed Acyclic Graphs) define workflows.
- Operators execute tasks (e.g., KubernetesPodOperator).
- Scheduler triggers tasks according to schedules.
- Web UI helps visualize runs, dependencies, and logs.

Airflow embodies the workflow-as-code philosophy — everything is defined in Python, enabling version control, testing, and flexibility.
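
For example, a minimal hand-written DAG (not part of this project, and assuming Airflow 2.4+ style imports) looks like this:


from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# A tiny illustrative DAG: one daily task that prints a message.
with DAG(
    dag_id="hello_airflow",
    start_date=datetime(2025, 8, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    say_hello = BashOperator(
        task_id="say_hello",
        bash_command="echo 'hello from airflow'",
    )


Because this is ordinary Python, the same file can be linted, unit-tested, and reviewed like any other code.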

3. What is dbt?


dbt (Data Build Tool) enables analysts and engineers to transform data in the warehouse using SQL + Jinja.
Key features:
- Modular SQL with reusability.
- Testing for data quality.
- Documentation & lineage auto-generated.
- Git-based workflows for collaboration.

Where Airflow orchestrates the when and how, dbt handles the what in transformations.

4. YAML-Based DAGs — Why They’re Powerful

Traditionally, Airflow DAGs are defined in Python. This is flexible, but every change requires a Python edit.
YAML-based DAGs separate configuration from execution logic.

Benefits for Data Engineers:
- Non-Python users can define workflows.
- Easy versioning and code review.
- Centralized parameters for quick changes.
- Schema validation reduces runtime errors.

Example from this project:


dag:
  dag_id: dbt_yaml_pipeline
  default_args:
    owner: airflow
    start_date: "2025-08-01"
  schedule_interval: "@daily"

tasks:
  - task_id: dbt_run
    operator: KubernetesPodOperator
    image: airflow-dbt-yaml-demo:0.0.36
    command: "dbt run -v --profiles-dir /opt/airflow/dbt --project-dir /opt/airflow/dbt/my_project"
    on_finish_action: keep_pod
    image_pull_policy: Never
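
Under a config-driven approach, a loader turns each task entry into an operator. As a rough sketch (the actual loader in this repo may differ in its details), the task above could be mapped to a KubernetesPodOperator like this, assuming a recent cncf.kubernetes provider:


import shlex

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

def build_task(task_cfg: dict, dag) -> KubernetesPodOperator:
    # Translate one YAML task entry into a pod-based task (illustrative only).
    return KubernetesPodOperator(
        task_id=task_cfg["task_id"],
        image=task_cfg["image"],
        # The YAML keeps the command as a single string; split it into argv form.
        cmds=shlex.split(task_cfg["command"]),
        image_pull_policy=task_cfg.get("image_pull_policy", "IfNotPresent"),
        on_finish_action=task_cfg.get("on_finish_action", "delete_pod"),
        namespace="airflow",   # assumption: task pods run in the airflow namespace
        get_logs=True,         # stream pod logs into the Airflow task log
        dag=dag,
    )


Keeping image_pull_policy: Never and loading the image into Minikube avoids pulls from a remote registry during local development, and on_finish_action: keep_pod leaves finished pods around for debugging.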



5. Developer Guide — Running airflow_dbt_yaml_demo

Prerequisites:
- Docker
- Minikube
- Helm
- kubectl
- Python 3.9+

Step 1 — Clone the Repo:


git clone https://github.com/dhanuka84/airflow_dbt_yaml_demo.git


cd airflow_dbt_yaml_demo



Step 2 — Start Minikube and install Apache Airflow using Helm:


minikube start --memory=8192 --cpus=4 --driver=docker

# optional: point the Docker CLI at Minikube's Docker daemon
eval $(minikube docker-env)


helm repo add apache-airflow https://airflow.apache.org
helm repo update

export NAMESPACE=airflow
export RELEASE_NAME=airflow

helm install $RELEASE_NAME apache-airflow/airflow --namespace $NAMESPACE --create-namespace



Step 3 — Build the custom Docker image and load it into Minikube:


docker build -t airflow-dbt-yaml-demo:0.0.36 .

minikube image load airflow-dbt-yaml-demo:0.0.36

(If you ran eval $(minikube docker-env) in Step 2, the image is built directly inside Minikube's Docker daemon and the load step can be skipped.)

Step 4 — Upgrade the Airflow release with the project’s values.yaml:


helm upgrade --install airflow apache-airflow/airflow -n airflow -f values.yaml






Step 5 — Open the Airflow UI and verify the DAGs:


minikube service airflow-api-server -n airflow

This opens the Airflow web UI in your browser; confirm that dbt_yaml_pipeline appears in the DAG list.



Step 6 — Trigger the pipeline:


In the DAGs view, click the dbt_yaml_pipeline DAG, then click the Trigger button in the top-right corner.



Step 7 — View Logs:


Check tasks like dbt_run or dbt_test for KubernetesPodOperator logs.


6. How It Works Internally

1. yaml_loader.py scans /opt/airflow/yamldags for .yaml files.
2. It parses the DAG metadata and task definitions.
3. Airflow dynamically registers the resulting DAG objects.
4. Each task runs as a Kubernetes pod executing a dbt command (see the sketch below).
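
A minimal version of that loader (a sketch only; the real yaml_loader.py may differ) could look like the following, reusing the build_task helper sketched in section 4:


import glob
import os

import yaml

from airflow import DAG

YAML_DIR = "/opt/airflow/yamldags"

for path in glob.glob(os.path.join(YAML_DIR, "*.yaml")):
    with open(path) as f:
        cfg = yaml.safe_load(f)

    dag_cfg = cfg["dag"]
    # A real loader would also parse start_date strings in default_args into datetime objects.
    dag = DAG(
        dag_id=dag_cfg["dag_id"],
        default_args=dag_cfg.get("default_args", {}),
        schedule=dag_cfg.get("schedule_interval"),
    )

    for task_cfg in cfg.get("tasks", []):
        build_task(task_cfg, dag)

    # Assigning the DAG to a module-level name is what makes Airflow's
    # DAG processor register it.
    globals()[dag_cfg["dag_id"]] = dag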


7. 📂 Real Project Structure

airflow_dbt_yaml_demo/
├── dags/
│   ├── yaml_loader.py         # Loader to generate DAGs from YAML
│   └── __init__.py
├── dbt/
│   ├── my_project/
│   │   ├── dbt_project.yml    # dbt project configuration
│   │   └── models/            # dbt transformation models
│   │       └── model.sql      # SQL queries
│   └── profiles.yml           # dbt connection profiles
├── yamldags/
│   └── dbt_yaml_pipeline.yaml # YAML definition for Airflow DAG
├── Dockerfile                 # Build Airflow+dbt image
├── README.md
├── deploy_airflow.sh          # Script to deploy Airflow via Helm
└── values.yaml                # Helm overrides for Airflow


8. Extending the Demo

- Add new pipelines → new .yaml in yamldags/.
- Add new dbt models → .sql in dbt/my_project/models/.
- Add schema validation for safer configs (a sketch follows below).
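
For the last point, one lightweight option (not part of the repo) is to validate each pipeline file against a JSON Schema before building the DAG, for example with the jsonschema package:


import yaml
from jsonschema import ValidationError, validate

# Minimal, illustrative schema for the files in yamldags/.
PIPELINE_SCHEMA = {
    "type": "object",
    "required": ["dag", "tasks"],
    "properties": {
        "dag": {
            "type": "object",
            "required": ["dag_id"],
        },
        "tasks": {
            "type": "array",
            "items": {
                "type": "object",
                "required": ["task_id", "operator", "image", "command"],
            },
        },
    },
}

def load_validated(path: str) -> dict:
    # Load a pipeline YAML and fail fast with a clear error if it is malformed.
    with open(path) as f:
        cfg = yaml.safe_load(f)
    try:
        validate(instance=cfg, schema=PIPELINE_SCHEMA)
    except ValidationError as err:
        raise ValueError(f"Invalid pipeline config {path}: {err.message}") from err
    return cfg


Calling load_validated() from the loader turns a typo in a YAML file into an import-time error rather than a confusing failure at run time.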

9. Conclusion

Combining:
- Airflow for orchestration
- dbt for transformations
- YAML configs for maintainability
- Kubernetes for scalability

…gives data engineers a flexible, scalable, and developer-friendly pipeline framework.


10. Troubleshooting


dbt Core’s logs are not always easy to surface through the Airflow task logs, so it helps to test the dbt configuration separately inside a throwaway pod using the kubectl commands below.


# create a test pod

$ kubectl -n airflow run dbt-shell --image=airflow-dbt-yaml-demo:0.0.36 --image-pull-policy=Never --restart=Never --command -- /bin/sh -lc "sleep 3600"



# execute a dbt command and check the logs

$ kubectl -n airflow exec -it dbt-shell -- bash -lc '
dbt debug -v --profiles-dir /opt/airflow/dbt --project-dir /opt/airflow/dbt/my_project 2>&1 | tee /tmp/dbt_debug.log || true
echo "=== tail debug log ==="; tail -n 500 /tmp/dbt_debug.log
'


# check the DB connection

$ kubectl -n airflow exec -it dbt-shell -- bash -lc '
python - <<PY
import psycopg2, socket

HOST = "airflow-postgresql.airflow.svc.cluster.local"
USER = "postgres"   # or dbt if you created that user
PASS = "postgres"   # or dbt
DB   = "postgres"   # or dwh if you created it

print("Resolving:", HOST, "->", socket.gethostbyname_ex(HOST))
conn = psycopg2.connect(host=HOST, user=USER, password=PASS, dbname=DB, port=5432)
with conn.cursor() as c:
    c.execute("select 1"); print("DB says:", c.fetchone())
conn.close(); print("DB OK")
PY
'

Expected output:

Resolving: airflow-postgresql.airflow.svc.cluster.local -> ('airflow-postgresql.airflow.svc.cluster.local', [], ['10.105.61.2'])
DB says: (1,)
DB OK







# connectivity test (reads values from profiles.yml)

kubectl -n airflow exec -it dbt-shell -- bash -lc '
python - <<PY
import yaml, psycopg2

cfg = yaml.safe_load(open("/opt/airflow/dbt/profiles.yml"))["default"]["outputs"]["dev"]
print("Connecting to", cfg["host"], cfg["dbname"], cfg["user"])
conn = psycopg2.connect(host=cfg["host"], dbname=cfg["dbname"], user=cfg["user"], password=cfg["password"], port=cfg.get("port", 5432))
with conn.cursor() as c:
    c.execute("select 1"); print("DB says:", c.fetchone())
conn.close(); print("DB OK")
PY
'



