11/25/2025

Data Quality Management on Databricks Lakehouse

 

1. Project Overview

Objective: Build a resilient data pipeline that ingests "dirty" raw data, quarantines invalid records (nulls, duplicates, schema violations) for audit, and promotes only high-quality data to business tables.

Tech Stack: Databricks Free Edition (Serverless), Unity Catalog, PySpark, Delta Lake.

Architecture: Medallion (Bronze → Silver → Gold).


2. Environment Setup (Unity Catalog)

Goal: Create the governed containers for our data.

Step 1: Open a new Notebook in Databricks.

Step 2: Copy and run the following SQL code to set up the 3-layer namespace.

SQL

%sql
-- 1. Create the Project Catalog
CREATE CATALOG IF NOT EXISTS retail_project_dev;

-- 2. Create the Medallion Schemas
-- Bronze: Raw ingestion (Do No Harm)
CREATE SCHEMA IF NOT EXISTS retail_project_dev.bronze;

-- Silver: Validated & Cleansed (The Trust Layer)
CREATE SCHEMA IF NOT EXISTS retail_project_dev.silver;

-- Gold: Business Aggregates (The Value Layer)
CREATE SCHEMA IF NOT EXISTS retail_project_dev.gold;

-- 3. Create Storage for Raw Files (Unity Catalog Volume)
CREATE VOLUME IF NOT EXISTS retail_project_dev.bronze.landing_zone;
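
Optionally, verify that the objects exist before moving on (a quick sanity check; the output should match the layout shown below):

Python

# Optional sanity check: confirm the schemas and the landing-zone volume were created.
display(spark.sql("SHOW SCHEMAS IN retail_project_dev"))
display(spark.sql("SHOW VOLUMES IN retail_project_dev.bronze"))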





Final expected database layers (screenshot):




3. The "Chaos" Generator (Creating Raw Data)

Goal: Generate a CSV dataset that intentionally contains data quality failures to test our pipeline.

Script Logic:

  • 5% Null IDs: Violates "Completeness".

  • 5% Invalid Emails: Violates "Format/Validity".

  • 5% Negative Amounts: Violates "Business Logic".

  • 5% Duplicates: Violates "Uniqueness".

Action: Copy this Python code into a new cell and run it once.

Python


%pip install faker

import csv
import random
from faker import Faker

fake = Faker()

# Path to the Unity Catalog Volume
output_path = "/Volumes/retail_project_dev/bronze/landing_zone/raw_orders.csv"

def generate_dirty_data(num_rows=1000):
    print(f"Generating {num_rows} rows of 'dirty' data...")
    with open(output_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['order_id', 'customer_email', 'amount', 'timestamp', 'status'])

        for _ in range(num_rows):
            # 1. Base Data
            oid = fake.uuid4()
            email = fake.email()
            amt = round(random.uniform(10.0, 500.0), 2)
            ts = fake.date_time_this_year().isoformat()
            status = "PENDING"

            # 2. Inject Errors
            if random.random() < 0.05: oid = None                   # Null ID
            if random.random() < 0.05: email = "bad_email_format"   # Bad Email
            if random.random() < 0.05: amt = amt * -1               # Negative Amount

            writer.writerow([oid, email, amt, ts, status])

            # 3. Inject Duplicates
            if random.random() < 0.05:
                writer.writerow([oid, email, amt, ts, status])

    print(f"Data saved to: {output_path}")

generate_dirty_data()
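
Optional: to see Auto Loader's incremental ingestion in Section 4, you can land a second file later by re-pointing output_path and calling the generator again. The file name below is only an example:

Python

# Optional: write a second batch of dirty data as a separate CSV file.
# Any file matching *.csv in the landing zone will be picked up by Auto Loader.
output_path = "/Volumes/retail_project_dev/bronze/landing_zone/raw_orders_batch2.csv"  # example name
generate_dirty_data(500)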







4. Bronze Layer: Defensive Ingestion

Goal: Ingest data without crashing, even if a file is corrupted. We use the _rescued_data column to capture rows that fail to parse.

Action: Run this cell.

Python

# --- RESET LOGIC (Dev Only) ---
# Clean up the previous table and checkpoints to prevent schema mismatches during testing.
# WARNING: This deletes existing data in the table!
print("Resetting environment...")
spark.sql("DROP TABLE IF EXISTS retail_project_dev.bronze.orders")
dbutils.fs.rm("/Volumes/retail_project_dev/bronze/landing_zone/checkpoints/orders/", True)
dbutils.fs.rm("/Volumes/retail_project_dev/bronze/landing_zone/schema/orders/", True)  # Clean up the schema location too

# --- INGESTION LOGIC ---
# Define paths
source_path = "/Volumes/retail_project_dev/bronze/landing_zone/"
checkpoint_path = "/Volumes/retail_project_dev/bronze/landing_zone/checkpoints/orders/"
# Auto Loader needs a dedicated path to store the inferred schema
schema_path = "/Volumes/retail_project_dev/bronze/landing_zone/schema/orders/"

# Read using Auto Loader (cloudFiles)
bronze_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", schema_path)
    # SAFETY: Only read CSV files. Ignores stray JSON or system files that cause header errors.
    .option("pathGlobFilter", "*.csv")
    .option("header", "true")
    # CRITICAL: This captures malformed CSV lines in a separate column instead of failing the stream
    .option("rescuedDataColumn", "_rescued_data")
    .load(source_path))

# Write to a Delta table
# Note: mergeSchema is technically not needed because we drop the table above,
# but we keep it as a safeguard for future runs.
(bronze_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("retail_project_dev.bronze.orders"))
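
Note that .toTable() starts the stream and returns immediately, so later cells can run before the availableNow batch finishes. If you see an empty or partial table, one option is to block until all active streams in the session complete (the same trick applies to the Silver stream in Section 5):

Python

# Optional: wait for every active stream in this session to finish its availableNow batch.
for q in spark.streams.active:
    q.awaitTermination()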



  • In summary: a null value in the _rescued_data column means "no parsing errors were found" for that row. You are good to proceed to the Silver layer!




  • The Bronze table still contains the injected errors (for example, negative amounts). That is expected: Bronze ingests data as-is, and cleansing happens in the Silver layer. The optional checks below confirm both points.
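
Python

# Optional checks against the Bronze table.
# Rows with a non-null _rescued_data value would indicate lines Auto Loader could not parse cleanly.
display(spark.sql("""
    SELECT count(*) AS parse_issues
    FROM retail_project_dev.bronze.orders
    WHERE _rescued_data IS NOT NULL
"""))

# The injected negative amounts are still present at this stage, as expected.
display(spark.sql("""
    SELECT order_id, customer_email, amount
    FROM retail_project_dev.bronze.orders
    WHERE amount < 0
    LIMIT 10
"""))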





5. Silver Layer: The Quarantine Pattern

Goal: Separate "Good" data from "Bad" data. We do NOT delete bad data; we quarantine it for audit.

Action: Run this cell. This is the core logic of your project.

Python



from pyspark.sql.functions import col, when, concat_ws

# --- RESET CHECKPOINT (Fix for the "Different Delta Table" error) ---
# Since we recreated the Bronze table, we must clear the Silver stream's checkpoint history.
dbutils.fs.rm("/Volumes/retail_project_dev/bronze/landing_zone/checkpoints/silver/", True)

def process_silver(batch_df, batch_id):
    # 1. Define Data Quality Rules
    rule_valid_id = col("order_id").isNotNull()
    rule_valid_email = col("customer_email").contains("@")
    rule_valid_amount = col("amount") > 0

    # 2. Tag Records (Pass vs Fail)
    # We add a 'dq_status' and a 'dq_reason' column
    tagged_df = batch_df.withColumn("dq_status",
        when(rule_valid_id & rule_valid_email & rule_valid_amount, "PASS")
        .otherwise("FAIL")
    ).withColumn("dq_reason",
        concat_ws(" | ",
            when(~rule_valid_id, "Missing ID"),
            when(~rule_valid_email, "Invalid Email"),
            when(~rule_valid_amount, "Negative Amount")
        )
    )

    # 3. Write the QUARANTINE table (bad data)
    (tagged_df.filter(col("dq_status") == "FAIL")
        .write.format("delta").mode("append")
        .saveAsTable("retail_project_dev.silver.orders_quarantine"))

    # 4. Write the CLEAN table (good data) with deduplication
    (tagged_df.filter(col("dq_status") == "PASS")
        .dropDuplicates(["order_id"])                      # Handle the duplicate rows here
        .drop("dq_status", "dq_reason", "_rescued_data")   # Drop the metadata columns
        .write.format("delta").mode("append")
        .saveAsTable("retail_project_dev.silver.orders_clean"))

# Run the stream
(spark.readStream.table("retail_project_dev.bronze.orders")
    .writeStream
    .foreachBatch(process_silver)
    .option("checkpointLocation", "/Volumes/retail_project_dev/bronze/landing_zone/checkpoints/silver/")
    .trigger(availableNow=True)
    .start())





  • The orders_clean table now contains only valid records.







  • The failing records are preserved in the orders_quarantine table, each tagged with a dq_reason. The optional check below compares row counts across the layers.
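
Python

# Optional: row counts per layer. Clean + quarantine will be slightly lower than Bronze,
# because duplicate order_ids are dropped (not quarantined) on the way into orders_clean.
for t in ["bronze.orders", "silver.orders_clean", "silver.orders_quarantine"]:
    n = spark.table(f"retail_project_dev.{t}").count()
    print(f"{t}: {n} rows")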





6. Gold Layer & Data Quality Analysis

Goal: Prove that the pipeline worked by querying the results.

Action: Run these SQL queries in separate cells.

A. Business Aggregation (The "Clean" Report)

This should contain NO null IDs, NO negative amounts, and NO invalid emails.

SQL

%sql
SELECT
    count(*)    AS total_valid_orders,
    sum(amount) AS total_revenue,
    avg(amount) AS average_order_value
FROM retail_project_dev.silver.orders_clean;



B. The Error Report (The "Dirty" Truth)

This analyzes exactly what went wrong in the raw data.

SQL

%sql
SELECT
    dq_reason,
    count(*) AS error_count,
    -- Show samples of the bad data
    collect_list(order_id) AS sample_ids
FROM retail_project_dev.silver.orders_quarantine
GROUP BY dq_reason
ORDER BY error_count DESC;
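
The gold schema created in Section 2 has not been used yet. As an optional sketch, the clean report can be materialized there as a Gold table for downstream consumers; the table name daily_revenue and the daily grain are illustrative choices, and this assumes the timestamp column parses as a date:

Python

# Optional sketch: persist a business aggregate into the Gold schema.
# Table name and grain are examples, not prescribed by the project.
spark.sql("""
    CREATE OR REPLACE TABLE retail_project_dev.gold.daily_revenue AS
    SELECT
        to_date(`timestamp`)  AS order_date,
        count(*)              AS total_valid_orders,
        sum(amount)           AS total_revenue,
        avg(amount)           AS average_order_value
    FROM retail_project_dev.silver.orders_clean
    GROUP BY to_date(`timestamp`)
""")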




7. Final Step: Visualization (Genie)

    1. Go to the Genie tab in the Databricks sidebar.

    2. Click New Space.

    3. Add the table: retail_project_dev.silver.orders_clean.

    4. Ask Genie: "What is the daily revenue trend?" or "Show me the distribution of order amounts."