This is the second episode of the Apache Airflow series. You can find the GitHub project here: https://github.com/dhanuka84/airflow_dbt_yaml_demo .
1. Introduction
Modern data engineering requires orchestration, modularity, and reproducibility.
In this article, we’ll explore:
- Apache Airflow → Workflow orchestration
- dbt (Data Build Tool) → SQL-based transformations
- YAML-based DAGs → Config-driven pipelines
- Kubernetes with Minikube → Containerized development
We’ll also walk through a developer guide using airflow_dbt_yaml_demo.
2. What is Apache Airflow?
Apache Airflow is an open-source orchestration platform for programmatically authoring, scheduling, and monitoring workflows.
- DAGs (Directed Acyclic Graphs) define workflows.
- Operators execute tasks (e.g., KubernetesPodOperator).
- Scheduler triggers tasks according to schedules.
- Web UI helps visualize runs, dependencies, and logs.
Airflow embodies the workflow-as-code philosophy — everything is defined in Python, enabling version control, testing, and flexibility.
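For reference, this is what a hand-written Python DAG looks like. It is a minimal sketch, not part of this project, assuming Airflow 2.x with the standard BashOperator and a made-up dag_id:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# One daily DAG with a single shell task, purely for illustration.
with DAG(
    dag_id="hello_airflow",
    start_date=datetime(2025, 8, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    BashOperator(task_id="say_hello", bash_command="echo 'hello from Airflow'")

The YAML approach described in section 4 produces equivalent DAG objects from configuration files instead of hand-written Python.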
3. What is dbt?
dbt (Data Build Tool) enables analysts and engineers to transform data in the warehouse using SQL + Jinja.
Key features:
- Modular SQL with reusability.
- Testing for data quality.
- Documentation & lineage auto-generated.
- Git-based workflows for collaboration.
Where Airflow orchestrates the when and how, dbt handles the what in transformations.
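In day-to-day use these features map onto a handful of CLI commands: dbt run builds the models, dbt test runs the data-quality tests defined against them, and dbt docs generate produces the documentation and lineage site. In this demo, dbt run is executed inside a Kubernetes pod launched by an Airflow task.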
4. YAML-Based DAGs — Why It’s Powerful
Traditionally, Airflow DAGs are defined in Python. This is flexible but requires Python edits for every change.
YAML-based DAGs separate configuration from execution logic:
Benefits for Data Engineers:
- Non-Python users can define workflows.
- Easy versioning and code review.
- Centralized parameters for quick changes.
- Schema validation reduces runtime errors.
Example from this project:
dag:
  dag_id: dbt_yaml_pipeline
  default_args:
    owner: airflow
    start_date: "2025-08-01"
  schedule_interval: "@daily"
  tasks:
    - task_id: dbt_run
      operator: KubernetesPodOperator
      image: airflow-dbt-yaml-demo:0.0.36
      command: "dbt run -v --profiles-dir /opt/airflow/dbt --project-dir /opt/airflow/dbt/my_project"
      on_finish_action: keep_pod
      image_pull_policy: Never
6. Developer Guide — Running airflow_dbt_yaml_demo
Prerequisites:
- Docker
- Minikube
- Helm
- kubectl
- Python 3.9+
Step 1 — Clone the Repo:
git clone https://github.com/dhanuka84/airflow_dbt_yaml_demo.git
cd airflow_dbt_yaml_demo
Step 2 — Start Minikube and install Apache Airflow with Helm:
minikube start --memory=8192 --cpus=4 --driver=docker
(Optional) eval $(minikube docker-env)   # point the local Docker CLI at Minikube's Docker daemon so images are built inside the cluster
helm repo add apache-airflow https://airflow.apache.org
helm repo update
export NAMESPACE=airflow
export RELEASE_NAME=airflow
helm install $RELEASE_NAME apache-airflow/airflow --namespace $NAMESPACE --create-namespace
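Once the chart is installed, kubectl get pods -n airflow should show the Airflow components (scheduler, API server/webserver, triggerer, metadata database) coming up; wait until they are all Running before continuing.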
Step 3 — Build the custom Docker image and load it into Minikube:
docker build -t airflow-dbt-yaml-demo:0.0.36 .
minikube image load airflow-dbt-yaml-demo:0.0.36   # not needed if the image was built under minikube docker-env
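You can confirm the image is available inside the cluster with minikube image ls | grep airflow-dbt-yaml-demo. This matters because the DAG's image_pull_policy: Never means Kubernetes will only use an image that is already present on the node.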
Step 4 — Deploy Airflow with the project's values.yaml:
helm upgrade --install airflow apache-airflow/airflow -n airflow -f values.yaml
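The repository also ships deploy_airflow.sh, a helper script that wraps this Helm deployment, with values.yaml holding the chart overrides (see the project structure below), so you can run the script instead of typing the commands by hand.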
Step 5 — Verify DAGs:
minikube service airflow-api-server -n airflow   # opens the Airflow web UI in your browser
Step 6 — Trigger the pipeline:
In the Airflow UI, open the DAGs list and click the dbt_yaml_pipeline DAG, then click the Trigger button in the top-right corner.
Step 7 — View Logs:
Check tasks like dbt_run or dbt_test for KubernetesPodOperator logs.
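The same logs can also be pulled from the command line with kubectl -n airflow logs <task-pod-name>, which is handy when the UI is not reachable.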
7. How It Works Internally
1. yaml_loader.py scans /opt/airflow/yamldags for .yaml files.
2. The loader parses the DAG metadata and task definitions (see the sketch below).
3. Airflow dynamically registers DAGs.
4. Tasks run as Kubernetes pods executing dbt commands.
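The listing below is a simplified sketch of such a loader. The real yaml_loader.py in the repository may differ in details; the field names follow the YAML example from section 4, and it assumes a recent Airflow (2.4+ or 3.x) with the cncf.kubernetes provider installed:

import glob
from datetime import datetime

import yaml
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

YAML_DIR = "/opt/airflow/yamldags"

for path in glob.glob(f"{YAML_DIR}/*.yaml"):
    with open(path) as f:
        cfg = yaml.safe_load(f)["dag"]

    with DAG(
        dag_id=cfg["dag_id"],
        start_date=datetime.fromisoformat(cfg["default_args"]["start_date"]),
        schedule=cfg.get("schedule_interval", "@daily"),
        default_args={"owner": cfg["default_args"].get("owner", "airflow")},
        catchup=False,
    ) as dag:
        for task_cfg in cfg["tasks"]:
            # Each YAML task becomes a Kubernetes pod running the given dbt command.
            KubernetesPodOperator(
                task_id=task_cfg["task_id"],
                namespace="airflow",
                image=task_cfg["image"],
                cmds=["bash", "-c"],
                arguments=[task_cfg["command"]],
                image_pull_policy=task_cfg.get("image_pull_policy", "IfNotPresent"),
                on_finish_action=task_cfg.get("on_finish_action", "delete_pod"),
            )

    # Expose the DAG at module level so Airflow's DAG processor registers it.
    globals()[cfg["dag_id"]] = dag

Because each generated DAG is assigned into globals(), Airflow discovers every pipeline defined in the yamldags/ directory without any additional Python.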
8. 📂 Real Project Structure
airflow_dbt_yaml_demo/
├── dags/
│   ├── yaml_loader.py              # Loader to generate DAGs from YAML
│   └── __init__.py
├── dbt/
│   ├── my_project/
│   │   ├── dbt_project.yml         # dbt project configuration
│   │   └── models/                 # dbt transformation models
│   │       └── model.sql           # SQL queries
│   └── profiles.yml                # dbt connection profiles
├── yamldags/
│   └── dbt_yaml_pipeline.yaml      # YAML definition for the Airflow DAG
├── Dockerfile                      # Builds the Airflow + dbt image
├── README.md
├── deploy_airflow.sh               # Script to deploy Airflow via Helm
└── values.yaml                     # Helm overrides for Airflow
9. Extending the Demo
- Add new pipelines → new .yaml in yamldags/.
- Add new dbt models → .sql in dbt/my_project/models/.
- Add schema validation for safer configs (see the sketch below).
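The sketch below is not part of the repository: it assumes the jsonschema package and the YAML layout from section 4, and it could run in CI or at the top of the loader before any DAG is built:

import yaml
from jsonschema import ValidationError, validate

# Hypothetical schema mirroring the YAML example in section 4.
DAG_SCHEMA = {
    "type": "object",
    "required": ["dag"],
    "properties": {
        "dag": {
            "type": "object",
            "required": ["dag_id", "tasks"],
            "properties": {
                "dag_id": {"type": "string"},
                "schedule_interval": {"type": "string"},
                "tasks": {
                    "type": "array",
                    "minItems": 1,
                    "items": {
                        "type": "object",
                        "required": ["task_id", "operator", "image", "command"],
                    },
                },
            },
        }
    },
}

def validate_pipeline(path: str) -> None:
    """Fail fast with a readable error instead of a broken DAG at runtime."""
    with open(path) as f:
        cfg = yaml.safe_load(f)
    try:
        validate(instance=cfg, schema=DAG_SCHEMA)
    except ValidationError as err:
        raise SystemExit(f"{path}: invalid pipeline config: {err.message}")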
10. Conclusion
Combining:
- Airflow for orchestration
- dbt for transformations
- YAML configs for maintainability
- Kubernetes for scalability
…gives data engineers a flexible, scalable, and developer-friendly pipeline framework.
11. Troubleshooting
dbt Core does not surface its logs well in this setup, so we have to test the dbt-related configuration separately with the Kubernetes (K8s) commands below.
# Create a test pod
$ kubectl -n airflow run dbt-shell --image=airflow-dbt-yaml-demo:0.0.36 --image-pull-policy=Never --restart=Never --command -- /bin/sh -lc "sleep 3600"
# Execute the dbt command inside the pod and check the logs
$ kubectl -n airflow exec -it dbt-shell -- bash -lc '
dbt debug -v --profiles-dir /opt/airflow/dbt --project-dir /opt/airflow/dbt/my_project 2>&1 | tee /tmp/dbt_debug.log || true
echo "=== tail debug log ==="; tail -n 500 /tmp/dbt_debug.log
'
$ kubectl -n airflow exec -it dbt-shell -- bash -lc '
python - <<PY
import psycopg2, socket
HOST="airflow-postgresql.airflow.svc.cluster.local"
USER="postgres" # or dbt if you created that user
PASS="postgres" # or dbt
DB ="postgres" # or dwh if you created it
print("Resolving:", HOST, "->", socket.gethostbyname_ex(HOST))
conn = psycopg2.connect(host=HOST, user=USER, password=PASS, dbname=DB, port=5432)
with conn.cursor() as c: c.execute("select 1"); print("DB says:", c.fetchone())
conn.close(); print("DB OK")
PY
'
Expected output:
Resolving: airflow-postgresql.airflow.svc.cluster.local -> ('airflow-postgresql.airflow.svc.cluster.local', [], ['10.105.61.2'])
DB says: (1,)
DB OK
# Connectivity test (reads values from profiles.yml)
kubectl -n airflow exec -it dbt-shell -- bash -lc '
python - <<PY
import yaml, psycopg2
cfg = yaml.safe_load(open("/opt/airflow/dbt/profiles.yml"))["default"]["outputs"]["dev"]
print("Connecting to", cfg["host"], cfg["dbname"], cfg["user"])
conn = psycopg2.connect(host=cfg["host"], dbname=cfg["dbname"], user=cfg["user"], password=cfg["password"], port=cfg.get("port",5432))
with conn.cursor() as c:
    c.execute("select 1"); print("DB says:", c.fetchone())
conn.close(); print("DB OK")
PY
'
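When you are finished, clean up the test pod:
kubectl -n airflow delete pod dbt-shell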