Data Engineering in Databricks: From Data Ingestion to Advanced ETL
Simplified for Real-World Use Cases


1. Data Ingestion in Databricks: The Foundation

Data ingestion is the process of importing data from diverse sources (like CSV, Parquet, or databases) into a centralized system for processing. In Databricks, this is streamlined with tools optimized for scalability and efficiency.

Reading/Writing Data from Multiple Sources

  • Supported Formats: CSV, JSON, Parquet, Delta Lake, Avro, XML, and more.

  • Example: A retail company ingests daily sales data from CSV files and customer behavior logs in Parquet.

      # Read CSV
      sales_df = spark.read.csv("dbfs:/sales_data/*.csv", header=True)
    
      # Read Parquet
      logs_df = spark.read.parquet("dbfs:/logs/2025/02/*.parquet")
    
      # Write to Delta Lake (optimized for analytics)
      sales_df.write.format("delta").save("dbfs:/delta/sales")
    
  • Delta Lake Benefits: ACID transactions, time travel, and schema enforcement ensure reliability.
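
To make those benefits concrete, here is a minimal sketch, assuming the dbfs:/delta/sales table written above already exists:

# Time travel: read the table as it looked at an earlier version
sales_v0 = spark.read.format("delta").option("versionAsOf", 0).load("dbfs:/delta/sales")

# Schema enforcement: an append with a mismatched schema is rejected
# instead of silently corrupting the table
bad_df = spark.createDataFrame([(1, "oops")], ["order_id", "unexpected_column"])
try:
    bad_df.write.format("delta").mode("append").save("dbfs:/delta/sales")
except Exception as e:
    print("Schema enforcement rejected the write:", e)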


2. Auto Loader: Streaming Data Made Simple

Auto Loader is Databricks’ solution for incremental data ingestion, ideal for real-time or near-real-time pipelines.

Key Features

  • Incremental Processing: Detects and processes only new files in cloud storage (e.g., Azure ADLS, S3)

  • Schema Inference & Evolution: Automatically detects schema changes (e.g., new columns) and evolves tables without manual intervention

  • Fault Tolerance: Uses checkpointing (RocksDB) to ensure exactly-once processing, even after failures

Real-Time Example: Logistics Company Tracking Shipments

# Configure Auto Loader for JSON files
streaming_df = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "/checkpoint/shipments_schema")
  .load("dbfs:/incoming_shipments/"))

# Write to Delta Lake with deduplication
from delta.tables import DeltaTable

def upsert_shipments(batch_df, batch_id):
  delta_table = DeltaTable.forPath(spark, "/delta/shipments")
  (delta_table.alias("target")
    .merge(batch_df.alias("source"), "target.tracking_id = source.tracking_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

(streaming_df.writeStream
  .foreachBatch(upsert_shipments)
  .option("checkpointLocation", "/checkpoint/shipments")
  .start())

This Databricks Spark Structured Streaming code automatically loads JSON files, processes them in real time, and updates a Delta Lake table with deduplication.

Let’s break it down step by step.

Step 1: Configure Auto Loader for JSON Files

streaming_df = (spark.readStream
  .format("cloudFiles")  # Enable Auto Loader
  .option("cloudFiles.format", "json")  # Specify file format
  .option("cloudFiles.schemaLocation", "/checkpoint/shipments_schema")  # Store schema for evolution
  .load("dbfs:/incoming_shipments/"))  # Monitor this folder for new files

What does this code do?

  • Uses Databricks Auto Loader (cloudFiles) to continuously ingest new JSON files.

  • Auto Loader automatically detects schema changes (schema evolution).

  • The source directory is "dbfs:/incoming_shipments/" where new JSON files arrive.

  • Stores schema in "/checkpoint/shipments_schema" for future schema evolution.

Why use Auto Loader?

  • Handles large-scale streaming ingestion efficiently.

  • Optimized for cloud storage like Azure Blob, S3, or ADLS.

  • Supports schema evolution (detects new columns automatically).


Step 2: Define Upsert Logic (Merge New & Existing Data)

from delta.tables import DeltaTable

def upsert_shipments(batch_df, batch_id):  # Function to process each batch
  delta_table = DeltaTable.forPath(spark, "/delta/shipments")  # Load existing Delta Table

  (delta_table.alias("target")  # Alias for existing data
    .merge(batch_df.alias("source"), "target.tracking_id = source.tracking_id")  # Match condition
    .whenMatchedUpdateAll()  # If tracking_id exists, update all columns
    .whenNotMatchedInsertAll()  # If tracking_id is new, insert the record
    .execute())  # Run merge

What does this code do?

  • Loads the existing Delta Table from "/delta/shipments".

  • Uses merge (UPSERT) logic:

    • If the tracking_id exists, update the existing record.

    • If the tracking_id is new, insert it as a fresh record.

  • Prevents duplicates and ensures data consistency.

Why use Delta Lake Merge?

  • Handles late-arriving data (e.g., delayed shipment updates).

  • Supports incremental updates (only changes are applied).

  • No need for complex SQL joins—everything happens in one step.
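
For teams who prefer SQL, the same upsert can be written as a single MERGE statement. A sketch, assuming the batch DataFrame inside upsert_shipments is exposed as a temporary view (the view name is illustrative):

# Inside upsert_shipments, the batch could also be merged with SQL
batch_df.createOrReplaceTempView("shipment_updates")

spark.sql("""
    MERGE INTO delta.`/delta/shipments` AS target
    USING shipment_updates AS source
    ON target.tracking_id = source.tracking_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")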


Step 3: Stream Data into Delta Lake with Checkpointing

(streaming_df.writeStream
  .foreachBatch(upsert_shipments)  # Process each micro-batch with the upsert logic
  .option("checkpointLocation", "/checkpoint/shipments")  # Track processed data
  .start())  # Start the streaming job

What does this code do?

  • Streams data into Delta Lake in micro-batches.

  • Calls upsert_shipments() for each batch.

  • Uses checkpointing (/checkpoint/shipments) to:

    • Track processed files (prevents reprocessing duplicates).

    • Ensure fault tolerance (if Databricks restarts, it resumes from the last checkpoint).

Why use .foreachBatch() instead of .writeStream.format("delta")?

  • Allows complex logic (like merge()).

  • Handles schema evolution.

  • Supports deduplication & updates.
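
For comparison, the plain Delta sink looks like this. It is simpler but append-only, so it cannot run the merge-based deduplication above (paths here are illustrative):

# Append-only alternative to foreachBatch: no MERGE, so duplicates are not removed
(streaming_df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/checkpoint/shipments_append")
  .start("/delta/shipments_append"))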

3. Transformations with PySpark: From Basics to Advanced

PySpark enables scalable data transformations using DataFrame APIs.

Common Operations

  • Filtering: Remove invalid records (e.g., orders with negative quantities).

  • Aggregations: Calculate daily revenue or customer lifetime value.

  • Joins: Combine sales data with customer profiles.

Example: Healthcare Analytics

from pyspark.sql.functions import avg, col

# Clean patient data
clean_patients = raw_patients.filter(col("age") > 0)

# Aggregate by region
avg_blood_pressure = clean_patients.groupBy("region").agg(avg("blood_pressure"))

# Join with treatment logs
patient_treatment = clean_patients.join(treatment_logs, "patient_id", "inner")

Advanced: Handling Nested JSON

Auto Loader infers nested JSON as strings. Use PySpark’s semi-structured data functions:

transformed_df = streaming_df.selectExpr(
  "device_id", 
  "metrics:heart_rate::int",  # Extract nested field
  "timestamp"
)
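
If you prefer an explicit schema over the colon syntax, the same extraction can be done with from_json. A sketch, assuming metrics arrives as a JSON string column containing a heart_rate field:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StructField, StructType

# Explicit-schema alternative: parse the JSON string column, then pull out the field
metrics_schema = StructType([StructField("heart_rate", IntegerType())])

transformed_df = (streaming_df
  .withColumn("metrics_parsed", from_json(col("metrics"), metrics_schema))
  .select("device_id", col("metrics_parsed.heart_rate").alias("heart_rate"), "timestamp"))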

4. Advanced Concepts for Production-Grade ETL

Schema Evolution & Rescue

  • Schema Hints: Override inferred types (e.g., enforce date instead of string)

      .option("cloudFiles.schemaHints", "order_date DATE, revenue DOUBLE")
    
  • Rescued Data: Capture malformed records in _rescued_data for later analysis
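
To see what ended up in the rescue column, you can simply filter on it. A sketch, assuming rescued data was written along with the shipments table used earlier:

# _rescued_data is the column Auto Loader adds for values that did not fit the schema
rescued = (spark.read.format("delta").load("/delta/shipments")
           .filter("_rescued_data IS NOT NULL")
           .select("tracking_id", "_rescued_data"))

rescued.show(truncate=False)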

Understanding .option("cloudFiles.schemaHints", "order_date DATE, revenue DOUBLE") in PySpark

What does this option do?

.option("cloudFiles.schemaHints", "order_date DATE, revenue DOUBLE") is used in Auto Loader (Databricks’ streaming file ingestion feature) to provide hints for schema inference when reading cloud storage files.


Breaking it Down

  1. cloudFiles.schemaHints → A schema inference hint used in Auto Loader to specify the data types of specific columns.

  2. order_date DATE, revenue DOUBLE → Specifies that:

    • order_date should be treated as a DATE type.

    • revenue should be treated as a DOUBLE (floating-point number).


When to Use schemaHints?

By default, Auto Loader infers the schema automatically. However, in some cases:

  • The schema changes over time, and PySpark might not infer it correctly.

  • Some fields might be read incorrectly (e.g., DATE might be treated as STRING).

  • Providing hints can speed up schema inference and avoid incorrect data types.

Example: Reading Cloud Data with Schema Hints

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")  
      .option("cloudFiles.schemaHints", "order_date DATE, revenue DOUBLE")  
      .load("s3://my-bucket/orders"))

Production Configuration

  • Auto Loader in Databricks is used for efficiently ingesting files from cloud storage (AWS S3, Azure Blob, Google Cloud Storage). In a production setup, we need to optimize it for cost, completeness, and monitoring.


1️⃣ Cost Control: Use Trigger.AvailableNow for Batch Processing

What does it do?

  • Controls how often data is processed to reduce cloud costs.

  • Trigger.AvailableNow makes Auto Loader process only the files available when the job starts and then stop (like a batch job).

Example Usage:

    df = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "csv")
          .option("cloudFiles.schemaHints", "order_date DATE, revenue DOUBLE")
          .load("s3://my-bucket/orders"))

    # availableNow=True is PySpark's way of requesting Trigger.AvailableNow: it processes
    # everything currently available as one batch and then stops, so no cluster keeps running
    # (the checkpoint path here is illustrative)
    (df.writeStream
       .trigger(availableNow=True)
       .option("checkpointLocation", "/mnt/checkpoints/orders")
       .format("delta")
       .start("/mnt/delta/orders"))

Why use it?

  • Lower cost – no continuously running cluster.

  • Ideal for scheduled batch jobs (e.g., process new data every hour).

  • Best for handling large historical datasets (backfills).


2️⃣ Backfills: Use cloudFiles.backfillInterval to Ensure Data Completeness

What does it do?

  • Handles historical data ingestion (backfilling old files).

  • cloudFiles.backfillInterval allows you to periodically reprocess old files that may have been missed.

Example Usage:

    df = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")  
          .option("cloudFiles.schemaHints", "user_id STRING, purchase_amount DOUBLE")
          .option("cloudFiles.backfillInterval", "1h")  # Checks and backfills data every hour
          .load("s3://my-bucket/user-purchases"))

Why use it?

  • Ensures data completeness (fills in missing records).

  • Useful when dealing with late-arriving files.

  • Helps maintain data integrity across multiple processing jobs.

3️⃣ Monitoring: Track File Backlog with cloud_files_state('<checkpoint>')

What does it do?

  • Monitors file ingestion progress and identifies backlogs.

  • cloud_files_state('<checkpoint>') gives insights into how many files are pending or have been processed.

Example Usage:

    spark.sql("SELECT * FROM cloud_files_state('/mnt/checkpoints/my_stream')")

Why use it?

  • Detect ingestion delays (e.g., if files are not processed on time).

  • Monitor the number of pending files and their timestamps.

  • Optimize resources based on backlog trends.


Summary: Best Practices for Production

Aspect            | Setting                           | Purpose
Cost Control      | Trigger.AvailableNow              | Batch processing, reducing cloud costs
Data Completeness | cloudFiles.backfillInterval       | Ensures old/missed files are processed
Monitoring        | cloud_files_state('<checkpoint>') | Tracks file backlog & ingestion status

By following these production best practices, your Auto Loader will be cost-efficient, complete, and easy to monitor!

5. Real-World Use Cases: Auto Loader

  1. Retail: Ingest daily sales (CSV) + real-time website clicks (JSON) → Delta Lake → Power BI dashboards.

  2. IoT: Stream sensor data (Parquet) → Detect anomalies → Trigger alerts.

  3. Healthcare: Merge patient records (Delta) with lab results (CSV) → Predictive modeling.


6. Delta Lake and Data Lakehouse: The Future of Scalable Data Management

Modern businesses deal with huge amounts of data, and managing it efficiently is a challenge. Traditionally, companies used data warehouses (structured, expensive) or data lakes (cheap, but slow and unreliable).

Delta Lake solves this problem by combining the best of both worlds. It powers the Data Lakehouse architecture, providing fast, reliable, and scalable data storage for analytics and AI.

In this article, we'll explore what Delta Lake is, its key features, and how it optimizes data pipelines for real-world applications.

What is Delta Lake?

Delta Lake is an open-source storage layer built on top of Apache Spark and cloud-based data lakes like AWS S3, Azure Data Lake, and Google Cloud Storage.

It fixes the problems of traditional data lakes by providing:

  • Reliability (no missing or corrupt data).

  • Faster queries (compared to raw Parquet/CSV).

  • Support for updates & deletes (which plain data lakes could not do easily before!).

Example:
Imagine you have an e-commerce store 🛒.

  • Your sales data is stored in a data lake (AWS S3).

  • When a customer cancels an order, your system must delete or update the record.

  • Traditional data lakes can’t do this easily, but Delta Lake makes it possible!
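
A rough sketch of how that cancellation could be handled on a Delta table (the path and column names are illustrative, not a real schema):

# Remove the cancelled order entirely ...
spark.sql("""
    DELETE FROM delta.`s3://my-bucket/delta/sales`
    WHERE order_id = 'A1001'
""")

# ... or keep the row and just mark it as cancelled
spark.sql("""
    UPDATE delta.`s3://my-bucket/delta/sales`
    SET status = 'cancelled'
    WHERE order_id = 'A1001'
""")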


7. Key Features of Delta Lake

7.1 ACID Transactions: Ensuring Data Reliability

What is it?

  • ACID (Atomicity, Consistency, Isolation, Durability) ensures reliable, corruption-free transactions in Delta Lake.

Example:
Imagine you're withdrawing money from an ATM 💰.

  • The transaction must be fully completed or rolled back if there’s an error.

  • Delta Lake ensures data correctness just like a bank!

How does Delta Lake ensure ACID?

  • Every write is recorded in the Delta transaction log (_delta_log), so no committed data is lost.

  • If a write fails partway through, it never commits, so readers continue to see the table's previous state.
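
You can inspect that transaction log yourself: DESCRIBE HISTORY lists every committed version of a table. A sketch against the sales table path used later in this article:

# Each row is one atomic, committed transaction on the table
history = spark.sql("DESCRIBE HISTORY delta.`/mnt/delta/sales_data`")
history.select("version", "timestamp", "operation").show(truncate=False)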

7.2 Schema Evolution: Flexible Data Structure

What is it?

  • Schema Evolution allows adding new columns or modifying existing ones without breaking existing data.

Example:
Imagine you're managing customer data for a retail store.

  • Initially, you store Customer Name & Phone Number.

  • Later, you need to add Email & Address.

  • Without Delta Lake, modifying schemas in data lakes is painful.

  • With Delta Lake, it’s simple!

How to enable Schema Evolution?

df.write.format("delta").mode("append").option("mergeSchema", "true").save("/mnt/delta/customer_data")

7.3 Time Travel: View Historical Data Easily

What is it?

  • Time Travel allows accessing previous versions of data without backups.

Example:
A bank needs to verify transactions from last month 💳.

  • Instead of restoring backups, Delta Lake allows querying old versions easily.

How to retrieve past data?

df_v2 = spark.read.format("delta").option("versionAsOf", 2).load("/mnt/delta/sales_data")

Another example:
If a data scientist accidentally deletes records, they can restore old data using Time Travel!
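
A sketch of that recovery with RESTORE, which rolls the table back to an earlier version (the path and version number are illustrative):

# Bring the table back to the version that still contained the deleted records
spark.sql("RESTORE TABLE delta.`/mnt/delta/sales_data` TO VERSION AS OF 2")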


8. Optimizing Data Pipelines with Delta Lake

Delta Lake makes data pipelines faster, cheaper, and more efficient.

8.1 Delta Lake vs. Parquet/CSV (Why Delta is Faster!)

Feature           | Parquet/CSV (Data Lakes) | Delta Lake (Lakehouse)
Speed             | Slow                     | 10x faster 🚀
Updates & Deletes | ❌ No                    | ✅ Yes
Schema Evolution  | ❌ Hard                  | ✅ Easy
ACID Transactions | ❌ No                    | ✅ Yes

Example:
A ride-sharing company (like Uber 🚖) needs real-time trip updates.

  • With CSV/Parquet, ride cancellations are hard to process.

  • With Delta Lake, they can update trip data instantly!

8.2 Delta Lake for Streaming Data

Delta Lake supports both batch & streaming data.
Example: Processing Real-Time IoT Sensor Data

df = spark.readStream.format("delta").load("/mnt/delta/iot_sensors")
(df.writeStream.format("delta").outputMode("append")
   .option("checkpointLocation", "/mnt/checkpoints/iot_processed")  # required for fault-tolerant streaming writes
   .start("/mnt/delta/iot_processed"))

Use Case:
A factory uses IoT sensors to monitor machine temperature.

  • If a machine overheats, Delta Lake instantly updates records and triggers alerts.
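
A rough sketch of that alerting step on top of the stream above (the temperature threshold and column names are assumptions, not part of a real schema):

from pyspark.sql.functions import col

# Flag overheating readings and stream them to a separate alerts table
alerts = df.filter(col("temperature_c") > 90)

(alerts.writeStream
  .format("delta")
  .option("checkpointLocation", "/mnt/checkpoints/iot_alerts")
  .start("/mnt/delta/iot_alerts"))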

8.3 Delta Lake Performance Optimization (ZORDER & OPTIMIZE)

Delta Lake provides two major performance boosters:

  • OPTIMIZE – Compacts small files into large ones for faster queries.

  • ZORDER BY – Sorts data to make queries even faster.

Example: Optimizing for Faster Queries

OPTIMIZE delta.`/mnt/delta/sales_data` ZORDER BY (customer_id)

Why use it?

  • Queries that filter on the ZORDER columns can run dramatically faster 📈

  • Compacting small files reduces storage and compute overhead


9. Real-World Use Cases of Delta Lake

  • Retail & E-commerce – Update & delete customer orders in real time.

  • Banking & Finance – Track financial transactions with ACID guarantees.

  • IoT & Manufacturing – Process millions of sensor readings per second.

  • Healthcare – Store & process medical records securely.


10. Recent Delta Lake Features (Latest Version)

Delta Lake 3.0 (Latest Version) includes:

  • Predictive file compaction (auto-optimizing small files).

  • Delta Kernel (faster performance with open-source integrations).

  • Column-level updates (update specific columns without a full table rewrite).

Example: Update a single column efficiently

MERGE INTO delta.`/mnt/delta/orders` AS target
USING updates AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET target.status = source.status

11. Summary

Feature           | Why It's Useful
ACID Transactions | Prevents data corruption
Schema Evolution  | Allows flexible schema updates
Time Travel       | Access historical data without backups
Optimized Queries | Faster & more efficient than Parquet
Streaming Support | Handles real-time data
Cost Optimization | Saves storage & processing costs

Delta Lake solves the biggest problems in big data by making data lakes faster, more reliable, and more scalable. Whether you're handling real-time streaming, large-scale batch processing, or advanced analytics, Delta Lake is the future of data engineering.

Databricks simplifies data engineering with:

  • Auto Loader for scalable, fault-tolerant ingestion.

  • Delta Lake for reliable storage.

  • PySpark for flexible transformations.

By combining these tools, teams can build pipelines that handle everything from batch CSV processing to real-time streaming—all while adapting to schema changes and minimizing costs.