1. Project Overview
Objective: Build a resilient data pipeline that ingests "dirty" raw data, quarantines invalid records (nulls, duplicates, schema violations) for audit, and promotes only high-quality data to business tables.
Tech Stack: Databricks Free Edition (Serverless), Unity Catalog, PySpark, Delta Lake.
Architecture: Medallion (Bronze → Silver → Gold).
2. Environment Setup (Unity Catalog)
Goal: Create the governed containers for our data.
Step 1: Open a new Notebook in Databricks.
Step 2: Copy and run the following SQL code to set up the 3-layer namespace.
SQL
%sql
-- 1. Create the Project Catalog
CREATE CATALOG IF NOT EXISTS retail_project_dev;
-- 2. Create the Medallion Schemas
-- Bronze: Raw ingestion (Do No Harm)
CREATE SCHEMA IF NOT EXISTS retail_project_dev.bronze;
-- Silver: Validated & Cleansed (The Trust Layer)
CREATE SCHEMA IF NOT EXISTS retail_project_dev.silver;
-- Gold: Business Aggregates (The Value Layer)
CREATE SCHEMA IF NOT EXISTS retail_project_dev.gold;
-- 3. Create Storage for Raw Files (Unity Catalog Volume)
CREATE VOLUME IF NOT EXISTS retail_project_dev.bronze.landing_zone;
Expected result: the retail_project_dev catalog containing the bronze, silver, and gold schemas, plus the landing_zone volume.
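To confirm the namespace was created correctly, you can run an optional check like the one below in a Python cell; it simply lists the schemas and the volume you just created (this is a sanity check, not part of the pipeline).
Python
# Optional sanity check: list the schemas created above
display(spark.sql("SHOW SCHEMAS IN retail_project_dev"))

# Confirm the landing zone volume exists
display(spark.sql("SHOW VOLUMES IN retail_project_dev.bronze"))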
3. The "Chaos" Generator (Creating Raw Data)
Goal: Generate a CSV dataset that intentionally contains data quality failures to test our pipeline.
Script Logic:
5% Null IDs: Violates "Completeness".
5% Invalid Emails: Violates "Format/Validity".
5% Negative Amounts: Violates "Business Logic".
5% Duplicates: Violates "Uniqueness".
Action: Copy this Python code into a new cell and run it once.
Python
%pip install faker

import csv
import random
from faker import Faker

fake = Faker()

# Path to the Unity Catalog Volume
output_path = "/Volumes/retail_project_dev/bronze/landing_zone/raw_orders.csv"

def generate_dirty_data(num_rows=1000):
    print(f"Generating {num_rows} rows of 'dirty' data...")
    with open(output_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['order_id', 'customer_email', 'amount', 'timestamp', 'status'])
        for _ in range(num_rows):
            # 1. Base Data
            oid = fake.uuid4()
            email = fake.email()
            amt = round(random.uniform(10.0, 500.0), 2)
            ts = fake.date_time_this_year().isoformat()
            status = "PENDING"
            # 2. Inject Errors
            if random.random() < 0.05: oid = None                   # Null ID
            if random.random() < 0.05: email = "bad_email_format"   # Bad Email
            if random.random() < 0.05: amt = amt * -1               # Negative Amount
            writer.writerow([oid, email, amt, ts, status])
            # 3. Inject Duplicates
            if random.random() < 0.05:
                writer.writerow([oid, email, amt, ts, status])
    print(f"Data saved to: {output_path}")

generate_dirty_data()
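As an optional sanity check, you can read the file back with Spark; expect slightly more than 1,000 rows because of the injected duplicates.
Python
# Optional: preview the raw file and count rows
raw_preview = (spark.read
    .option("header", "true")
    .csv("/Volumes/retail_project_dev/bronze/landing_zone/raw_orders.csv"))

print(f"Row count: {raw_preview.count()}")
display(raw_preview.limit(5))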
4. Bronze Layer: Defensive Ingestion
Goal: Ingest data without crashing, even if the file is corrupted. We use _rescued_data to catch parsing errors.
Action: Run this cell.
Python
# --- RESET LOGIC (Dev Only) ---
# Clean up the previous table and checkpoints to prevent schema mismatches during testing.
# WARNING: This deletes existing data in the table!
print("Resetting environment...")
spark.sql("DROP TABLE IF EXISTS retail_project_dev.bronze.orders")
dbutils.fs.rm("/Volumes/retail_project_dev/bronze/landing_zone/checkpoints/orders/", True)
dbutils.fs.rm("/Volumes/retail_project_dev/bronze/landing_zone/schema/orders/", True)  # Clean up schema location too

# --- INGESTION LOGIC ---
# Define paths
source_path = "/Volumes/retail_project_dev/bronze/landing_zone/"
checkpoint_path = "/Volumes/retail_project_dev/bronze/landing_zone/checkpoints/orders/"
# Auto Loader needs a dedicated path to store the inferred schema
schema_path = "/Volumes/retail_project_dev/bronze/landing_zone/schema/orders/"

# Read using Auto Loader (cloudFiles)
bronze_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", schema_path)
    # SAFETY FIX: Only read CSV files. Ignores stray JSON or system files that cause header errors.
    .option("pathGlobFilter", "*.csv")
    .option("header", "true")
    # CRITICAL: This captures malformed CSV lines into a separate column
    .option("rescuedDataColumn", "_rescued_data")
    .load(source_path))

# Write to Delta Table
# Note: mergeSchema is technically not needed if we drop the table above,
# but we keep it here as a safeguard for future runs.
query = (bronze_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("retail_project_dev.bronze.orders"))

# Block until all available files are ingested, so downstream cells see the data
query.awaitTermination()
In summary: a null _rescued_data value means no parsing errors were found, and you are good to proceed to the Silver layer. Note that the bad business values (null IDs, invalid emails, negative amounts) are still present at this stage; Bronze ingests everything as-is, and cleanup happens in Silver.
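To see this for yourself, an optional check like the following counts parsing errors and confirms that bad business values (for example, negative amounts) are still sitting in Bronze.
Python
# Optional check: confirm ingestion and inspect data quality issues in Bronze
bronze = spark.table("retail_project_dev.bronze.orders")

print(f"Total rows ingested: {bronze.count()}")
print(f"Rows with parsing errors: {bronze.filter('_rescued_data IS NOT NULL').count()}")

# Bad business values are still present at this stage -- by design
display(bronze.filter("amount < 0").limit(5))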
5. Silver Layer: The Quarantine Pattern
Goal: Separate "Good" data from "Bad" data. We do NOT delete bad data; we quarantine it for audit.
Action: Run this cell. This is the core logic of your project.
Python
from pyspark.sql.functions import col, when, concat_ws, lit

# --- RESET CHECKPOINT (Fix for "Different Delta Table" error) ---
# Since we recreated the Bronze table, we must clear the Silver stream's history.
dbutils.fs.rm("/Volumes/retail_project_dev/bronze/landing_zone/checkpoints/silver/", True)

def process_silver(batch_df, batch_id):
    # 1. Define Data Quality Rules
    rule_valid_id = col("order_id").isNotNull()
    rule_valid_email = col("customer_email").contains("@")
    rule_valid_amount = col("amount") > 0

    # 2. Tag Records (Pass vs Fail)
    # We add a 'dq_status' and a 'dq_reason' column
    tagged_df = (batch_df
        .withColumn("dq_status",
            when(rule_valid_id & rule_valid_email & rule_valid_amount, "PASS")
            .otherwise("FAIL"))
        .withColumn("dq_reason",
            concat_ws(" | ",
                when(~rule_valid_id, "Missing ID"),
                when(~rule_valid_email, "Invalid Email"),
                when(~rule_valid_amount, "Negative Amount"))))

    # 3. Write QUARANTINE Table (Bad Data)
    (tagged_df.filter(col("dq_status") == "FAIL")
        .write.format("delta").mode("append")
        .saveAsTable("retail_project_dev.silver.orders_quarantine"))

    # 4. Write CLEAN Table (Good Data) with Deduplication
    (tagged_df.filter(col("dq_status") == "PASS")
        .dropDuplicates(["order_id"])                     # Handle the duplicate rows here
        .drop("dq_status", "dq_reason", "_rescued_data")  # Drop metadata columns
        .write.format("delta").mode("append")
        .saveAsTable("retail_project_dev.silver.orders_clean"))

# Run the Stream
silver_query = (spark.readStream.table("retail_project_dev.bronze.orders")
    .writeStream
    .foreachBatch(process_silver)
    .option("checkpointLocation", "/Volumes/retail_project_dev/bronze/landing_zone/checkpoints/silver/")
    .trigger(availableNow=True)
    .start())

# Block until the batch finishes before querying the Silver tables
silver_query.awaitTermination()
After the stream completes, only valid, deduplicated records appear in the orders_clean table, while every rejected record, tagged with its dq_reason, lands in the orders_quarantine table.
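To eyeball the split, you can optionally preview a few rows from each table; quarantined rows carry a dq_reason, while clean rows have the metadata columns dropped.
Python
# Optional: spot-check both Silver tables
display(spark.table("retail_project_dev.silver.orders_clean").limit(5))

display(spark.table("retail_project_dev.silver.orders_quarantine")
    .select("order_id", "customer_email", "amount", "dq_reason")
    .limit(5))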
6. Gold Layer & Data Quality Analysis
Goal: Prove that the pipeline worked by querying the results.
Action: Run these SQL queries in separate cells.
A. Business Aggregation (The "Clean" Report)
This should contain NO null IDs, NO negative amounts, and NO invalid emails.
SQL
%sql
SELECT
  count(*) as total_valid_orders,
  sum(amount) as total_revenue,
  avg(amount) as average_order_value
FROM retail_project_dev.silver.orders_clean;
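To actually populate the gold schema created in section 2, you can optionally materialize this aggregate as a Delta table. The sketch below is one way to do it; the table name gold.daily_revenue and the daily grain are illustrative assumptions, not part of the original pipeline.
Python
# Minimal sketch (assumption): materialize a daily revenue summary into the Gold schema
from pyspark.sql.functions import col, to_date, count, sum as _sum, avg

gold_df = (spark.table("retail_project_dev.silver.orders_clean")
    .withColumn("order_date", to_date(col("timestamp")))   # derive a daily grain from the order timestamp
    .groupBy("order_date")
    .agg(
        count("*").alias("total_orders"),
        _sum("amount").alias("total_revenue"),
        avg("amount").alias("average_order_value")))

(gold_df.write.format("delta")
    .mode("overwrite")
    .saveAsTable("retail_project_dev.gold.daily_revenue"))  # table name is an assumption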
B. The Error Report (The "Dirty" Truth)
This analyzes exactly what went wrong in the raw data.
SQL
%sql
SELECT
  dq_reason,
  count(*) as error_count,
  -- Show samples of the bad data
  collect_list(order_id) as sample_ids
FROM retail_project_dev.silver.orders_quarantine
GROUP BY dq_reason
ORDER BY error_count DESC;
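If you want a single headline metric, you can optionally compute the overall pass rate from the two Silver tables.
Python
# Optional: overall data quality pass rate across the Silver layer
clean_count = spark.table("retail_project_dev.silver.orders_clean").count()
quarantine_count = spark.table("retail_project_dev.silver.orders_quarantine").count()
total = clean_count + quarantine_count

pass_rate = (clean_count / total * 100) if total else 0.0
print(f"Pass rate: {pass_rate:.1f}% ({clean_count} clean / {quarantine_count} quarantined)")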
7. Final Step: Visualization (Genie)
Step 1: Go to the Genie tab in the Databricks sidebar.
Step 2: Click New Space.
Step 3: Add the table retail_project_dev.silver.orders_clean.
Step 4: Ask Genie: "What is the daily revenue trend?" or "Show me the distribution of order amounts."